ofproto: Factor OpenFlow connection management into new "connmgr".
authorBen Pfaff <blp@nicira.com>
Tue, 29 Mar 2011 19:24:28 +0000 (12:24 -0700)
committerBen Pfaff <blp@nicira.com>
Tue, 29 Mar 2011 19:28:11 +0000 (12:28 -0700)
This removes a lot of code from ofproto.c and makes the ofproto code
easier to understand.

ofproto/automake.mk
ofproto/connmgr.c [new file with mode: 0644]
ofproto/connmgr.h [new file with mode: 0644]
ofproto/fail-open.c
ofproto/fail-open.h
ofproto/in-band.c
ofproto/in-band.h
ofproto/ofproto.c

index 6484a26..18aa7e6 100644 (file)
@@ -9,6 +9,8 @@ noinst_LIBRARIES += ofproto/libofproto.a
 ofproto_libofproto_a_SOURCES = \
        ofproto/collectors.c \
        ofproto/collectors.h \
+       ofproto/connmgr.c \
+       ofproto/connmgr.h \
        ofproto/fail-open.c \
        ofproto/fail-open.h \
        ofproto/in-band.c \
diff --git a/ofproto/connmgr.c b/ofproto/connmgr.c
new file mode 100644 (file)
index 0000000..166ef8c
--- /dev/null
@@ -0,0 +1,1318 @@
+/*
+ * Copyright (c) 2009, 2010, 2011 Nicira Networks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#include "connmgr.h"
+
+#include <errno.h>
+#include <stdlib.h>
+
+#include "coverage.h"
+#include "dpif.h"
+#include "fail-open.h"
+#include "in-band.h"
+#include "odp-util.h"
+#include "ofp-util.h"
+#include "ofpbuf.h"
+#include "pinsched.h"
+#include "poll-loop.h"
+#include "pktbuf.h"
+#include "rconn.h"
+#include "shash.h"
+#include "timeval.h"
+#include "vconn.h"
+#include "vlog.h"
+
+VLOG_DEFINE_THIS_MODULE(connmgr);
+static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
+
+COVERAGE_DEFINE(ofconn_stuck);
+
+/* An OpenFlow connection. */
+struct ofconn {
+    struct connmgr *connmgr;    /* Connection's manager. */
+    struct list node;           /* In struct connmgr's "all_conns" list. */
+    struct rconn *rconn;        /* OpenFlow connection. */
+    enum ofconn_type type;      /* Type. */
+    enum nx_flow_format flow_format; /* Currently selected flow format. */
+
+    /* OFPT_PACKET_IN related data. */
+    struct rconn_packet_counter *packet_in_counter; /* # queued on 'rconn'. */
+#define N_SCHEDULERS 2
+    struct pinsched *schedulers[N_SCHEDULERS];
+    struct pktbuf *pktbuf;         /* OpenFlow packet buffers. */
+    int miss_send_len;             /* Bytes to send of buffered packets. */
+
+    /* Number of OpenFlow messages queued on 'rconn' as replies to OpenFlow
+     * requests, and the maximum number before we stop reading OpenFlow
+     * requests.  */
+#define OFCONN_REPLY_MAX 100
+    struct rconn_packet_counter *reply_counter;
+
+    /* type == OFCONN_PRIMARY only. */
+    enum nx_role role;           /* Role. */
+    struct hmap_node hmap_node;  /* In struct connmgr's "controllers" map. */
+    enum ofproto_band band;      /* In-band or out-of-band? */
+};
+
+static struct ofconn *ofconn_create(struct connmgr *, struct rconn *,
+                                    enum ofconn_type);
+static void ofconn_destroy(struct ofconn *);
+
+static void ofconn_reconfigure(struct ofconn *,
+                               const struct ofproto_controller *);
+
+static void ofconn_run(struct ofconn *,
+                       void (*handle_openflow)(struct ofconn *,
+                                               struct ofpbuf *ofp_msg));
+static void ofconn_wait(struct ofconn *);
+
+static const char *ofconn_get_target(const struct ofconn *);
+static char *ofconn_make_name(const struct connmgr *, const char *target);
+
+static void ofconn_set_rate_limit(struct ofconn *, int rate, int burst);
+
+static bool ofconn_receives_async_msgs(const struct ofconn *);
+
+static void ofconn_send(const struct ofconn *, struct ofpbuf *,
+                        struct rconn_packet_counter *);
+
+static void do_send_packet_in(struct ofpbuf *, void *ofconn_);
+
+/* A listener for incoming OpenFlow "service" connections. */
+struct ofservice {
+    struct hmap_node node;      /* In struct connmgr's "services" hmap. */
+    struct pvconn *pvconn;      /* OpenFlow connection listener. */
+
+    /* These are not used by ofservice directly.  They are settings for
+     * accepted "struct ofconn"s from the pvconn. */
+    int probe_interval;         /* Max idle time before probing, in seconds. */
+    int rate_limit;             /* Max packet-in rate in packets per second. */
+    int burst_limit;            /* Limit on accumulating packet credits. */
+};
+
+static void ofservice_reconfigure(struct ofservice *,
+                                  const struct ofproto_controller *);
+static int ofservice_create(struct connmgr *, const char *target);
+static void ofservice_destroy(struct connmgr *, struct ofservice *);
+static struct ofservice *ofservice_lookup(struct connmgr *,
+                                          const char *target);
+
+/* Connection manager for an OpenFlow switch. */
+struct connmgr {
+    struct ofproto *ofproto;
+    char *name;
+    char *local_port_name;
+
+    /* OpenFlow connections. */
+    struct hmap controllers;   /* Controller "struct ofconn"s. */
+    struct list all_conns;     /* Contains "struct ofconn"s. */
+
+    /* OpenFlow listeners. */
+    struct hmap services;       /* Contains "struct ofservice"s. */
+    struct pvconn **snoops;
+    size_t n_snoops;
+
+    /* Fail open. */
+    struct fail_open *fail_open;
+    enum ofproto_fail_mode fail_mode;
+
+    /* In-band control. */
+    struct in_band *in_band;
+    long long int next_in_band_update;
+    struct sockaddr_in *extra_in_band_remotes;
+    size_t n_extra_remotes;
+    int in_band_queue;
+};
+
+static void update_in_band_remotes(struct connmgr *);
+static void add_snooper(struct connmgr *, struct vconn *);
+
+/* Creates and returns a new connection manager owned by 'ofproto'.  'name' is
+ * a name for the ofproto suitable for using in log messages.
+ * 'local_port_name' is the name of the local port (OFPP_LOCAL) within
+ * 'ofproto'. */
+struct connmgr *
+connmgr_create(struct ofproto *ofproto,
+               const char *name, const char *local_port_name)
+{
+    struct connmgr *mgr;
+
+    mgr = xmalloc(sizeof *mgr);
+    mgr->ofproto = ofproto;
+    mgr->name = xstrdup(name);
+    mgr->local_port_name = xstrdup(local_port_name);
+
+    hmap_init(&mgr->controllers);
+    list_init(&mgr->all_conns);
+
+    hmap_init(&mgr->services);
+    mgr->snoops = NULL;
+    mgr->n_snoops = 0;
+
+    mgr->fail_open = NULL;
+    mgr->fail_mode = OFPROTO_FAIL_SECURE;
+
+    mgr->in_band = NULL;
+    mgr->next_in_band_update = LLONG_MAX;
+    mgr->extra_in_band_remotes = NULL;
+    mgr->n_extra_remotes = 0;
+    mgr->in_band_queue = -1;
+
+    return mgr;
+}
+
+/* Frees 'mgr' and all of its resources. */
+void
+connmgr_destroy(struct connmgr *mgr)
+{
+    struct ofservice *ofservice, *next_ofservice;
+    struct ofconn *ofconn, *next_ofconn;
+    size_t i;
+
+    if (!mgr) {
+        return;
+    }
+
+    LIST_FOR_EACH_SAFE (ofconn, next_ofconn, node, &mgr->all_conns) {
+        ofconn_destroy(ofconn);
+    }
+    hmap_destroy(&mgr->controllers);
+
+    HMAP_FOR_EACH_SAFE (ofservice, next_ofservice, node, &mgr->services) {
+        ofservice_destroy(mgr, ofservice);
+    }
+    hmap_destroy(&mgr->services);
+
+    for (i = 0; i < mgr->n_snoops; i++) {
+        pvconn_close(mgr->snoops[i]);
+    }
+    free(mgr->snoops);
+
+    fail_open_destroy(mgr->fail_open);
+    mgr->fail_open = NULL;
+
+    in_band_destroy(mgr->in_band);
+    mgr->in_band = NULL;
+    free(mgr->extra_in_band_remotes);
+    free(mgr->name);
+    free(mgr->local_port_name);
+
+    free(mgr);
+}
+
+/* Does all of the periodic maintenance required by 'mgr'.  Calls
+ * 'handle_openflow' for each message received on an OpenFlow connection,
+ * passing along the OpenFlow connection itself and the message that was sent.
+ * The 'handle_openflow' callback must not free the message. */
+void
+connmgr_run(struct connmgr *mgr,
+            void (*handle_openflow)(struct ofconn *, struct ofpbuf *ofp_msg))
+{
+    struct ofconn *ofconn, *next_ofconn;
+    struct ofservice *ofservice;
+    size_t i;
+
+    if (mgr->in_band) {
+        if (time_msec() >= mgr->next_in_band_update) {
+            update_in_band_remotes(mgr);
+        }
+        in_band_run(mgr->in_band);
+    }
+
+    LIST_FOR_EACH_SAFE (ofconn, next_ofconn, node, &mgr->all_conns) {
+        ofconn_run(ofconn, handle_openflow);
+    }
+
+    /* Fail-open maintenance.  Do this after processing the ofconns since
+     * fail-open checks the status of the controller rconn. */
+    if (mgr->fail_open) {
+        fail_open_run(mgr->fail_open);
+    }
+
+    HMAP_FOR_EACH (ofservice, node, &mgr->services) {
+        struct vconn *vconn;
+        int retval;
+
+        retval = pvconn_accept(ofservice->pvconn, OFP_VERSION, &vconn);
+        if (!retval) {
+            struct rconn *rconn;
+            char *name;
+
+            rconn = rconn_create(ofservice->probe_interval, 0);
+            name = ofconn_make_name(mgr, vconn_get_name(vconn));
+            rconn_connect_unreliably(rconn, vconn, name);
+            free(name);
+
+            ofconn = ofconn_create(mgr, rconn, OFCONN_SERVICE);
+            ofconn_set_rate_limit(ofconn, ofservice->rate_limit,
+                                  ofservice->burst_limit);
+        } else if (retval != EAGAIN) {
+            VLOG_WARN_RL(&rl, "accept failed (%s)", strerror(retval));
+        }
+    }
+
+    for (i = 0; i < mgr->n_snoops; i++) {
+        struct vconn *vconn;
+        int retval;
+
+        retval = pvconn_accept(mgr->snoops[i], OFP_VERSION, &vconn);
+        if (!retval) {
+            add_snooper(mgr, vconn);
+        } else if (retval != EAGAIN) {
+            VLOG_WARN_RL(&rl, "accept failed (%s)", strerror(retval));
+        }
+    }
+}
+
+/* Causes the poll loop to wake up when connmgr_run() needs to run. */
+void
+connmgr_wait(struct connmgr *mgr)
+{
+    struct ofservice *ofservice;
+    struct ofconn *ofconn;
+    size_t i;
+
+    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+        ofconn_wait(ofconn);
+    }
+    if (mgr->in_band) {
+        poll_timer_wait_until(mgr->next_in_band_update);
+        in_band_wait(mgr->in_band);
+    }
+    if (mgr->fail_open) {
+        fail_open_wait(mgr->fail_open);
+    }
+    HMAP_FOR_EACH (ofservice, node, &mgr->services) {
+        pvconn_wait(ofservice->pvconn);
+    }
+    for (i = 0; i < mgr->n_snoops; i++) {
+        pvconn_wait(mgr->snoops[i]);
+    }
+}
+
+/* Returns the ofproto that owns 'ofconn''s connmgr. */
+struct ofproto *
+ofconn_get_ofproto(const struct ofconn *ofconn)
+{
+    return ofconn->connmgr->ofproto;
+}
+\f
+/* OpenFlow configuration. */
+
+static void add_controller(struct connmgr *, const char *target);
+static struct ofconn *find_controller_by_target(struct connmgr *,
+                                                const char *target);
+static void update_fail_open(struct connmgr *);
+static int set_pvconns(struct pvconn ***pvconnsp, size_t *n_pvconnsp,
+                       const struct svec *);
+
+/* Returns true if 'mgr' has any configured primary controllers.
+ *
+ * Service controllers do not count, but configured primary controllers do
+ * count whether or not they are currently connected. */
+bool
+connmgr_has_controllers(const struct connmgr *mgr)
+{
+    return !hmap_is_empty(&mgr->controllers);
+}
+
+/* Initializes 'info' and populates it with information about each configured
+ * primary controller.  The keys in 'info' are the controllers' targets; the
+ * data values are corresponding "struct ofproto_controller_info".
+ *
+ * The caller owns 'info' and everything in it and should free it when it is no
+ * longer needed. */
+void
+connmgr_get_controller_info(struct connmgr *mgr, struct shash *info)
+{
+    const struct ofconn *ofconn;
+
+    shash_init(info);
+
+    HMAP_FOR_EACH (ofconn, hmap_node, &mgr->controllers) {
+        const struct rconn *rconn = ofconn->rconn;
+        time_t now = time_now();
+        time_t last_connection = rconn_get_last_connection(rconn);
+        time_t last_disconnect = rconn_get_last_disconnect(rconn);
+        int last_error = rconn_get_last_error(rconn);
+        struct ofproto_controller_info *cinfo = xmalloc(sizeof *cinfo);
+
+        shash_add(info, rconn_get_target(rconn), cinfo);
+
+        cinfo->is_connected = rconn_is_connected(rconn);
+        cinfo->role = ofconn->role;
+
+        cinfo->pairs.n = 0;
+
+        if (last_error) {
+            cinfo->pairs.keys[cinfo->pairs.n] = "last_error";
+            cinfo->pairs.values[cinfo->pairs.n++] =
+                xstrdup(ovs_retval_to_string(last_error));
+        }
+
+        cinfo->pairs.keys[cinfo->pairs.n] = "state";
+        cinfo->pairs.values[cinfo->pairs.n++] =
+            xstrdup(rconn_get_state(rconn));
+
+        if (last_connection != TIME_MIN) {
+            cinfo->pairs.keys[cinfo->pairs.n] = "sec_since_connect";
+            cinfo->pairs.values[cinfo->pairs.n++]
+                = xasprintf("%ld", (long int) (now - last_connection));
+        }
+
+        if (last_disconnect != TIME_MIN) {
+            cinfo->pairs.keys[cinfo->pairs.n] = "sec_since_disconnect";
+            cinfo->pairs.values[cinfo->pairs.n++]
+                = xasprintf("%ld", (long int) (now - last_disconnect));
+        }
+    }
+}
+
+/* Changes 'mgr''s set of controllers to the 'n_controllers' controllers in
+ * 'controllers'. */
+void
+connmgr_set_controllers(struct connmgr *mgr,
+                        const struct ofproto_controller *controllers,
+                        size_t n_controllers)
+{
+    struct shash new_controllers;
+    struct ofconn *ofconn, *next_ofconn;
+    struct ofservice *ofservice, *next_ofservice;
+    bool ss_exists;
+    size_t i;
+
+    /* Create newly configured controllers and services.
+     * Create a name to ofproto_controller mapping in 'new_controllers'. */
+    shash_init(&new_controllers);
+    for (i = 0; i < n_controllers; i++) {
+        const struct ofproto_controller *c = &controllers[i];
+
+        if (!vconn_verify_name(c->target)) {
+            if (!find_controller_by_target(mgr, c->target)) {
+                add_controller(mgr, c->target);
+            }
+        } else if (!pvconn_verify_name(c->target)) {
+            if (!ofservice_lookup(mgr, c->target)) {
+                ofservice_create(mgr, c->target);
+            }
+        } else {
+            VLOG_WARN_RL(&rl, "%s: unsupported controller \"%s\"",
+                         mgr->name, c->target);
+            continue;
+        }
+
+        shash_add_once(&new_controllers, c->target, &controllers[i]);
+    }
+
+    /* Delete controllers that are no longer configured.
+     * Update configuration of all now-existing controllers. */
+    ss_exists = false;
+    HMAP_FOR_EACH_SAFE (ofconn, next_ofconn, hmap_node, &mgr->controllers) {
+        struct ofproto_controller *c;
+
+        c = shash_find_data(&new_controllers, ofconn_get_target(ofconn));
+        if (!c) {
+            ofconn_destroy(ofconn);
+        } else {
+            ofconn_reconfigure(ofconn, c);
+        }
+    }
+
+    /* Delete services that are no longer configured.
+     * Update configuration of all now-existing services. */
+    HMAP_FOR_EACH_SAFE (ofservice, next_ofservice, node, &mgr->services) {
+        struct ofproto_controller *c;
+
+        c = shash_find_data(&new_controllers,
+                            pvconn_get_name(ofservice->pvconn));
+        if (!c) {
+            ofservice_destroy(mgr, ofservice);
+        } else {
+            ofservice_reconfigure(ofservice, c);
+        }
+    }
+
+    shash_destroy(&new_controllers);
+
+    update_in_band_remotes(mgr);
+    update_fail_open(mgr);
+}
+
+/* Drops the connections between 'mgr' and all of its primary and secondary
+ * controllers, forcing them to reconnect. */
+void
+connmgr_reconnect(const struct connmgr *mgr)
+{
+    struct ofconn *ofconn;
+
+    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+        rconn_reconnect(ofconn->rconn);
+    }
+}
+
+/* Sets the "snoops" for 'mgr' to the pvconn targets listed in 'snoops'.
+ *
+ * A "snoop" is a pvconn to which every OpenFlow message to or from the most
+ * important controller on 'mgr' is mirrored. */
+int
+connmgr_set_snoops(struct connmgr *mgr, const struct svec *snoops)
+{
+    return set_pvconns(&mgr->snoops, &mgr->n_snoops, snoops);
+}
+
+/* Adds each of the snoops currently configured on 'mgr' to 'snoops'. */
+void
+connmgr_get_snoops(const struct connmgr *mgr, struct svec *snoops)
+{
+    size_t i;
+
+    for (i = 0; i < mgr->n_snoops; i++) {
+        svec_add(snoops, pvconn_get_name(mgr->snoops[i]));
+    }
+}
+
+/* Creates a new controller for 'target' in 'mgr'.  update_controller() needs
+ * to be called later to finish the new ofconn's configuration. */
+static void
+add_controller(struct connmgr *mgr, const char *target)
+{
+    char *name = ofconn_make_name(mgr, target);
+    struct ofconn *ofconn;
+
+    ofconn = ofconn_create(mgr, rconn_create(5, 8), OFCONN_PRIMARY);
+    ofconn->pktbuf = pktbuf_create();
+    ofconn->miss_send_len = OFP_DEFAULT_MISS_SEND_LEN;
+    rconn_connect(ofconn->rconn, target, name);
+    hmap_insert(&mgr->controllers, &ofconn->hmap_node, hash_string(target, 0));
+
+    free(name);
+}
+
+static struct ofconn *
+find_controller_by_target(struct connmgr *mgr, const char *target)
+{
+    struct ofconn *ofconn;
+
+    HMAP_FOR_EACH_WITH_HASH (ofconn, hmap_node,
+                             hash_string(target, 0), &mgr->controllers) {
+        if (!strcmp(ofconn_get_target(ofconn), target)) {
+            return ofconn;
+        }
+    }
+    return NULL;
+}
+
+static void
+update_in_band_remotes(struct connmgr *mgr)
+{
+    struct sockaddr_in *addrs;
+    size_t max_addrs, n_addrs;
+    struct ofconn *ofconn;
+    size_t i;
+
+    /* Allocate enough memory for as many remotes as we could possibly have. */
+    max_addrs = mgr->n_extra_remotes + hmap_count(&mgr->controllers);
+    addrs = xmalloc(max_addrs * sizeof *addrs);
+    n_addrs = 0;
+
+    /* Add all the remotes. */
+    HMAP_FOR_EACH (ofconn, hmap_node, &mgr->controllers) {
+        struct sockaddr_in *sin = &addrs[n_addrs];
+
+        if (ofconn->band == OFPROTO_OUT_OF_BAND) {
+            continue;
+        }
+
+        sin->sin_addr.s_addr = rconn_get_remote_ip(ofconn->rconn);
+        if (sin->sin_addr.s_addr) {
+            sin->sin_port = rconn_get_remote_port(ofconn->rconn);
+            n_addrs++;
+        }
+    }
+    for (i = 0; i < mgr->n_extra_remotes; i++) {
+        addrs[n_addrs++] = mgr->extra_in_band_remotes[i];
+    }
+
+    /* Create or update or destroy in-band. */
+    if (n_addrs) {
+        if (!mgr->in_band) {
+            in_band_create(mgr->ofproto, mgr->local_port_name, &mgr->in_band);
+        }
+        if (mgr->in_band) {
+            in_band_set_remotes(mgr->in_band, addrs, n_addrs);
+        }
+        in_band_set_queue(mgr->in_band, mgr->in_band_queue);
+        mgr->next_in_band_update = time_msec() + 1000;
+    } else {
+        in_band_destroy(mgr->in_band);
+        mgr->in_band = NULL;
+    }
+
+    /* Clean up. */
+    free(addrs);
+}
+
+static void
+update_fail_open(struct connmgr *mgr)
+{
+    if (connmgr_has_controllers(mgr)
+        && mgr->fail_mode == OFPROTO_FAIL_STANDALONE) {
+        if (!mgr->fail_open) {
+            mgr->fail_open = fail_open_create(mgr->ofproto, mgr);
+        }
+    } else {
+        fail_open_destroy(mgr->fail_open);
+        mgr->fail_open = NULL;
+    }
+}
+
+static int
+set_pvconns(struct pvconn ***pvconnsp, size_t *n_pvconnsp,
+            const struct svec *svec)
+{
+    struct pvconn **pvconns = *pvconnsp;
+    size_t n_pvconns = *n_pvconnsp;
+    int retval = 0;
+    size_t i;
+
+    for (i = 0; i < n_pvconns; i++) {
+        pvconn_close(pvconns[i]);
+    }
+    free(pvconns);
+
+    pvconns = xmalloc(svec->n * sizeof *pvconns);
+    n_pvconns = 0;
+    for (i = 0; i < svec->n; i++) {
+        const char *name = svec->names[i];
+        struct pvconn *pvconn;
+        int error;
+
+        error = pvconn_open(name, &pvconn);
+        if (!error) {
+            pvconns[n_pvconns++] = pvconn;
+        } else {
+            VLOG_ERR("failed to listen on %s: %s", name, strerror(error));
+            if (!retval) {
+                retval = error;
+            }
+        }
+    }
+
+    *pvconnsp = pvconns;
+    *n_pvconnsp = n_pvconns;
+
+    return retval;
+}
+
+/* Returns a "preference level" for snooping 'ofconn'.  A higher return value
+ * means that 'ofconn' is more interesting for monitoring than a lower return
+ * value. */
+static int
+snoop_preference(const struct ofconn *ofconn)
+{
+    switch (ofconn->role) {
+    case NX_ROLE_MASTER:
+        return 3;
+    case NX_ROLE_OTHER:
+        return 2;
+    case NX_ROLE_SLAVE:
+        return 1;
+    default:
+        /* Shouldn't happen. */
+        return 0;
+    }
+}
+
+/* One of 'mgr''s "snoop" pvconns has accepted a new connection on 'vconn'.
+ * Connects this vconn to a controller. */
+static void
+add_snooper(struct connmgr *mgr, struct vconn *vconn)
+{
+    struct ofconn *ofconn, *best;
+
+    /* Pick a controller for monitoring. */
+    best = NULL;
+    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+        if (ofconn->type == OFCONN_PRIMARY
+            && (!best || snoop_preference(ofconn) > snoop_preference(best))) {
+            best = ofconn;
+        }
+    }
+
+    if (best) {
+        rconn_add_monitor(best->rconn, vconn);
+    } else {
+        VLOG_INFO_RL(&rl, "no controller connection to snoop");
+        vconn_close(vconn);
+    }
+}
+\f
+/* Public ofconn functions. */
+
+/* Returns the connection type, either OFCONN_PRIMARY or OFCONN_SERVICE. */
+enum ofconn_type
+ofconn_get_type(const struct ofconn *ofconn)
+{
+    return ofconn->type;
+}
+
+/* Returns the role configured for 'ofconn'.
+ *
+ * The default role, if no other role has been set, is NX_ROLE_OTHER. */
+enum nx_role
+ofconn_get_role(const struct ofconn *ofconn)
+{
+    return ofconn->role;
+}
+
+/* Changes 'ofconn''s role to 'role'.  If 'role' is NX_ROLE_MASTER then any
+ * existing master is demoted to a slave. */
+void
+ofconn_set_role(struct ofconn *ofconn, enum nx_role role)
+{
+    if (role == NX_ROLE_MASTER) {
+        struct ofconn *other;
+
+        HMAP_FOR_EACH (other, hmap_node, &ofconn->connmgr->controllers) {
+            if (other->role == NX_ROLE_MASTER) {
+                other->role = NX_ROLE_SLAVE;
+            }
+        }
+    }
+    ofconn->role = role;
+}
+
+/* Returns the currently configured flow format for 'ofconn', one of NXFF_*.
+ *
+ * The default, if no other format has been set, is NXFF_OPENFLOW10. */
+enum nx_flow_format
+ofconn_get_flow_format(struct ofconn *ofconn)
+{
+    return ofconn->flow_format;
+}
+
+/* Sets the flow format for 'ofconn' to 'flow_format' (one of NXFF_*). */
+void
+ofconn_set_flow_format(struct ofconn *ofconn, enum nx_flow_format flow_format)
+{
+    ofconn->flow_format = flow_format;
+}
+
+/* Returns the default miss send length for 'ofconn'. */
+int
+ofconn_get_miss_send_len(const struct ofconn *ofconn)
+{
+    return ofconn->miss_send_len;
+}
+
+/* Sets the default miss send length for 'ofconn' to 'miss_send_len'. */
+void
+ofconn_set_miss_send_len(struct ofconn *ofconn, int miss_send_len)
+{
+    ofconn->miss_send_len = miss_send_len;
+}
+
+/* Sends 'msg' on 'ofconn', accounting it as a reply.  (If there is a
+ * sufficient number of OpenFlow replies in-flight on a single ofconn, then the
+ * connmgr will stop accepting new OpenFlow requests on that ofconn until the
+ * controller has accepted some of the replies.) */
+void
+ofconn_send_reply(const struct ofconn *ofconn, struct ofpbuf *msg)
+{
+    ofconn_send(ofconn, msg, ofconn->reply_counter);
+}
+
+/* Same as pktbuf_retrieve(), using the pktbuf owned by 'ofconn'. */
+int
+ofconn_pktbuf_retrieve(struct ofconn *ofconn, uint32_t id,
+                       struct ofpbuf **bufferp, uint16_t *in_port)
+{
+    return pktbuf_retrieve(ofconn->pktbuf, id, bufferp, in_port);
+}
+\f
+/* Private ofconn functions. */
+
+static const char *
+ofconn_get_target(const struct ofconn *ofconn)
+{
+    return rconn_get_target(ofconn->rconn);
+}
+
+static struct ofconn *
+ofconn_create(struct connmgr *mgr, struct rconn *rconn, enum ofconn_type type)
+{
+    struct ofconn *ofconn = xzalloc(sizeof *ofconn);
+    ofconn->connmgr = mgr;
+    list_push_back(&mgr->all_conns, &ofconn->node);
+    ofconn->rconn = rconn;
+    ofconn->type = type;
+    ofconn->flow_format = NXFF_OPENFLOW10;
+    ofconn->role = NX_ROLE_OTHER;
+    ofconn->packet_in_counter = rconn_packet_counter_create ();
+    ofconn->pktbuf = NULL;
+    ofconn->miss_send_len = 0;
+    ofconn->reply_counter = rconn_packet_counter_create ();
+    return ofconn;
+}
+
+static void
+ofconn_destroy(struct ofconn *ofconn)
+{
+    if (ofconn->type == OFCONN_PRIMARY) {
+        hmap_remove(&ofconn->connmgr->controllers, &ofconn->hmap_node);
+    }
+
+    list_remove(&ofconn->node);
+    rconn_destroy(ofconn->rconn);
+    rconn_packet_counter_destroy(ofconn->packet_in_counter);
+    rconn_packet_counter_destroy(ofconn->reply_counter);
+    pktbuf_destroy(ofconn->pktbuf);
+    free(ofconn);
+}
+
+/* Reconfigures 'ofconn' to match 'c'.  'ofconn' and 'c' must have the same
+ * target. */
+static void
+ofconn_reconfigure(struct ofconn *ofconn, const struct ofproto_controller *c)
+{
+    int probe_interval;
+
+    ofconn->band = c->band;
+
+    rconn_set_max_backoff(ofconn->rconn, c->max_backoff);
+
+    probe_interval = c->probe_interval ? MAX(c->probe_interval, 5) : 0;
+    rconn_set_probe_interval(ofconn->rconn, probe_interval);
+
+    ofconn_set_rate_limit(ofconn, c->rate_limit, c->burst_limit);
+}
+
+static void
+ofconn_run(struct ofconn *ofconn,
+           void (*handle_openflow)(struct ofconn *, struct ofpbuf *ofp_msg))
+{
+    struct connmgr *mgr = ofconn->connmgr;
+    int iteration;
+    size_t i;
+
+    for (i = 0; i < N_SCHEDULERS; i++) {
+        pinsched_run(ofconn->schedulers[i], do_send_packet_in, ofconn);
+    }
+
+    rconn_run(ofconn->rconn);
+
+    if (rconn_packet_counter_read (ofconn->reply_counter) < OFCONN_REPLY_MAX) {
+        /* Limit the number of iterations to prevent other tasks from
+         * starving. */
+        for (iteration = 0; iteration < 50; iteration++) {
+            struct ofpbuf *of_msg = rconn_recv(ofconn->rconn);
+            if (!of_msg) {
+                break;
+            }
+            if (mgr->fail_open) {
+                fail_open_maybe_recover(mgr->fail_open);
+            }
+            handle_openflow(ofconn, of_msg);
+            ofpbuf_delete(of_msg);
+        }
+    }
+
+    if (!rconn_is_alive(ofconn->rconn)) {
+        ofconn_destroy(ofconn);
+    }
+}
+
+static void
+ofconn_wait(struct ofconn *ofconn)
+{
+    int i;
+
+    for (i = 0; i < N_SCHEDULERS; i++) {
+        pinsched_wait(ofconn->schedulers[i]);
+    }
+    rconn_run_wait(ofconn->rconn);
+    if (rconn_packet_counter_read (ofconn->reply_counter) < OFCONN_REPLY_MAX) {
+        rconn_recv_wait(ofconn->rconn);
+    } else {
+        COVERAGE_INC(ofconn_stuck);
+    }
+}
+
+/* Returns true if 'ofconn' should receive asynchronous messages. */
+static bool
+ofconn_receives_async_msgs(const struct ofconn *ofconn)
+{
+    if (!rconn_is_connected(ofconn->rconn)) {
+        return false;
+    } else if (ofconn->type == OFCONN_PRIMARY) {
+        /* Primary controllers always get asynchronous messages unless they
+         * have configured themselves as "slaves".  */
+        return ofconn->role != NX_ROLE_SLAVE;
+    } else {
+        /* Service connections don't get asynchronous messages unless they have
+         * explicitly asked for them by setting a nonzero miss send length. */
+        return ofconn->miss_send_len > 0;
+    }
+}
+
+/* Returns a human-readable name for an OpenFlow connection between 'mgr' and
+ * 'target', suitable for use in log messages for identifying the connection.
+ *
+ * The name is dynamically allocated.  The caller should free it (with free())
+ * when it is no longer needed. */
+static char *
+ofconn_make_name(const struct connmgr *mgr, const char *target)
+{
+    return xasprintf("%s<->%s", mgr->name, target);
+}
+
+static void
+ofconn_set_rate_limit(struct ofconn *ofconn, int rate, int burst)
+{
+    int i;
+
+    for (i = 0; i < N_SCHEDULERS; i++) {
+        struct pinsched **s = &ofconn->schedulers[i];
+
+        if (rate > 0) {
+            if (!*s) {
+                *s = pinsched_create(rate, burst);
+            } else {
+                pinsched_set_limits(*s, rate, burst);
+            }
+        } else {
+            pinsched_destroy(*s);
+            *s = NULL;
+        }
+    }
+}
+
+static void
+ofconn_send(const struct ofconn *ofconn, struct ofpbuf *msg,
+            struct rconn_packet_counter *counter)
+{
+    update_openflow_length(msg);
+    if (rconn_send(ofconn->rconn, msg, counter)) {
+        ofpbuf_delete(msg);
+    }
+}
+\f
+/* Sending asynchronous messages. */
+
+static void schedule_packet_in(struct ofconn *, const struct dpif_upcall *,
+                               const struct flow *, struct ofpbuf *rw_packet);
+
+/* Sends an OFPT_PORT_STATUS message with 'opp' and 'reason' to appropriate
+ * controllers managed by 'mgr'.
+ *
+ * 'opp' is in *HOST* byte order. */
+void
+connmgr_send_port_status(struct connmgr *mgr, const struct ofp_phy_port *opp,
+                         uint8_t reason)
+{
+    /* XXX Should limit the number of queued port status change messages. */
+    struct ofconn *ofconn;
+
+    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+        struct ofp_port_status *ops;
+        struct ofpbuf *b;
+
+        /* Primary controllers, even slaves, should always get port status
+           updates.  Otherwise obey ofconn_receives_async_msgs(). */
+        if (ofconn->type != OFCONN_PRIMARY
+            && !ofconn_receives_async_msgs(ofconn)) {
+            continue;
+        }
+
+        ops = make_openflow_xid(sizeof *ops, OFPT_PORT_STATUS, 0, &b);
+        ops->reason = reason;
+        ops->desc = *opp;
+        hton_ofp_phy_port(&ops->desc);
+        ofconn_send(ofconn, b, NULL);
+    }
+}
+
+/* Sends an OFPT_FLOW_REMOVED or NXT_FLOW_REMOVED message based on 'fr' to
+ * appropriate controllers managed by 'mgr'. */
+void
+connmgr_send_flow_removed(struct connmgr *mgr,
+                          const struct ofputil_flow_removed *fr)
+{
+    struct ofconn *ofconn;
+
+    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+        struct ofpbuf *msg;
+
+        if (!ofconn_receives_async_msgs(ofconn)) {
+            continue;
+        }
+
+        /* Account flow expirations as replies to OpenFlow requests.  That
+         * works because preventing OpenFlow requests from being processed also
+         * prevents new flows from being added (and expiring).  (It also
+         * prevents processing OpenFlow requests that would not add new flows,
+         * so it is imperfect.) */
+        msg = ofputil_encode_flow_removed(fr, ofconn->flow_format);
+        ofconn_send_reply(ofconn, msg);
+    }
+}
+
+/* Given 'upcall', of type DPIF_UC_ACTION or DPIF_UC_MISS, sends an
+ * OFPT_PACKET_IN message to each OpenFlow controller as necessary according to
+ * their individual configurations.
+ *
+ * 'rw_packet' may be NULL.  Otherwise, 'rw_packet' must contain the same data
+ * as upcall->packet.  (rw_packet == upcall->packet is also valid.)  Ownership
+ * of 'rw_packet' is transferred to this function. */
+void
+connmgr_send_packet_in(struct connmgr *mgr, const struct dpif_upcall *upcall,
+                       const struct flow *flow, struct ofpbuf *rw_packet)
+{
+    struct ofconn *ofconn, *prev;
+
+    prev = NULL;
+    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+        if (ofconn_receives_async_msgs(ofconn)) {
+            if (prev) {
+                schedule_packet_in(prev, upcall, flow, NULL);
+            }
+            prev = ofconn;
+        }
+    }
+    if (prev) {
+        schedule_packet_in(prev, upcall, flow, rw_packet);
+    } else {
+        ofpbuf_delete(rw_packet);
+    }
+}
+
+/* pinsched callback for sending 'ofp_packet_in' on 'ofconn'. */
+static void
+do_send_packet_in(struct ofpbuf *ofp_packet_in, void *ofconn_)
+{
+    struct ofconn *ofconn = ofconn_;
+
+    rconn_send_with_limit(ofconn->rconn, ofp_packet_in,
+                          ofconn->packet_in_counter, 100);
+}
+
+/* Takes 'upcall', whose packet has the flow specified by 'flow', composes an
+ * OpenFlow packet-in message from it, and passes it to 'ofconn''s packet
+ * scheduler for sending.
+ *
+ * 'rw_packet' may be NULL.  Otherwise, 'rw_packet' must contain the same data
+ * as upcall->packet.  (rw_packet == upcall->packet is also valid.)  Ownership
+ * of 'rw_packet' is transferred to this function. */
+static void
+schedule_packet_in(struct ofconn *ofconn, const struct dpif_upcall *upcall,
+                   const struct flow *flow, struct ofpbuf *rw_packet)
+{
+    struct connmgr *mgr = ofconn->connmgr;
+    struct ofputil_packet_in pin;
+
+    /* Figure out the easy parts. */
+    pin.packet = upcall->packet;
+    pin.in_port = odp_port_to_ofp_port(flow->in_port);
+    pin.reason = upcall->type == DPIF_UC_MISS ? OFPR_NO_MATCH : OFPR_ACTION;
+
+    /* Get OpenFlow buffer_id. */
+    if (upcall->type == DPIF_UC_ACTION) {
+        pin.buffer_id = UINT32_MAX;
+    } else if (mgr->fail_open && fail_open_is_active(mgr->fail_open)) {
+        pin.buffer_id = pktbuf_get_null();
+    } else if (!ofconn->pktbuf) {
+        pin.buffer_id = UINT32_MAX;
+    } else {
+        pin.buffer_id = pktbuf_save(ofconn->pktbuf, upcall->packet,
+                                    flow->in_port);
+    }
+
+    /* Figure out how much of the packet to send. */
+    pin.send_len = upcall->packet->size;
+    if (pin.buffer_id != UINT32_MAX) {
+        pin.send_len = MIN(pin.send_len, ofconn->miss_send_len);
+    }
+    if (upcall->type == DPIF_UC_ACTION) {
+        pin.send_len = MIN(pin.send_len, upcall->userdata);
+    }
+
+    /* Make OFPT_PACKET_IN and hand over to packet scheduler.  It might
+     * immediately call into do_send_packet_in() or it might buffer it for a
+     * while (until a later call to pinsched_run()). */
+    pinsched_send(ofconn->schedulers[upcall->type == DPIF_UC_MISS ? 0 : 1],
+                  flow->in_port, ofputil_encode_packet_in(&pin, rw_packet),
+                  do_send_packet_in, ofconn);
+}
+\f
+/* Fail-open settings. */
+
+/* Returns the failure handling mode (OFPROTO_FAIL_SECURE or
+ * OFPROTO_FAIL_STANDALONE) for 'mgr'. */
+enum ofproto_fail_mode
+connmgr_get_fail_mode(const struct connmgr *mgr)
+{
+    return mgr->fail_mode;
+}
+
+/* Sets the failure handling mode for 'mgr' to 'fail_mode' (either
+ * OFPROTO_FAIL_SECURE or OFPROTO_FAIL_STANDALONE). */
+void
+connmgr_set_fail_mode(struct connmgr *mgr, enum ofproto_fail_mode fail_mode)
+{
+    mgr->fail_mode = fail_mode;
+    update_fail_open(mgr);
+}
+\f
+/* Fail-open implementation. */
+
+/* Returns the longest probe interval among the primary controllers configured
+ * on 'mgr'.  Returns 0 if there are no primary controllers. */
+int
+connmgr_get_max_probe_interval(const struct connmgr *mgr)
+{
+    const struct ofconn *ofconn;
+    int max_probe_interval;
+
+    max_probe_interval = 0;
+    HMAP_FOR_EACH (ofconn, hmap_node, &mgr->controllers) {
+        int probe_interval = rconn_get_probe_interval(ofconn->rconn);
+        max_probe_interval = MAX(max_probe_interval, probe_interval);
+    }
+    return max_probe_interval;
+}
+
+/* Returns the number of seconds for which all of 'mgr's primary controllers
+ * have been disconnected.  Returns 0 if 'mgr' has no primary controllers. */
+int
+connmgr_failure_duration(const struct connmgr *mgr)
+{
+    const struct ofconn *ofconn;
+    int min_failure_duration;
+
+    if (!connmgr_has_controllers(mgr)) {
+        return 0;
+    }
+
+    min_failure_duration = INT_MAX;
+    HMAP_FOR_EACH (ofconn, hmap_node, &mgr->controllers) {
+        int failure_duration = rconn_failure_duration(ofconn->rconn);
+        min_failure_duration = MIN(min_failure_duration, failure_duration);
+    }
+    return min_failure_duration;
+}
+
+/* Returns true if at least one primary controller is connected (regardless of
+ * whether those controllers are believed to have authenticated and accepted
+ * this switch), false if none of them are connected. */
+bool
+connmgr_is_any_controller_connected(const struct connmgr *mgr)
+{
+    const struct ofconn *ofconn;
+
+    HMAP_FOR_EACH (ofconn, hmap_node, &mgr->controllers) {
+        if (rconn_is_connected(ofconn->rconn)) {
+            return true;
+        }
+    }
+    return false;
+}
+
+/* Returns true if at least one primary controller is believed to have
+ * authenticated and accepted this switch, false otherwise. */
+bool
+connmgr_is_any_controller_admitted(const struct connmgr *mgr)
+{
+    const struct ofconn *ofconn;
+
+    HMAP_FOR_EACH (ofconn, hmap_node, &mgr->controllers) {
+        if (rconn_is_admitted(ofconn->rconn)) {
+            return true;
+        }
+    }
+    return false;
+}
+
+/* Sends 'packet' to each controller connected to 'mgr'.  Takes ownership of
+ * 'packet'. */
+void
+connmgr_broadcast(struct connmgr *mgr, struct ofpbuf *packet)
+{
+    struct ofconn *ofconn, *prev;
+
+    prev = NULL;
+    LIST_FOR_EACH (ofconn, node, &mgr->all_conns) {
+        if (prev) {
+            ofconn_send_reply(ofconn, ofpbuf_clone(packet));
+        }
+        if (rconn_is_connected(ofconn->rconn)) {
+            prev = ofconn;
+        }
+    }
+    if (prev) {
+        ofconn_send_reply(ofconn, packet);
+    } else {
+        ofpbuf_delete(packet);
+    }
+}
+\f
+/* In-band configuration. */
+
+static bool any_extras_changed(const struct connmgr *,
+                               const struct sockaddr_in *extras, size_t n);
+
+/* Sets the 'n' TCP port addresses in 'extras' as ones to which 'mgr''s
+ * in-band control should guarantee access, in the same way that in-band
+ * control guarantees access to OpenFlow controllers. */
+void
+connmgr_set_extra_in_band_remotes(struct connmgr *mgr,
+                                  const struct sockaddr_in *extras, size_t n)
+{
+    if (!any_extras_changed(mgr, extras, n)) {
+        return;
+    }
+
+    free(mgr->extra_in_band_remotes);
+    mgr->n_extra_remotes = n;
+    mgr->extra_in_band_remotes = xmemdup(extras, n * sizeof *extras);
+
+    update_in_band_remotes(mgr);
+}
+
+/* Sets the OpenFlow queue used by flows set up by in-band control on
+ * 'mgr' to 'queue_id'.  If 'queue_id' is negative, then in-band control
+ * flows will use the default queue. */
+void
+connmgr_set_in_band_queue(struct connmgr *mgr, int queue_id)
+{
+    if (queue_id != mgr->in_band_queue) {
+        mgr->in_band_queue = queue_id;
+        update_in_band_remotes(mgr);
+    }
+}
+
+static bool
+any_extras_changed(const struct connmgr *mgr,
+                   const struct sockaddr_in *extras, size_t n)
+{
+    size_t i;
+
+    if (n != mgr->n_extra_remotes) {
+        return true;
+    }
+
+    for (i = 0; i < n; i++) {
+        const struct sockaddr_in *old = &mgr->extra_in_band_remotes[i];
+        const struct sockaddr_in *new = &extras[i];
+
+        if (old->sin_addr.s_addr != new->sin_addr.s_addr ||
+            old->sin_port != new->sin_port) {
+            return true;
+        }
+    }
+
+    return false;
+}
+\f
+/* In-band implementation. */
+
+bool
+connmgr_msg_in_hook(struct connmgr *mgr, const struct flow *flow,
+                    const struct ofpbuf *packet)
+{
+    return mgr->in_band && in_band_msg_in_hook(mgr->in_band, flow, packet);
+}
+
+bool
+connmgr_may_set_up_flow(struct connmgr *mgr, const struct flow *flow,
+                        const struct nlattr *odp_actions,
+                        size_t actions_len)
+{
+    return !mgr->in_band || in_band_rule_check(flow, odp_actions, actions_len);
+}
+\f
+/* Fail-open and in-band implementation. */
+
+/* Called by 'ofproto' after all flows have been flushed, to allow fail-open
+ * and in-band control to re-create their flows. */
+void
+connmgr_flushed(struct connmgr *mgr)
+{
+    if (mgr->in_band) {
+        in_band_flushed(mgr->in_band);
+    }
+    if (mgr->fail_open) {
+        fail_open_flushed(mgr->fail_open);
+    }
+}
+\f
+/* Creates a new ofservice for 'target' in 'mgr'.  Returns 0 if successful,
+ * otherwise a positive errno value.
+ *
+ * ofservice_reconfigure() must be called to fully configure the new
+ * ofservice. */
+static int
+ofservice_create(struct connmgr *mgr, const char *target)
+{
+    struct ofservice *ofservice;
+    struct pvconn *pvconn;
+    int error;
+
+    error = pvconn_open(target, &pvconn);
+    if (error) {
+        return error;
+    }
+
+    ofservice = xzalloc(sizeof *ofservice);
+    hmap_insert(&mgr->services, &ofservice->node, hash_string(target, 0));
+    ofservice->pvconn = pvconn;
+
+    return 0;
+}
+
+static void
+ofservice_destroy(struct connmgr *mgr, struct ofservice *ofservice)
+{
+    hmap_remove(&mgr->services, &ofservice->node);
+    pvconn_close(ofservice->pvconn);
+    free(ofservice);
+}
+
+static void
+ofservice_reconfigure(struct ofservice *ofservice,
+                      const struct ofproto_controller *c)
+{
+    ofservice->probe_interval = c->probe_interval;
+    ofservice->rate_limit = c->rate_limit;
+    ofservice->burst_limit = c->burst_limit;
+}
+
+/* Finds and returns the ofservice within 'mgr' that has the given
+ * 'target', or a null pointer if none exists. */
+static struct ofservice *
+ofservice_lookup(struct connmgr *mgr, const char *target)
+{
+    struct ofservice *ofservice;
+
+    HMAP_FOR_EACH_WITH_HASH (ofservice, node, hash_string(target, 0),
+                             &mgr->services) {
+        if (!strcmp(pvconn_get_name(ofservice->pvconn), target)) {
+            return ofservice;
+        }
+    }
+    return NULL;
+}
diff --git a/ofproto/connmgr.h b/ofproto/connmgr.h
new file mode 100644 (file)
index 0000000..4710e6d
--- /dev/null
@@ -0,0 +1,123 @@
+/*
+ * Copyright (c) 2009, 2010, 2011 Nicira Networks.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef CONNMGR_H
+#define CONNMGR_H 1
+
+#include "hmap.h"
+#include "list.h"
+#include "ofproto.h"
+#include "openflow/nicira-ext.h"
+#include "openvswitch/types.h"
+
+struct dpif_upcall;
+struct ofconn;
+struct ofputil_flow_removed;
+
+/* ofproto supports two kinds of OpenFlow connections:
+ *
+ *   - "Primary" connections to ordinary OpenFlow controllers.  ofproto
+ *     maintains persistent connections to these controllers and by default
+ *     sends them asynchronous messages such as packet-ins.
+ *
+ *   - "Service" connections, e.g. from ovs-ofctl.  When these connections
+ *     drop, it is the other side's responsibility to reconnect them if
+ *     necessary.  ofproto does not send them asynchronous messages by default.
+ *
+ * Currently, active (tcp, ssl, unix) connections are always "primary"
+ * connections and passive (ptcp, pssl, punix) connections are always "service"
+ * connections.  There is no inherent reason for this, but it reflects the
+ * common case.
+ */
+enum ofconn_type {
+    OFCONN_PRIMARY,             /* An ordinary OpenFlow controller. */
+    OFCONN_SERVICE              /* A service connection, e.g. "ovs-ofctl". */
+};
+
+/* Basics. */
+struct connmgr *connmgr_create(struct ofproto *ofproto,
+                               const char *dpif_name, const char *local_name);
+void connmgr_destroy(struct connmgr *);
+
+void connmgr_run(struct connmgr *,
+                 void (*handle_openflow)(struct ofconn *,
+                                         struct ofpbuf *ofp_msg));
+void connmgr_wait(struct connmgr *);
+
+struct ofproto *ofconn_get_ofproto(const struct ofconn *);
+
+/* OpenFlow configuration. */
+bool connmgr_has_controllers(const struct connmgr *);
+void connmgr_get_controller_info(struct connmgr *, struct shash *);
+void connmgr_set_controllers(struct connmgr *,
+                             const struct ofproto_controller[], size_t n);
+void connmgr_reconnect(const struct connmgr *);
+
+int connmgr_set_snoops(struct connmgr *, const struct svec *snoops);
+void connmgr_get_snoops(const struct connmgr *, struct svec *snoops);
+
+/* Individual connections to OpenFlow controllers. */
+enum ofconn_type ofconn_get_type(const struct ofconn *);
+
+enum nx_role ofconn_get_role(const struct ofconn *);
+void ofconn_set_role(struct ofconn *, enum nx_role);
+
+enum nx_flow_format ofconn_get_flow_format(struct ofconn *);
+void ofconn_set_flow_format(struct ofconn *, enum nx_flow_format);
+
+int ofconn_get_miss_send_len(const struct ofconn *);
+void ofconn_set_miss_send_len(struct ofconn *, int miss_send_len);
+
+void ofconn_send_reply(const struct ofconn *, struct ofpbuf *);
+
+int ofconn_pktbuf_retrieve(struct ofconn *, uint32_t id,
+                           struct ofpbuf **bufferp, uint16_t *in_port);
+
+/* Sending asynchronous messages. */
+void connmgr_send_port_status(struct connmgr *, const struct ofp_phy_port *,
+                              uint8_t reason);
+void connmgr_send_flow_removed(struct connmgr *,
+                               const struct ofputil_flow_removed *);
+void connmgr_send_packet_in(struct connmgr *, const struct dpif_upcall *,
+                            const struct flow *, struct ofpbuf *rw_packet);
+
+/* Fail-open settings. */
+enum ofproto_fail_mode connmgr_get_fail_mode(const struct connmgr *);
+void connmgr_set_fail_mode(struct connmgr *, enum ofproto_fail_mode);
+
+/* Fail-open implementation. */
+int connmgr_get_max_probe_interval(const struct connmgr *);
+bool connmgr_is_any_controller_connected(const struct connmgr *);
+bool connmgr_is_any_controller_admitted(const struct connmgr *);
+int connmgr_failure_duration(const struct connmgr *);
+void connmgr_broadcast(struct connmgr *, struct ofpbuf *);
+
+/* In-band configuration. */
+void connmgr_set_extra_in_band_remotes(struct connmgr *,
+                                       const struct sockaddr_in *, size_t);
+void connmgr_set_in_band_queue(struct connmgr *, int queue_id);
+
+/* In-band implementation. */
+bool connmgr_msg_in_hook(struct connmgr *, const struct flow *,
+                         const struct ofpbuf *packet);
+bool connmgr_may_set_up_flow(struct connmgr *, const struct flow *,
+                             const struct nlattr *odp_actions,
+                             size_t actions_len);
+
+/* Fail-open and in-band implementation. */
+void connmgr_flushed(struct connmgr *);
+
+#endif /* connmgr.h */
index 97fee3e..bf57b9f 100644 (file)
@@ -19,6 +19,7 @@
 #include <inttypes.h>
 #include <stdlib.h>
 #include "classifier.h"
+#include "connmgr.h"
 #include "flow.h"
 #include "mac-learning.h"
 #include "odp-util.h"
@@ -69,8 +70,7 @@ VLOG_DEFINE_THIS_MODULE(fail_open);
 
 struct fail_open {
     struct ofproto *ofproto;
-    struct rconn **controllers;
-    size_t n_controllers;
+    struct connmgr *connmgr;
     int last_disconn_secs;
     long long int next_bogus_packet_in;
     struct rconn_packet_counter *bogus_packet_counter;
@@ -83,7 +83,7 @@ static void fail_open_recover(struct fail_open *);
 static int
 trigger_duration(const struct fail_open *fo)
 {
-    if (!fo->n_controllers) {
+    if (!connmgr_has_controllers(fo->connmgr)) {
         /* Shouldn't ever arrive here, but if we do, never fail open. */
         return INT_MAX;
     } else {
@@ -100,36 +100,8 @@ trigger_duration(const struct fail_open *fo)
          *  - The third interval is the time allowed to reconnect after no
          *    response is received.
          */
-        int max_probe_interval;
-        size_t i;
-
-        max_probe_interval = 0;
-        for (i = 0; i < fo->n_controllers; i++) {
-            int probe_interval = rconn_get_probe_interval(fo->controllers[i]);
-            max_probe_interval = MAX(max_probe_interval, probe_interval);
-        }
-        return max_probe_interval * 3;
-    }
-}
-
-/* Returns the number of seconds for which all controllers have been
- * disconnected.  */
-static int
-failure_duration(const struct fail_open *fo)
-{
-    int min_failure_duration;
-    size_t i;
-
-    if (!fo->n_controllers) {
-        return 0;
-    }
-
-    min_failure_duration = INT_MAX;
-    for (i = 0; i < fo->n_controllers; i++) {
-        int failure_duration = rconn_failure_duration(fo->controllers[i]);
-        min_failure_duration = MIN(min_failure_duration, failure_duration);
+        return connmgr_get_max_probe_interval(fo->connmgr) * 3;
     }
-    return min_failure_duration;
 }
 
 /* Returns true if 'fo' is currently in fail-open mode, otherwise false. */
@@ -139,39 +111,8 @@ fail_open_is_active(const struct fail_open *fo)
     return fo->last_disconn_secs != 0;
 }
 
-/* Returns true if at least one controller is connected (regardless of whether
- * those controllers are believed to have authenticated and accepted this
- * switch), false if none of them are connected. */
-static bool
-any_controller_is_connected(const struct fail_open *fo)
-{
-    size_t i;
-
-    for (i = 0; i < fo->n_controllers; i++) {
-        if (rconn_is_connected(fo->controllers[i])) {
-            return true;
-        }
-    }
-    return false;
-}
-
-/* Returns true if at least one controller is believed to have authenticated
- * and accepted this switch, false otherwise. */
-static bool
-any_controller_is_admitted(const struct fail_open *fo)
-{
-    size_t i;
-
-    for (i = 0; i < fo->n_controllers; i++) {
-        if (rconn_is_admitted(fo->controllers[i])) {
-            return true;
-        }
-    }
-    return false;
-}
-
 static void
-send_bogus_packet_in(struct fail_open *fo, struct rconn *rconn)
+send_bogus_packet_ins(struct fail_open *fo)
 {
     uint8_t mac[ETH_ADDR_LEN];
     struct ofpbuf *opi;
@@ -185,26 +126,14 @@ send_bogus_packet_in(struct fail_open *fo, struct rconn *rconn)
     ofpbuf_uninit(&b);
 
     /* Send. */
-    rconn_send_with_limit(rconn, opi, fo->bogus_packet_counter, 1);
-}
-
-static void
-send_bogus_packet_ins(struct fail_open *fo)
-{
-    size_t i;
-
-    for (i = 0; i < fo->n_controllers; i++) {
-        if (rconn_is_connected(fo->controllers[i])) {
-            send_bogus_packet_in(fo, fo->controllers[i]);
-        }
-    }
+    connmgr_broadcast(fo->connmgr, opi);
 }
 
 /* Enter fail-open mode if we should be in it. */
 void
 fail_open_run(struct fail_open *fo)
 {
-    int disconn_secs = failure_duration(fo);
+    int disconn_secs = connmgr_failure_duration(fo->connmgr);
 
     /* Enter fail-open mode if 'fo' is not in it but should be.  */
     if (disconn_secs >= trigger_duration(fo)) {
@@ -227,7 +156,7 @@ fail_open_run(struct fail_open *fo)
 
     /* Schedule a bogus packet-in if we're connected and in fail-open. */
     if (fail_open_is_active(fo)) {
-        if (any_controller_is_connected(fo)) {
+        if (connmgr_is_any_controller_connected(fo->connmgr)) {
             bool expired = time_msec() >= fo->next_bogus_packet_in;
             if (expired) {
                 send_bogus_packet_ins(fo);
@@ -247,7 +176,8 @@ fail_open_run(struct fail_open *fo)
 void
 fail_open_maybe_recover(struct fail_open *fo)
 {
-    if (any_controller_is_admitted(fo)) {
+    if (fail_open_is_active(fo)
+        && connmgr_is_any_controller_admitted(fo->connmgr)) {
         fail_open_recover(fo);
     }
 }
@@ -255,16 +185,14 @@ fail_open_maybe_recover(struct fail_open *fo)
 static void
 fail_open_recover(struct fail_open *fo)
 {
-    if (fail_open_is_active(fo)) {
-        struct cls_rule rule;
+    struct cls_rule rule;
 
-        VLOG_WARN("No longer in fail-open mode");
-        fo->last_disconn_secs = 0;
-        fo->next_bogus_packet_in = LLONG_MAX;
+    VLOG_WARN("No longer in fail-open mode");
+    fo->last_disconn_secs = 0;
+    fo->next_bogus_packet_in = LLONG_MAX;
 
-        cls_rule_init_catchall(&rule, FAIL_OPEN_PRIORITY);
-        ofproto_delete_flow(fo->ofproto, &rule);
-    }
+    cls_rule_init_catchall(&rule, FAIL_OPEN_PRIORITY);
+    ofproto_delete_flow(fo->ofproto, &rule);
 }
 
 void
@@ -278,7 +206,7 @@ fail_open_wait(struct fail_open *fo)
 void
 fail_open_flushed(struct fail_open *fo)
 {
-    int disconn_secs = failure_duration(fo);
+    int disconn_secs = connmgr_failure_duration(fo->connmgr);
     bool open = disconn_secs >= trigger_duration(fo);
     if (open) {
         union ofp_action action;
@@ -296,47 +224,28 @@ fail_open_flushed(struct fail_open *fo)
     }
 }
 
-/* Creates and returns a new struct fail_open for 'ofproto'.
- *
- * The caller should register its set of controllers with
- * fail_open_set_controllers().  (There should be at least one controller,
- * otherwise there isn't any point in having the struct fail_open around.) */
+/* Creates and returns a new struct fail_open for 'ofproto' and 'mgr'. */
 struct fail_open *
-fail_open_create(struct ofproto *ofproto)
+fail_open_create(struct ofproto *ofproto, struct connmgr *mgr)
 {
     struct fail_open *fo = xmalloc(sizeof *fo);
     fo->ofproto = ofproto;
-    fo->controllers = NULL;
-    fo->n_controllers = 0;
+    fo->connmgr = mgr;
     fo->last_disconn_secs = 0;
     fo->next_bogus_packet_in = LLONG_MAX;
     fo->bogus_packet_counter = rconn_packet_counter_create();
     return fo;
 }
 
-/* Registers the 'n' rconns in 'rconns' as connections to the controller for
- * 'fo'.  The caller must ensure that all of the rconns remain valid until 'fo'
- * is destroyed or a new set is registered in a subsequent call.
- *
- * Takes ownership of the 'rconns' array, but not of the rconns that it points
- * to (of which the caller retains ownership). */
-void
-fail_open_set_controllers(struct fail_open *fo,
-                          struct rconn **rconns, size_t n)
-{
-    free(fo->controllers);
-    fo->controllers = rconns;
-    fo->n_controllers = n;
-}
-
 /* Destroys 'fo'. */
 void
 fail_open_destroy(struct fail_open *fo)
 {
     if (fo) {
-        fail_open_recover(fo);
-        free(fo->controllers);
-        /* We don't own the rconns behind fo->controllers. */
+        if (fail_open_is_active(fo)) {
+            fail_open_recover(fo);
+        }
+        /* We don't own fo->connmgr. */
         rconn_packet_counter_destroy(fo->bogus_packet_counter);
         free(fo);
     }
index 2d0424d..51fa0b9 100644 (file)
@@ -21,9 +21,9 @@
 #include <stdint.h>
 #include "flow.h"
 
+struct connmgr;
 struct fail_open;
 struct ofproto;
-struct rconn;
 
 /* Priority of the rule added by the fail-open subsystem when a switch enters
  * fail-open mode.  This priority value uniquely identifies a fail-open flow
@@ -31,8 +31,7 @@ struct rconn;
  * creates flows with this priority).  And "f0" is mnemonic for "fail open"! */
 #define FAIL_OPEN_PRIORITY 0xf0f0f0
 
-struct fail_open *fail_open_create(struct ofproto *);
-void fail_open_set_controllers(struct fail_open *, struct rconn **, size_t n);
+struct fail_open *fail_open_create(struct ofproto *, struct connmgr *);
 void fail_open_destroy(struct fail_open *);
 void fail_open_wait(struct fail_open *);
 bool fail_open_is_active(const struct fail_open *);
index a001baf..e75d19e 100644 (file)
@@ -377,10 +377,6 @@ bool
 in_band_msg_in_hook(struct in_band *in_band, const struct flow *flow,
                     const struct ofpbuf *packet)
 {
-    if (!in_band) {
-        return false;
-    }
-
     /* Regardless of how the flow table is configured, we want to be
      * able to see replies to our DHCP requests. */
     if (flow->dl_type == htons(ETH_TYPE_IP)
@@ -409,13 +405,9 @@ in_band_msg_in_hook(struct in_band *in_band, const struct flow *flow,
 /* Returns true if the rule that would match 'flow' with 'actions' is
  * allowed to be set up in the datapath. */
 bool
-in_band_rule_check(struct in_band *in_band, const struct flow *flow,
+in_band_rule_check(const struct flow *flow,
                    const struct nlattr *actions, size_t actions_len)
 {
-    if (!in_band) {
-        return true;
-    }
-
     /* Don't allow flows that would prevent DHCP replies from being seen
      * by the local port. */
     if (flow->dl_type == htons(ETH_TYPE_IP)
@@ -684,23 +676,14 @@ in_band_flushed(struct in_band *in_band)
 }
 
 int
-in_band_create(struct ofproto *ofproto, struct dpif *dpif,
+in_band_create(struct ofproto *ofproto, const char *local_name,
                struct in_band **in_bandp)
 {
     struct in_band *in_band;
-    char local_name[IF_NAMESIZE];
     struct netdev *local_netdev;
     int error;
 
     *in_bandp = NULL;
-    error = dpif_port_get_name(dpif, ODPP_LOCAL,
-                               local_name, sizeof local_name);
-    if (error) {
-        VLOG_ERR("failed to initialize in-band control: cannot get name "
-                 "of datapath local port (%s)", strerror(error));
-        return error;
-    }
-
     error = netdev_open_default(local_name, &local_netdev);
     if (error) {
         VLOG_ERR("failed to initialize in-band control: cannot open "
index 701da00..5fa3666 100644 (file)
 #ifndef IN_BAND_H
 #define IN_BAND_H 1
 
-#include "flow.h"
+#include <stdbool.h>
+#include <stddef.h>
+#include <sys/socket.h>
 
-struct dpif;
+struct flow;
 struct in_band;
+struct nlattr;
+struct ofpbuf;
 struct ofproto;
-struct rconn;
-struct settings;
 
-int in_band_create(struct ofproto *, struct dpif *, struct in_band **);
+int in_band_create(struct ofproto *, const char *local_name,
+                   struct in_band **);
 void in_band_destroy(struct in_band *);
 
 void in_band_set_queue(struct in_band *, int queue_id);
@@ -37,9 +40,8 @@ void in_band_wait(struct in_band *);
 
 bool in_band_msg_in_hook(struct in_band *, const struct flow *,
                          const struct ofpbuf *packet);
-bool in_band_rule_check(struct in_band *, const struct flow *,
-                        const struct nlattr *odp_actions,
-                        size_t actions_len);
+bool in_band_rule_check(const struct flow *,
+                        const struct nlattr *odp_actions, size_t actions_len);
 void in_band_flushed(struct in_band *);
 
 #endif /* in-band.h */
index 25a9db6..ce8c99b 100644 (file)
@@ -27,6 +27,7 @@
 #include "byte-order.h"
 #include "cfm.h"
 #include "classifier.h"
+#include "connmgr.h"
 #include "coverage.h"
 #include "dpif.h"
 #include "dynamic-string.h"
@@ -79,7 +80,6 @@ COVERAGE_DEFINE(ofproto_flows_req);
 COVERAGE_DEFINE(ofproto_flush);
 COVERAGE_DEFINE(ofproto_invalidated);
 COVERAGE_DEFINE(ofproto_no_packet_in);
-COVERAGE_DEFINE(ofproto_ofconn_stuck);
 COVERAGE_DEFINE(ofproto_ofp2odp);
 COVERAGE_DEFINE(ofproto_packet_in);
 COVERAGE_DEFINE(ofproto_packet_out);
@@ -270,107 +270,8 @@ static void facet_update_stats(struct ofproto *, struct facet *,
                                const struct dpif_flow_stats *);
 static void facet_push_stats(struct ofproto *, struct facet *);
 
-/* ofproto supports two kinds of OpenFlow connections:
- *
- *   - "Primary" connections to ordinary OpenFlow controllers.  ofproto
- *     maintains persistent connections to these controllers and by default
- *     sends them asynchronous messages such as packet-ins.
- *
- *   - "Service" connections, e.g. from ovs-ofctl.  When these connections
- *     drop, it is the other side's responsibility to reconnect them if
- *     necessary.  ofproto does not send them asynchronous messages by default.
- *
- * Currently, active (tcp, ssl, unix) connections are always "primary"
- * connections and passive (ptcp, pssl, punix) connections are always "service"
- * connections.  There is no inherent reason for this, but it reflects the
- * common case.
- */
-enum ofconn_type {
-    OFCONN_PRIMARY,             /* An ordinary OpenFlow controller. */
-    OFCONN_SERVICE              /* A service connection, e.g. "ovs-ofctl". */
-};
-
-/* A listener for incoming OpenFlow "service" connections. */
-struct ofservice {
-    struct hmap_node node;      /* In struct ofproto's "services" hmap. */
-    struct pvconn *pvconn;      /* OpenFlow connection listener. */
-
-    /* These are not used by ofservice directly.  They are settings for
-     * accepted "struct ofconn"s from the pvconn. */
-    int probe_interval;         /* Max idle time before probing, in seconds. */
-    int rate_limit;             /* Max packet-in rate in packets per second. */
-    int burst_limit;            /* Limit on accumulating packet credits. */
-};
-
-static struct ofservice *ofservice_lookup(struct ofproto *,
-                                          const char *target);
-static int ofservice_create(struct ofproto *,
-                            const struct ofproto_controller *);
-static void ofservice_reconfigure(struct ofservice *,
-                                  const struct ofproto_controller *);
-static void ofservice_destroy(struct ofproto *, struct ofservice *);
-
-/* An OpenFlow connection. */
-struct ofconn {
-    struct ofproto *ofproto;    /* The ofproto that owns this connection. */
-    struct list node;           /* In struct ofproto's "all_conns" list. */
-    struct rconn *rconn;        /* OpenFlow connection. */
-    enum ofconn_type type;      /* Type. */
-    enum nx_flow_format flow_format; /* Currently selected flow format. */
-
-    /* OFPT_PACKET_IN related data. */
-    struct rconn_packet_counter *packet_in_counter; /* # queued on 'rconn'. */
-#define N_SCHEDULERS 2
-    struct pinsched *schedulers[N_SCHEDULERS];
-    struct pktbuf *pktbuf;         /* OpenFlow packet buffers. */
-    int miss_send_len;             /* Bytes to send of buffered packets. */
-
-    /* Number of OpenFlow messages queued on 'rconn' as replies to OpenFlow
-     * requests, and the maximum number before we stop reading OpenFlow
-     * requests.  */
-#define OFCONN_REPLY_MAX 100
-    struct rconn_packet_counter *reply_counter;
-
-    /* type == OFCONN_PRIMARY only. */
-    enum nx_role role;           /* Role. */
-    struct hmap_node hmap_node;  /* In struct ofproto's "controllers" map. */
-    enum ofproto_band band;      /* In-band or out-of-band? */
-};
-
-
-static struct ofconn *ofconn_create(struct ofproto *, struct rconn *,
-                                    enum ofconn_type);
-static void ofconn_destroy(struct ofconn *);
-static void ofconn_run(struct ofconn *);
-static void ofconn_wait(struct ofconn *);
-
-static bool ofconn_receives_async_msgs(const struct ofconn *);
-static char *ofconn_make_name(const struct ofproto *, const char *target);
-static void ofconn_set_rate_limit(struct ofconn *, int rate, int burst);
-
-static struct ofproto *ofconn_get_ofproto(struct ofconn *);
-
-static enum nx_flow_format ofconn_get_flow_format(struct ofconn *);
-static void ofconn_set_flow_format(struct ofconn *, enum nx_flow_format);
-
-static int ofconn_get_miss_send_len(const struct ofconn *);
-static void ofconn_set_miss_send_len(struct ofconn *, int miss_send_len);
-
-static enum ofconn_type ofconn_get_type(const struct ofconn *);
-
-static enum nx_role ofconn_get_role(const struct ofconn *);
-static void ofconn_set_role(struct ofconn *, enum nx_role);
-
-static int ofconn_pktbuf_retrieve(struct ofconn *, uint32_t id,
-                                  struct ofpbuf **bufferp, uint16_t *in_port);
-
-
-static void queue_tx(struct ofpbuf *msg, const struct ofconn *ofconn,
-                     struct rconn_packet_counter *counter);
-
 static void send_packet_in(struct ofproto *, struct dpif_upcall *,
                            const struct flow *, bool clone);
-static void do_send_packet_in(struct ofpbuf *ofp_packet_in, void *ofconn);
 
 struct ofproto {
     /* Settings. */
@@ -390,17 +291,9 @@ struct ofproto {
     uint32_t max_ports;
 
     /* Configuration. */
-    struct fail_open *fail_open;
     struct netflow *netflow;
     struct ofproto_sflow *sflow;
 
-    /* In-band control. */
-    struct in_band *in_band;
-    long long int next_in_band_update;
-    struct sockaddr_in *extra_in_band_remotes;
-    size_t n_extra_remotes;
-    int in_band_queue;
-
     /* Flow table. */
     struct classifier cls;
     long long int next_expiration;
@@ -411,14 +304,7 @@ struct ofproto {
     struct tag_set revalidate_set;
 
     /* OpenFlow connections. */
-    struct hmap controllers;   /* Controller "struct ofconn"s. */
-    struct list all_conns;     /* Contains "struct ofconn"s. */
-    enum ofproto_fail_mode fail_mode;
-
-    /* OpenFlow listeners. */
-    struct hmap services;       /* Contains "struct ofservice"s. */
-    struct pvconn **snoops;
-    size_t n_snoops;
+    struct connmgr *connmgr;
 
     /* Hooks for ovs-vswitchd. */
     const struct ofhooks *ofhooks;
@@ -459,6 +345,7 @@ ofproto_create(const char *datapath, const char *datapath_type,
                const struct ofhooks *ofhooks, void *aux,
                struct ofproto **ofprotop)
 {
+    char local_name[IF_NAMESIZE];
     struct ofproto *p;
     struct dpif *dpif;
     int error;
@@ -486,6 +373,14 @@ ofproto_create(const char *datapath, const char *datapath_type,
     dpif_flow_flush(dpif);
     dpif_recv_purge(dpif);
 
+    error = dpif_port_get_name(dpif, ODPP_LOCAL,
+                               local_name, sizeof local_name);
+    if (error) {
+        VLOG_ERR("%s: cannot get name of datapath local port (%s)",
+                 datapath, strerror(error));
+        return error;
+    }
+
     /* Initialize settings. */
     p = xzalloc(sizeof *p);
     p->fallback_dpid = pick_fallback_dpid();
@@ -504,14 +399,9 @@ ofproto_create(const char *datapath, const char *datapath_type,
     p->max_ports = dpif_get_max_ports(dpif);
 
     /* Initialize submodules. */
-    p->fail_open = NULL;
     p->netflow = NULL;
     p->sflow = NULL;
 
-    /* Initialize in-band control. */
-    p->in_band = NULL;
-    p->in_band_queue = -1;
-
     /* Initialize flow table. */
     classifier_init(&p->cls);
     p->next_expiration = time_msec() + 1000;
@@ -521,13 +411,6 @@ ofproto_create(const char *datapath, const char *datapath_type,
     p->need_revalidate = false;
     tag_set_init(&p->revalidate_set);
 
-    /* Initialize OpenFlow connections. */
-    list_init(&p->all_conns);
-    hmap_init(&p->controllers);
-    hmap_init(&p->services);
-    p->snoops = NULL;
-    p->n_snoops = 0;
-
     /* Initialize hooks. */
     if (ofhooks) {
         p->ofhooks = ofhooks;
@@ -545,6 +428,9 @@ ofproto_create(const char *datapath, const char *datapath_type,
 
     shash_add_once(&all_ofprotos, dpif_name(p->dpif), p);
 
+    /* Initialize OpenFlow connections. */
+    p->connmgr = connmgr_create(p, datapath, local_name);
+
     *ofprotop = p;
     return 0;
 }
@@ -563,212 +449,18 @@ ofproto_set_datapath_id(struct ofproto *p, uint64_t datapath_id)
     }
 }
 
-/* Creates a new controller in 'ofproto'.  Some of the settings are initially
- * drawn from 'c', but update_controller() needs to be called later to finish
- * the new ofconn's configuration. */
-static void
-add_controller(struct ofproto *ofproto, const struct ofproto_controller *c)
-{
-    char *name = ofconn_make_name(ofproto, c->target);
-    struct ofconn *ofconn;
-
-    ofconn = ofconn_create(ofproto, rconn_create(5, 8), OFCONN_PRIMARY);
-    ofconn->pktbuf = pktbuf_create();
-    ofconn->miss_send_len = OFP_DEFAULT_MISS_SEND_LEN;
-    rconn_connect(ofconn->rconn, c->target, name);
-    hmap_insert(&ofproto->controllers, &ofconn->hmap_node,
-                hash_string(c->target, 0));
-
-    free(name);
-}
-
-/* Reconfigures 'ofconn' to match 'c'.  This function cannot update an ofconn's
- * target (this is done by creating new ofconns and deleting old ones), but it
- * can update the rest of an ofconn's settings. */
-static void
-update_controller(struct ofconn *ofconn, const struct ofproto_controller *c)
-{
-    int probe_interval;
-
-    ofconn->band = c->band;
-
-    rconn_set_max_backoff(ofconn->rconn, c->max_backoff);
-
-    probe_interval = c->probe_interval ? MAX(c->probe_interval, 5) : 0;
-    rconn_set_probe_interval(ofconn->rconn, probe_interval);
-
-    ofconn_set_rate_limit(ofconn, c->rate_limit, c->burst_limit);
-}
-
-static const char *
-ofconn_get_target(const struct ofconn *ofconn)
-{
-    return rconn_get_target(ofconn->rconn);
-}
-
-static struct ofconn *
-find_controller_by_target(struct ofproto *ofproto, const char *target)
-{
-    struct ofconn *ofconn;
-
-    HMAP_FOR_EACH_WITH_HASH (ofconn, hmap_node,
-                             hash_string(target, 0), &ofproto->controllers) {
-        if (!strcmp(ofconn_get_target(ofconn), target)) {
-            return ofconn;
-        }
-    }
-    return NULL;
-}
-
-static void
-update_in_band_remotes(struct ofproto *ofproto)
-{
-    const struct ofconn *ofconn;
-    struct sockaddr_in *addrs;
-    size_t max_addrs, n_addrs;
-    size_t i;
-
-    /* Allocate enough memory for as many remotes as we could possibly have. */
-    max_addrs = ofproto->n_extra_remotes + hmap_count(&ofproto->controllers);
-    addrs = xmalloc(max_addrs * sizeof *addrs);
-    n_addrs = 0;
-
-    /* Add all the remotes. */
-    HMAP_FOR_EACH (ofconn, hmap_node, &ofproto->controllers) {
-        struct sockaddr_in *sin = &addrs[n_addrs];
-
-        if (ofconn->band == OFPROTO_OUT_OF_BAND) {
-            continue;
-        }
-
-        sin->sin_addr.s_addr = rconn_get_remote_ip(ofconn->rconn);
-        if (sin->sin_addr.s_addr) {
-            sin->sin_port = rconn_get_remote_port(ofconn->rconn);
-            n_addrs++;
-        }
-    }
-    for (i = 0; i < ofproto->n_extra_remotes; i++) {
-        addrs[n_addrs++] = ofproto->extra_in_band_remotes[i];
-    }
-
-    /* Create or update or destroy in-band. */
-    if (n_addrs) {
-        if (!ofproto->in_band) {
-            in_band_create(ofproto, ofproto->dpif, &ofproto->in_band);
-        }
-        if (ofproto->in_band) {
-            in_band_set_remotes(ofproto->in_band, addrs, n_addrs);
-        }
-        in_band_set_queue(ofproto->in_band, ofproto->in_band_queue);
-        ofproto->next_in_band_update = time_msec() + 1000;
-    } else {
-        in_band_destroy(ofproto->in_band);
-        ofproto->in_band = NULL;
-    }
-
-    /* Clean up. */
-    free(addrs);
-}
-
-static void
-update_fail_open(struct ofproto *p)
-{
-    struct ofconn *ofconn;
-
-    if (!hmap_is_empty(&p->controllers)
-            && p->fail_mode == OFPROTO_FAIL_STANDALONE) {
-        struct rconn **rconns;
-        size_t n;
-
-        if (!p->fail_open) {
-            p->fail_open = fail_open_create(p);
-        }
-
-        n = 0;
-        rconns = xmalloc(hmap_count(&p->controllers) * sizeof *rconns);
-        HMAP_FOR_EACH (ofconn, hmap_node, &p->controllers) {
-            rconns[n++] = ofconn->rconn;
-        }
-
-        fail_open_set_controllers(p->fail_open, rconns, n);
-        /* p->fail_open takes ownership of 'rconns'. */
-    } else {
-        fail_open_destroy(p->fail_open);
-        p->fail_open = NULL;
-    }
-}
-
 void
 ofproto_set_controllers(struct ofproto *p,
                         const struct ofproto_controller *controllers,
                         size_t n_controllers)
 {
-    struct shash new_controllers;
-    struct ofconn *ofconn, *next_ofconn;
-    struct ofservice *ofservice, *next_ofservice;
-    size_t i;
-
-    /* Create newly configured controllers and services.
-     * Create a name to ofproto_controller mapping in 'new_controllers'. */
-    shash_init(&new_controllers);
-    for (i = 0; i < n_controllers; i++) {
-        const struct ofproto_controller *c = &controllers[i];
-
-        if (!vconn_verify_name(c->target)) {
-            if (!find_controller_by_target(p, c->target)) {
-                add_controller(p, c);
-            }
-        } else if (!pvconn_verify_name(c->target)) {
-            if (!ofservice_lookup(p, c->target) && ofservice_create(p, c)) {
-                continue;
-            }
-        } else {
-            VLOG_WARN_RL(&rl, "%s: unsupported controller \"%s\"",
-                         dpif_name(p->dpif), c->target);
-            continue;
-        }
-
-        shash_add_once(&new_controllers, c->target, &controllers[i]);
-    }
-
-    /* Delete controllers that are no longer configured.
-     * Update configuration of all now-existing controllers. */
-    HMAP_FOR_EACH_SAFE (ofconn, next_ofconn, hmap_node, &p->controllers) {
-        struct ofproto_controller *c;
-
-        c = shash_find_data(&new_controllers, ofconn_get_target(ofconn));
-        if (!c) {
-            ofconn_destroy(ofconn);
-        } else {
-            update_controller(ofconn, c);
-        }
-    }
-
-    /* Delete services that are no longer configured.
-     * Update configuration of all now-existing services. */
-    HMAP_FOR_EACH_SAFE (ofservice, next_ofservice, node, &p->services) {
-        struct ofproto_controller *c;
-
-        c = shash_find_data(&new_controllers,
-                            pvconn_get_name(ofservice->pvconn));
-        if (!c) {
-            ofservice_destroy(p, ofservice);
-        } else {
-            ofservice_reconfigure(ofservice, c);
-        }
-    }
-
-    shash_destroy(&new_controllers);
-
-    update_in_band_remotes(p);
-    update_fail_open(p);
+    connmgr_set_controllers(p->connmgr, controllers, n_controllers);
 }
 
 void
 ofproto_set_fail_mode(struct ofproto *p, enum ofproto_fail_mode fail_mode)
 {
-    p->fail_mode = fail_mode;
-    update_fail_open(p);
+    connmgr_set_fail_mode(p->connmgr, fail_mode);
 }
 
 /* Drops the connections between 'ofproto' and all of its controllers, forcing
@@ -776,34 +468,7 @@ ofproto_set_fail_mode(struct ofproto *p, enum ofproto_fail_mode fail_mode)
 void
 ofproto_reconnect_controllers(struct ofproto *ofproto)
 {
-    struct ofconn *ofconn;
-
-    LIST_FOR_EACH (ofconn, node, &ofproto->all_conns) {
-        rconn_reconnect(ofconn->rconn);
-    }
-}
-
-static bool
-any_extras_changed(const struct ofproto *ofproto,
-                   const struct sockaddr_in *extras, size_t n)
-{
-    size_t i;
-
-    if (n != ofproto->n_extra_remotes) {
-        return true;
-    }
-
-    for (i = 0; i < n; i++) {
-        const struct sockaddr_in *old = &ofproto->extra_in_band_remotes[i];
-        const struct sockaddr_in *new = &extras[i];
-
-        if (old->sin_addr.s_addr != new->sin_addr.s_addr ||
-            old->sin_port != new->sin_port) {
-            return true;
-        }
-    }
-
-    return false;
+    connmgr_reconnect(ofproto->connmgr);
 }
 
 /* Sets the 'n' TCP port addresses in 'extras' as ones to which 'ofproto''s
@@ -813,15 +478,7 @@ void
 ofproto_set_extra_in_band_remotes(struct ofproto *ofproto,
                                   const struct sockaddr_in *extras, size_t n)
 {
-    if (!any_extras_changed(ofproto, extras, n)) {
-        return;
-    }
-
-    free(ofproto->extra_in_band_remotes);
-    ofproto->n_extra_remotes = n;
-    ofproto->extra_in_band_remotes = xmemdup(extras, n * sizeof *extras);
-
-    update_in_band_remotes(ofproto);
+    connmgr_set_extra_in_band_remotes(ofproto->connmgr, extras, n);
 }
 
 /* Sets the OpenFlow queue used by flows set up by in-band control on
@@ -830,10 +487,7 @@ ofproto_set_extra_in_band_remotes(struct ofproto *ofproto,
 void
 ofproto_set_in_band_queue(struct ofproto *ofproto, int queue_id)
 {
-    if (queue_id != ofproto->in_band_queue) {
-        ofproto->in_band_queue = queue_id;
-        update_in_band_remotes(ofproto);
-    }
+    connmgr_set_in_band_queue(ofproto->connmgr, queue_id);
 }
 
 void
@@ -887,48 +541,10 @@ ofproto_set_desc(struct ofproto *p,
     }
 }
 
-static int
-set_pvconns(struct pvconn ***pvconnsp, size_t *n_pvconnsp,
-            const struct svec *svec)
-{
-    struct pvconn **pvconns = *pvconnsp;
-    size_t n_pvconns = *n_pvconnsp;
-    int retval = 0;
-    size_t i;
-
-    for (i = 0; i < n_pvconns; i++) {
-        pvconn_close(pvconns[i]);
-    }
-    free(pvconns);
-
-    pvconns = xmalloc(svec->n * sizeof *pvconns);
-    n_pvconns = 0;
-    for (i = 0; i < svec->n; i++) {
-        const char *name = svec->names[i];
-        struct pvconn *pvconn;
-        int error;
-
-        error = pvconn_open(name, &pvconn);
-        if (!error) {
-            pvconns[n_pvconns++] = pvconn;
-        } else {
-            VLOG_ERR("failed to listen on %s: %s", name, strerror(error));
-            if (!retval) {
-                retval = error;
-            }
-        }
-    }
-
-    *pvconnsp = pvconns;
-    *n_pvconnsp = n_pvconns;
-
-    return retval;
-}
-
 int
 ofproto_set_snoops(struct ofproto *ofproto, const struct svec *snoops)
 {
-    return set_pvconns(&ofproto->snoops, &ofproto->n_snoops, snoops);
+    return connmgr_set_snoops(ofproto->connmgr, snoops);
 }
 
 int
@@ -1041,32 +657,25 @@ ofproto_get_datapath_id(const struct ofproto *ofproto)
 bool
 ofproto_has_primary_controller(const struct ofproto *ofproto)
 {
-    return !hmap_is_empty(&ofproto->controllers);
+    return connmgr_has_controllers(ofproto->connmgr);
 }
 
 enum ofproto_fail_mode
 ofproto_get_fail_mode(const struct ofproto *p)
 {
-    return p->fail_mode;
+    return connmgr_get_fail_mode(p->connmgr);
 }
 
 void
 ofproto_get_snoops(const struct ofproto *ofproto, struct svec *snoops)
 {
-    size_t i;
-
-    for (i = 0; i < ofproto->n_snoops; i++) {
-        svec_add(snoops, pvconn_get_name(ofproto->snoops[i]));
-    }
+    connmgr_get_snoops(ofproto->connmgr, snoops);
 }
 
 void
 ofproto_destroy(struct ofproto *p)
 {
-    struct ofservice *ofservice, *next_ofservice;
-    struct ofconn *ofconn, *next_ofconn;
     struct ofport *ofport, *next_ofport;
-    size_t i;
 
     if (!p) {
         return;
@@ -1074,23 +683,13 @@ ofproto_destroy(struct ofproto *p)
 
     shash_find_and_delete(&all_ofprotos, dpif_name(p->dpif));
 
-    /* Destroy fail-open and in-band early, since they touch the classifier. */
-    fail_open_destroy(p->fail_open);
-    p->fail_open = NULL;
-
-    in_band_destroy(p->in_band);
-    p->in_band = NULL;
-    free(p->extra_in_band_remotes);
+    /* Destroy connmgr early, since it touches the classifier. */
+    connmgr_destroy(p->connmgr);
 
     ofproto_flush_flows(p);
     classifier_destroy(&p->cls);
     hmap_destroy(&p->facets);
 
-    LIST_FOR_EACH_SAFE (ofconn, next_ofconn, node, &p->all_conns) {
-        ofconn_destroy(ofconn);
-    }
-    hmap_destroy(&p->controllers);
-
     dpif_close(p->dpif);
     netdev_monitor_destroy(p->netdev_monitor);
     HMAP_FOR_EACH_SAFE (ofport, next_ofport, hmap_node, &p->ports) {
@@ -1102,16 +701,6 @@ ofproto_destroy(struct ofproto *p)
     netflow_destroy(p->netflow);
     ofproto_sflow_destroy(p->sflow);
 
-    HMAP_FOR_EACH_SAFE (ofservice, next_ofservice, node, &p->services) {
-        ofservice_destroy(p, ofservice);
-    }
-    hmap_destroy(&p->services);
-
-    for (i = 0; i < p->n_snoops; i++) {
-        pvconn_close(p->snoops[i]);
-    }
-    free(p->snoops);
-
     mac_learning_destroy(p->ml);
 
     free(p->mfr_desc);
@@ -1146,54 +735,9 @@ process_port_change(struct ofproto *ofproto, int error, char *devname)
     }
 }
 
-/* Returns a "preference level" for snooping 'ofconn'.  A higher return value
- * means that 'ofconn' is more interesting for monitoring than a lower return
- * value. */
-static int
-snoop_preference(const struct ofconn *ofconn)
-{
-    switch (ofconn_get_role(ofconn)) {
-    case NX_ROLE_MASTER:
-        return 3;
-    case NX_ROLE_OTHER:
-        return 2;
-    case NX_ROLE_SLAVE:
-        return 1;
-    default:
-        /* Shouldn't happen. */
-        return 0;
-    }
-}
-
-/* One of ofproto's "snoop" pvconns has accepted a new connection on 'vconn'.
- * Connects this vconn to a controller. */
-static void
-add_snooper(struct ofproto *ofproto, struct vconn *vconn)
-{
-    struct ofconn *ofconn, *best;
-
-    /* Pick a controller for monitoring. */
-    best = NULL;
-    LIST_FOR_EACH (ofconn, node, &ofproto->all_conns) {
-        if (ofconn_get_type(ofconn) == OFCONN_PRIMARY
-            && (!best || snoop_preference(ofconn) > snoop_preference(best))) {
-            best = ofconn;
-        }
-    }
-
-    if (best) {
-        rconn_add_monitor(best->rconn, vconn);
-    } else {
-        VLOG_INFO_RL(&rl, "no controller connection to snoop");
-        vconn_close(vconn);
-    }
-}
-
 int
 ofproto_run1(struct ofproto *p)
 {
-    struct ofconn *ofconn, *next_ofconn;
-    struct ofservice *ofservice;
     struct ofport *ofport;
     char *devname;
     int error;
@@ -1235,56 +779,7 @@ ofproto_run1(struct ofproto *p)
         ofport_run(p, ofport);
     }
 
-    if (p->in_band) {
-        if (time_msec() >= p->next_in_band_update) {
-            update_in_band_remotes(p);
-        }
-        in_band_run(p->in_band);
-    }
-
-    LIST_FOR_EACH_SAFE (ofconn, next_ofconn, node, &p->all_conns) {
-        ofconn_run(ofconn);
-    }
-
-    /* Fail-open maintenance.  Do this after processing the ofconns since
-     * fail-open checks the status of the controller rconn. */
-    if (p->fail_open) {
-        fail_open_run(p->fail_open);
-    }
-
-    HMAP_FOR_EACH (ofservice, node, &p->services) {
-        struct vconn *vconn;
-        int retval;
-
-        retval = pvconn_accept(ofservice->pvconn, OFP_VERSION, &vconn);
-        if (!retval) {
-            struct rconn *rconn;
-            char *name;
-
-            rconn = rconn_create(ofservice->probe_interval, 0);
-            name = ofconn_make_name(p, vconn_get_name(vconn));
-            rconn_connect_unreliably(rconn, vconn, name);
-            free(name);
-
-            ofconn = ofconn_create(p, rconn, OFCONN_SERVICE);
-            ofconn_set_rate_limit(ofconn, ofservice->rate_limit,
-                                  ofservice->burst_limit);
-        } else if (retval != EAGAIN) {
-            VLOG_WARN_RL(&rl, "accept failed (%s)", strerror(retval));
-        }
-    }
-
-    for (i = 0; i < p->n_snoops; i++) {
-        struct vconn *vconn;
-        int retval;
-
-        retval = pvconn_accept(p->snoops[i], OFP_VERSION, &vconn);
-        if (!retval) {
-            add_snooper(p, vconn);
-        } else if (retval != EAGAIN) {
-            VLOG_WARN_RL(&rl, "accept failed (%s)", strerror(retval));
-        }
-    }
+    connmgr_run(p->connmgr, handle_openflow);
 
     if (time_msec() >= p->next_expiration) {
         int delay = ofproto_expire(p);
@@ -1333,27 +828,14 @@ ofproto_run2(struct ofproto *p, bool revalidate_all)
 void
 ofproto_wait(struct ofproto *p)
 {
-    struct ofservice *ofservice;
-    struct ofconn *ofconn;
     struct ofport *ofport;
-    size_t i;
 
-    dpif_recv_wait(p->dpif);
-    dpif_port_poll_wait(p->dpif);
-    netdev_monitor_poll_wait(p->netdev_monitor);
     HMAP_FOR_EACH (ofport, hmap_node, &p->ports) {
         ofport_wait(ofport);
     }
-    LIST_FOR_EACH (ofconn, node, &p->all_conns) {
-        ofconn_wait(ofconn);
-    }
-    if (p->in_band) {
-        poll_timer_wait_until(p->next_in_band_update);
-        in_band_wait(p->in_band);
-    }
-    if (p->fail_open) {
-        fail_open_wait(p->fail_open);
-    }
+    dpif_recv_wait(p->dpif);
+    dpif_port_poll_wait(p->dpif);
+    netdev_monitor_poll_wait(p->netdev_monitor);
     if (p->sflow) {
         ofproto_sflow_wait(p->sflow);
     }
@@ -1367,12 +849,7 @@ ofproto_wait(struct ofproto *p)
     } else if (p->next_expiration != LLONG_MAX) {
         poll_timer_wait_until(p->next_expiration);
     }
-    HMAP_FOR_EACH (ofservice, node, &p->services) {
-        pvconn_wait(ofservice->pvconn);
-    }
-    for (i = 0; i < p->n_snoops; i++) {
-        pvconn_wait(p->snoops[i]);
-    }
+    connmgr_wait(p->connmgr);
 }
 
 void
@@ -1390,54 +867,14 @@ ofproto_get_revalidate_set(struct ofproto *ofproto)
 bool
 ofproto_is_alive(const struct ofproto *p)
 {
-    return !hmap_is_empty(&p->controllers);
+    return connmgr_has_controllers(p->connmgr);
 }
 
 void
 ofproto_get_ofproto_controller_info(const struct ofproto *ofproto,
                                     struct shash *info)
 {
-    const struct ofconn *ofconn;
-
-    shash_init(info);
-
-    HMAP_FOR_EACH (ofconn, hmap_node, &ofproto->controllers) {
-        const struct rconn *rconn = ofconn->rconn;
-        time_t now = time_now();
-        time_t last_connection = rconn_get_last_connection(rconn);
-        time_t last_disconnect = rconn_get_last_disconnect(rconn);
-        const int last_error = rconn_get_last_error(rconn);
-        struct ofproto_controller_info *cinfo = xmalloc(sizeof *cinfo);
-
-        shash_add(info, rconn_get_target(rconn), cinfo);
-
-        cinfo->is_connected = rconn_is_connected(rconn);
-        cinfo->role = ofconn_get_role(ofconn);
-
-        cinfo->pairs.n = 0;
-
-        if (last_error) {
-            cinfo->pairs.keys[cinfo->pairs.n] = "last_error";
-            cinfo->pairs.values[cinfo->pairs.n++] =
-                xstrdup(ovs_retval_to_string(last_error));
-        }
-
-        cinfo->pairs.keys[cinfo->pairs.n] = "state";
-        cinfo->pairs.values[cinfo->pairs.n++] =
-            xstrdup(rconn_get_state(rconn));
-
-        if (last_connection != TIME_MIN) {
-            cinfo->pairs.keys[cinfo->pairs.n] = "sec_since_connect";
-            cinfo->pairs.values[cinfo->pairs.n++]
-                = xasprintf("%ld", (long int) (now - last_connection));
-        }
-
-        if (last_disconnect != TIME_MIN) {
-            cinfo->pairs.keys[cinfo->pairs.n] = "sec_since_disconnect";
-            cinfo->pairs.values[cinfo->pairs.n++]
-                = xasprintf("%ld", (long int) (now - last_disconnect));
-        }
-    }
+    connmgr_get_controller_info(ofproto->connmgr, info);
 }
 
 void
@@ -1583,12 +1020,7 @@ ofproto_flush_flows(struct ofproto *ofproto)
     }
 
     dpif_flow_flush(ofproto->dpif);
-    if (ofproto->in_band) {
-        in_band_flushed(ofproto->in_band);
-    }
-    if (ofproto->fail_open) {
-        fail_open_flushed(ofproto->fail_open);
-    }
+    connmgr_flushed(ofproto->connmgr);
 }
 \f
 static void
@@ -1695,25 +1127,7 @@ static void
 send_port_status(struct ofproto *p, const struct ofport *ofport,
                  uint8_t reason)
 {
-    /* XXX Should limit the number of queued port status change messages. */
-    struct ofconn *ofconn;
-    LIST_FOR_EACH (ofconn, node, &p->all_conns) {
-        struct ofp_port_status *ops;
-        struct ofpbuf *b;
-
-        /* Primary controllers, even slaves, should always get port status
-           updates.  Otherwise obey ofconn_receives_async_msgs(). */
-        if (ofconn_get_type(ofconn) != OFCONN_PRIMARY
-            && !ofconn_receives_async_msgs(ofconn)) {
-            continue;
-        }
-
-        ops = make_openflow_xid(sizeof *ops, OFPT_PORT_STATUS, 0, &b);
-        ops->reason = reason;
-        ops->desc = ofport->opp;
-        hton_ofp_phy_port(&ops->desc);
-        queue_tx(b, ofconn, NULL);
-    }
+    connmgr_send_port_status(p->connmgr, &ofport->opp, reason);
 }
 
 static void
@@ -1883,250 +1297,6 @@ init_ports(struct ofproto *p)
     return 0;
 }
 \f
-static struct ofconn *
-ofconn_create(struct ofproto *p, struct rconn *rconn, enum ofconn_type type)
-{
-    struct ofconn *ofconn = xzalloc(sizeof *ofconn);
-    ofconn->ofproto = p;
-    list_push_back(&p->all_conns, &ofconn->node);
-    ofconn->rconn = rconn;
-    ofconn->type = type;
-    ofconn->flow_format = NXFF_OPENFLOW10;
-    ofconn->role = NX_ROLE_OTHER;
-    ofconn->packet_in_counter = rconn_packet_counter_create ();
-    ofconn->pktbuf = NULL;
-    ofconn->miss_send_len = 0;
-    ofconn->reply_counter = rconn_packet_counter_create ();
-    return ofconn;
-}
-
-static void
-ofconn_destroy(struct ofconn *ofconn)
-{
-    struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
-
-    if (ofconn_get_type(ofconn) == OFCONN_PRIMARY) {
-        hmap_remove(&ofproto->controllers, &ofconn->hmap_node);
-    }
-
-    list_remove(&ofconn->node);
-    rconn_destroy(ofconn->rconn);
-    rconn_packet_counter_destroy(ofconn->packet_in_counter);
-    rconn_packet_counter_destroy(ofconn->reply_counter);
-    pktbuf_destroy(ofconn->pktbuf);
-    free(ofconn);
-}
-
-static void
-ofconn_run(struct ofconn *ofconn)
-{
-    struct ofproto *p = ofconn_get_ofproto(ofconn);
-    int iteration;
-    size_t i;
-
-    for (i = 0; i < N_SCHEDULERS; i++) {
-        pinsched_run(ofconn->schedulers[i], do_send_packet_in, ofconn);
-    }
-
-    rconn_run(ofconn->rconn);
-
-    if (rconn_packet_counter_read (ofconn->reply_counter) < OFCONN_REPLY_MAX) {
-        /* Limit the number of iterations to prevent other tasks from
-         * starving. */
-        for (iteration = 0; iteration < 50; iteration++) {
-            struct ofpbuf *of_msg = rconn_recv(ofconn->rconn);
-            if (!of_msg) {
-                break;
-            }
-            if (p->fail_open) {
-                fail_open_maybe_recover(p->fail_open);
-            }
-            handle_openflow(ofconn, of_msg);
-            ofpbuf_delete(of_msg);
-        }
-    }
-
-    if (!rconn_is_alive(ofconn->rconn)) {
-        ofconn_destroy(ofconn);
-    }
-}
-
-static void
-ofconn_wait(struct ofconn *ofconn)
-{
-    int i;
-
-    for (i = 0; i < N_SCHEDULERS; i++) {
-        pinsched_wait(ofconn->schedulers[i]);
-    }
-    rconn_run_wait(ofconn->rconn);
-    if (rconn_packet_counter_read (ofconn->reply_counter) < OFCONN_REPLY_MAX) {
-        rconn_recv_wait(ofconn->rconn);
-    } else {
-        COVERAGE_INC(ofproto_ofconn_stuck);
-    }
-}
-
-/* Returns true if 'ofconn' should receive asynchronous messages. */
-static bool
-ofconn_receives_async_msgs(const struct ofconn *ofconn)
-{
-    if (ofconn_get_type(ofconn) == OFCONN_PRIMARY) {
-        /* Primary controllers always get asynchronous messages unless they
-         * have configured themselves as "slaves".  */
-        return ofconn_get_role(ofconn) != NX_ROLE_SLAVE;
-    } else {
-        /* Service connections don't get asynchronous messages unless they have
-         * explicitly asked for them by setting a nonzero miss send length. */
-        return ofconn->miss_send_len > 0;
-    }
-}
-
-/* Returns a human-readable name for an OpenFlow connection between 'ofproto'
- * and 'target', suitable for use in log messages for identifying the
- * connection.
- *
- * The name is dynamically allocated.  The caller should free it (with free())
- * when it is no longer needed. */
-static char *
-ofconn_make_name(const struct ofproto *ofproto, const char *target)
-{
-    return xasprintf("%s<->%s", dpif_base_name(ofproto->dpif), target);
-}
-
-static void
-ofconn_set_rate_limit(struct ofconn *ofconn, int rate, int burst)
-{
-    int i;
-
-    for (i = 0; i < N_SCHEDULERS; i++) {
-        struct pinsched **s = &ofconn->schedulers[i];
-
-        if (rate > 0) {
-            if (!*s) {
-                *s = pinsched_create(rate, burst);
-            } else {
-                pinsched_set_limits(*s, rate, burst);
-            }
-        } else {
-            pinsched_destroy(*s);
-            *s = NULL;
-        }
-    }
-}
-
-static struct ofproto *
-ofconn_get_ofproto(struct ofconn *ofconn)
-{
-    return ofconn->ofproto;
-}
-
-static enum nx_flow_format
-ofconn_get_flow_format(struct ofconn *ofconn)
-{
-    return ofconn->flow_format;
-}
-
-static void
-ofconn_set_flow_format(struct ofconn *ofconn, enum nx_flow_format flow_format)
-{
-    ofconn->flow_format = flow_format;
-}
-
-static int
-ofconn_get_miss_send_len(const struct ofconn *ofconn)
-{
-    return ofconn->miss_send_len;
-}
-
-static void
-ofconn_set_miss_send_len(struct ofconn *ofconn, int miss_send_len)
-{
-    ofconn->miss_send_len = miss_send_len;
-}
-
-static enum ofconn_type
-ofconn_get_type(const struct ofconn *ofconn)
-{
-    return ofconn->type;
-}
-
-static enum nx_role
-ofconn_get_role(const struct ofconn *ofconn)
-{
-    return ofconn->role;
-}
-
-static void
-ofconn_set_role(struct ofconn *ofconn, enum nx_role role)
-{
-    ofconn->role = role;
-}
-
-static int
-ofconn_pktbuf_retrieve(struct ofconn *ofconn, uint32_t id,
-                       struct ofpbuf **bufferp, uint16_t *in_port)
-{
-    return pktbuf_retrieve(ofconn->pktbuf, id, bufferp, in_port);
-}
-\f
-static void
-ofservice_reconfigure(struct ofservice *ofservice,
-                      const struct ofproto_controller *c)
-{
-    ofservice->probe_interval = c->probe_interval;
-    ofservice->rate_limit = c->rate_limit;
-    ofservice->burst_limit = c->burst_limit;
-}
-
-/* Creates a new ofservice in 'ofproto'.  Returns 0 if successful, otherwise a
- * positive errno value. */
-static int
-ofservice_create(struct ofproto *ofproto, const struct ofproto_controller *c)
-{
-    struct ofservice *ofservice;
-    struct pvconn *pvconn;
-    int error;
-
-    error = pvconn_open(c->target, &pvconn);
-    if (error) {
-        return error;
-    }
-
-    ofservice = xzalloc(sizeof *ofservice);
-    hmap_insert(&ofproto->services, &ofservice->node,
-                hash_string(c->target, 0));
-    ofservice->pvconn = pvconn;
-
-    ofservice_reconfigure(ofservice, c);
-
-    return 0;
-}
-
-static void
-ofservice_destroy(struct ofproto *ofproto, struct ofservice *ofservice)
-{
-    hmap_remove(&ofproto->services, &ofservice->node);
-    pvconn_close(ofservice->pvconn);
-    free(ofservice);
-}
-
-/* Finds and returns the ofservice within 'ofproto' that has the given
- * 'target', or a null pointer if none exists. */
-static struct ofservice *
-ofservice_lookup(struct ofproto *ofproto, const char *target)
-{
-    struct ofservice *ofservice;
-
-    HMAP_FOR_EACH_WITH_HASH (ofservice, node, hash_string(target, 0),
-                             &ofproto->services) {
-        if (!strcmp(pvconn_get_name(ofservice->pvconn), target)) {
-            return ofservice;
-        }
-    }
-    return NULL;
-}
-\f
 /* Returns true if 'rule' should be hidden from the controller.
  *
  * Rules with priority higher than UINT16_MAX are set up by ofproto itself
@@ -2686,22 +1856,6 @@ facet_revalidate(struct ofproto *ofproto, struct facet *facet)
     return true;
 }
 \f
-static void
-queue_tx(struct ofpbuf *msg, const struct ofconn *ofconn,
-         struct rconn_packet_counter *counter)
-{
-    update_openflow_length(msg);
-    if (rconn_send(ofconn->rconn, msg, counter)) {
-        ofpbuf_delete(msg);
-    }
-}
-
-static void
-ofconn_send_reply(const struct ofconn *ofconn, struct ofpbuf *msg)
-{
-    queue_tx(msg, ofconn, ofconn->reply_counter);
-}
-
 static void
 send_error_oh(const struct ofconn *ofconn, const struct ofp_header *oh,
               int error)
@@ -3301,8 +2455,9 @@ xlate_actions(struct action_xlate_ctx *ctx,
 
     /* Check with in-band control to see if we're allowed to set up this
      * flow. */
-    if (!in_band_rule_check(ctx->ofproto->in_band, &ctx->flow,
-                            ctx->odp_actions->data, ctx->odp_actions->size)) {
+    if (!connmgr_may_set_up_flow(ctx->ofproto->connmgr, &ctx->flow,
+                                 ctx->odp_actions->data,
+                                 ctx->odp_actions->size)) {
         ctx->may_set_up_flow = false;
     }
 
@@ -4387,8 +3542,7 @@ handle_role_request(struct ofconn *ofconn, const struct ofp_header *oh)
     uint32_t role;
 
     if (ofconn_get_type(ofconn) != OFCONN_PRIMARY) {
-        VLOG_WARN_RL(&rl, "ignoring role request on non-controller "
-                     "connection");
+        VLOG_WARN_RL(&rl, "ignoring role request on service connection");
         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_EPERM);
     }
 
@@ -4401,16 +3555,6 @@ handle_role_request(struct ofconn *ofconn, const struct ofp_header *oh)
         return ofp_mkerr(OFPET_BAD_REQUEST, -1);
     }
 
-    if (role == NX_ROLE_MASTER) {
-        struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
-        struct ofconn *other;
-
-        HMAP_FOR_EACH (other, hmap_node, &ofproto->controllers) {
-            if (ofconn_get_role(other) == NX_ROLE_MASTER) {
-                ofconn_set_role(other, NX_ROLE_SLAVE);
-            }
-        }
-    }
     ofconn_set_role(ofconn, role);
 
     reply = make_nxmsg_xid(sizeof *reply, NXT_ROLE_REPLY, oh->xid, &buf);
@@ -4601,7 +3745,7 @@ handle_miss_upcall(struct ofproto *p, struct dpif_upcall *upcall)
 
     /* Check with in-band control to see if this packet should be sent
      * to the local port regardless of the flow table. */
-    if (in_band_msg_in_hook(p->in_band, &flow, upcall->packet)) {
+    if (connmgr_msg_in_hook(p->connmgr, &flow, upcall->packet)) {
         ofproto_send_packet(p, ODPP_LOCAL, 0, upcall->packet);
     }
 
@@ -4962,7 +4106,6 @@ static void
 rule_send_removed(struct ofproto *p, struct rule *rule, uint8_t reason)
 {
     struct ofputil_flow_removed fr;
-    struct ofconn *ofconn;
 
     if (!rule->send_flow_removed) {
         return;
@@ -4976,22 +4119,7 @@ rule_send_removed(struct ofproto *p, struct rule *rule, uint8_t reason)
     fr.packet_count = rule->packet_count;
     fr.byte_count = rule->byte_count;
 
-    LIST_FOR_EACH (ofconn, node, &p->all_conns) {
-        struct ofpbuf *msg;
-
-        if (!rconn_is_connected(ofconn->rconn)
-            || !ofconn_receives_async_msgs(ofconn)) {
-            continue;
-        }
-
-        /* This accounts flow expirations as if they were replies to OpenFlow
-         * requests.  That works because preventing OpenFlow requests from
-         * being processed also prevents new flows from being added (and
-         * expiring).  (It also prevents processing OpenFlow requests that
-         * would not add new flows, so it is imperfect.) */
-        msg = ofputil_encode_flow_removed(&fr, ofconn_get_flow_format(ofconn));
-        ofconn_send_reply(ofconn, msg);
-    }
+    connmgr_send_flow_removed(p->connmgr, &fr);
 }
 
 /* Obtains statistics for 'rule' and stores them in '*packets' and '*bytes'.
@@ -5019,64 +4147,6 @@ rule_get_stats(const struct rule *rule, uint64_t *packets, uint64_t *bytes)
     *bytes = b;
 }
 
-/* pinsched callback for sending 'ofp_packet_in' on 'ofconn'. */
-static void
-do_send_packet_in(struct ofpbuf *ofp_packet_in, void *ofconn_)
-{
-    struct ofconn *ofconn = ofconn_;
-
-    rconn_send_with_limit(ofconn->rconn, ofp_packet_in,
-                          ofconn->packet_in_counter, 100);
-}
-
-/* Takes 'upcall', whose packet has the flow specified by 'flow', composes an
- * OpenFlow packet-in message from it, and passes it to 'ofconn''s packet
- * scheduler for sending.
- *
- * If 'clone' is true, the caller retains ownership of 'upcall->packet'.
- * Otherwise, ownership is transferred to this function. */
-static void
-schedule_packet_in(struct ofconn *ofconn, struct dpif_upcall *upcall,
-                   const struct flow *flow, bool clone)
-{
-    struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
-    struct ofputil_packet_in pin;
-    struct ofpbuf *msg;
-
-    /* Figure out the easy parts. */
-    pin.packet = upcall->packet;
-    pin.in_port = odp_port_to_ofp_port(flow->in_port);
-    pin.reason = upcall->type == DPIF_UC_MISS ? OFPR_NO_MATCH : OFPR_ACTION;
-
-    /* Get OpenFlow buffer_id. */
-    if (upcall->type == DPIF_UC_ACTION) {
-        pin.buffer_id = UINT32_MAX;
-    } else if (ofproto->fail_open && fail_open_is_active(ofproto->fail_open)) {
-        pin.buffer_id = pktbuf_get_null();
-    } else if (!ofconn->pktbuf) {
-        pin.buffer_id = UINT32_MAX;
-    } else {
-        pin.buffer_id = pktbuf_save(ofconn->pktbuf, upcall->packet,
-                                    flow->in_port);
-    }
-
-    /* Figure out how much of the packet to send. */
-    pin.send_len = upcall->packet->size;
-    if (pin.buffer_id != UINT32_MAX) {
-        pin.send_len = MIN(pin.send_len, ofconn->miss_send_len);
-    }
-    if (upcall->type == DPIF_UC_ACTION) {
-        pin.send_len = MIN(pin.send_len, upcall->userdata);
-    }
-
-    /* Make OFPT_PACKET_IN and hand over to packet scheduler.  It might
-     * immediately call into do_send_packet_in() or it might buffer it for a
-     * while (until a later call to pinsched_run()). */
-    msg = ofputil_encode_packet_in(&pin, clone ? NULL : upcall->packet);
-    pinsched_send(ofconn->schedulers[upcall->type == DPIF_UC_MISS ? 0 : 1],
-                  flow->in_port, msg, do_send_packet_in, ofconn);
-}
-
 /* Given 'upcall', of type DPIF_UC_ACTION or DPIF_UC_MISS, sends an
  * OFPT_PACKET_IN message to each OpenFlow controller as necessary according to
  * their individual configurations.
@@ -5087,22 +4157,15 @@ static void
 send_packet_in(struct ofproto *ofproto, struct dpif_upcall *upcall,
                const struct flow *flow, bool clone)
 {
-    struct ofconn *ofconn, *prev;
+    struct ofputil_packet_in pin;
 
-    prev = NULL;
-    LIST_FOR_EACH (ofconn, node, &ofproto->all_conns) {
-        if (ofconn_receives_async_msgs(ofconn)) {
-            if (prev) {
-                schedule_packet_in(prev, upcall, flow, true);
-            }
-            prev = ofconn;
-        }
-    }
-    if (prev) {
-        schedule_packet_in(prev, upcall, flow, clone);
-    } else if (!clone) {
-        ofpbuf_delete(upcall->packet);
-    }
+    pin.packet = upcall->packet;
+    pin.in_port = odp_port_to_ofp_port(flow->in_port);
+    pin.reason = upcall->type == DPIF_UC_MISS ? OFPR_NO_MATCH : OFPR_ACTION;
+    pin.buffer_id = 0;          /* not yet known */
+    pin.send_len = upcall->userdata;
+    connmgr_send_packet_in(ofproto->connmgr, upcall, flow,
+                           clone ? NULL : upcall->packet);
 }
 
 static uint64_t