ofproto: Avoid memory leak in classifier on destruction.
[sliver-openvswitch.git] / ofproto / ofproto.c
index 4c57f7e..652ded0 100644 (file)
@@ -27,6 +27,7 @@
 #include "byte-order.h"
 #include "cfm.h"
 #include "classifier.h"
+#include "connmgr.h"
 #include "coverage.h"
 #include "dpif.h"
 #include "dynamic-string.h"
 #include "poll-loop.h"
 #include "rconn.h"
 #include "shash.h"
+#include "sset.h"
 #include "stream-ssl.h"
-#include "svec.h"
 #include "tag.h"
+#include "timer.h"
 #include "timeval.h"
 #include "unaligned.h"
 #include "unixctl.h"
@@ -79,7 +81,6 @@ COVERAGE_DEFINE(ofproto_flows_req);
 COVERAGE_DEFINE(ofproto_flush);
 COVERAGE_DEFINE(ofproto_invalidated);
 COVERAGE_DEFINE(ofproto_no_packet_in);
-COVERAGE_DEFINE(ofproto_ofconn_stuck);
 COVERAGE_DEFINE(ofproto_ofp2odp);
 COVERAGE_DEFINE(ofproto_packet_in);
 COVERAGE_DEFINE(ofproto_packet_out);
@@ -270,96 +271,8 @@ static void facet_update_stats(struct ofproto *, struct facet *,
                                const struct dpif_flow_stats *);
 static void facet_push_stats(struct ofproto *, struct facet *);
 
-/* ofproto supports two kinds of OpenFlow connections:
- *
- *   - "Primary" connections to ordinary OpenFlow controllers.  ofproto
- *     maintains persistent connections to these controllers and by default
- *     sends them asynchronous messages such as packet-ins.
- *
- *   - "Service" connections, e.g. from ovs-ofctl.  When these connections
- *     drop, it is the other side's responsibility to reconnect them if
- *     necessary.  ofproto does not send them asynchronous messages by default.
- *
- * Currently, active (tcp, ssl, unix) connections are always "primary"
- * connections and passive (ptcp, pssl, punix) connections are always "service"
- * connections.  There is no inherent reason for this, but it reflects the
- * common case.
- */
-enum ofconn_type {
-    OFCONN_PRIMARY,             /* An ordinary OpenFlow controller. */
-    OFCONN_SERVICE              /* A service connection, e.g. "ovs-ofctl". */
-};
-
-/* A listener for incoming OpenFlow "service" connections. */
-struct ofservice {
-    struct hmap_node node;      /* In struct ofproto's "services" hmap. */
-    struct pvconn *pvconn;      /* OpenFlow connection listener. */
-
-    /* These are not used by ofservice directly.  They are settings for
-     * accepted "struct ofconn"s from the pvconn. */
-    int probe_interval;         /* Max idle time before probing, in seconds. */
-    int rate_limit;             /* Max packet-in rate in packets per second. */
-    int burst_limit;            /* Limit on accumulating packet credits. */
-};
-
-static struct ofservice *ofservice_lookup(struct ofproto *,
-                                          const char *target);
-static int ofservice_create(struct ofproto *,
-                            const struct ofproto_controller *);
-static void ofservice_reconfigure(struct ofservice *,
-                                  const struct ofproto_controller *);
-static void ofservice_destroy(struct ofproto *, struct ofservice *);
-
-/* An OpenFlow connection. */
-struct ofconn {
-    struct ofproto *ofproto;    /* The ofproto that owns this connection. */
-    struct list node;           /* In struct ofproto's "all_conns" list. */
-    struct rconn *rconn;        /* OpenFlow connection. */
-    enum ofconn_type type;      /* Type. */
-    enum nx_flow_format flow_format; /* Currently selected flow format. */
-
-    /* OFPT_PACKET_IN related data. */
-    struct rconn_packet_counter *packet_in_counter; /* # queued on 'rconn'. */
-#define N_SCHEDULERS 2
-    struct pinsched *schedulers[N_SCHEDULERS];
-    struct pktbuf *pktbuf;         /* OpenFlow packet buffers. */
-    int miss_send_len;             /* Bytes to send of buffered packets. */
-
-    /* Number of OpenFlow messages queued on 'rconn' as replies to OpenFlow
-     * requests, and the maximum number before we stop reading OpenFlow
-     * requests.  */
-#define OFCONN_REPLY_MAX 100
-    struct rconn_packet_counter *reply_counter;
-
-    /* type == OFCONN_PRIMARY only. */
-    enum nx_role role;           /* Role. */
-    struct hmap_node hmap_node;  /* In struct ofproto's "controllers" map. */
-    enum ofproto_band band;      /* In-band or out-of-band? */
-};
-
-
-static struct ofconn *ofconn_create(struct ofproto *, struct rconn *,
-                                    enum ofconn_type);
-static void ofconn_destroy(struct ofconn *);
-static void ofconn_run(struct ofconn *);
-static void ofconn_wait(struct ofconn *);
-
-static bool ofconn_receives_async_msgs(const struct ofconn *);
-static char *ofconn_make_name(const struct ofproto *, const char *target);
-static void ofconn_set_rate_limit(struct ofconn *, int rate, int burst);
-
-static struct ofproto *ofconn_get_ofproto(struct ofconn *);
-static enum nx_flow_format ofconn_get_flow_format(struct ofconn *);
-static void ofconn_set_flow_format(struct ofconn *, enum nx_flow_format);
-static int ofconn_get_miss_send_len(const struct ofconn *);
-static void ofconn_set_miss_send_len(struct ofconn *, int miss_send_len);
-
-static void queue_tx(struct ofpbuf *msg, const struct ofconn *ofconn,
-                     struct rconn_packet_counter *counter);
-
 static void send_packet_in(struct ofproto *, struct dpif_upcall *,
                            const struct flow *, bool clone);
-static void do_send_packet_in(struct ofpbuf *ofp_packet_in, void *ofconn);
 
 struct ofproto {
     /* Settings. */
@@ -379,20 +292,12 @@ struct ofproto {
     uint32_t max_ports;
 
     /* Configuration. */
-    struct fail_open *fail_open;
     struct netflow *netflow;
     struct ofproto_sflow *sflow;
 
-    /* In-band control. */
-    struct in_band *in_band;
-    long long int next_in_band_update;
-    struct sockaddr_in *extra_in_band_remotes;
-    size_t n_extra_remotes;
-    int in_band_queue;
-
     /* Flow table. */
     struct classifier cls;
-    long long int next_expiration;
+    struct timer next_expiration;
 
     /* Facets. */
     struct hmap facets;
@@ -400,14 +305,7 @@ struct ofproto {
     struct tag_set revalidate_set;
 
     /* OpenFlow connections. */
-    struct hmap controllers;   /* Controller "struct ofconn"s. */
-    struct list all_conns;     /* Contains "struct ofconn"s. */
-    enum ofproto_fail_mode fail_mode;
-
-    /* OpenFlow listeners. */
-    struct hmap services;       /* Contains "struct ofservice"s. */
-    struct pvconn **snoops;
-    size_t n_snoops;
+    struct connmgr *connmgr;
 
     /* Hooks for ovs-vswitchd. */
     const struct ofhooks *ofhooks;
@@ -427,6 +325,7 @@ static const struct ofhooks default_ofhooks;
 static uint64_t pick_datapath_id(const struct ofproto *);
 static uint64_t pick_fallback_dpid(void);
 
+static void ofproto_flush_flows__(struct ofproto *);
 static int ofproto_expire(struct ofproto *);
 static void flow_push_stats(struct ofproto *, const struct rule *,
                             struct flow *, uint64_t packets, uint64_t bytes,
@@ -448,6 +347,7 @@ ofproto_create(const char *datapath, const char *datapath_type,
                const struct ofhooks *ofhooks, void *aux,
                struct ofproto **ofprotop)
 {
+    char local_name[IF_NAMESIZE];
     struct ofproto *p;
     struct dpif *dpif;
     int error;
@@ -475,6 +375,14 @@ ofproto_create(const char *datapath, const char *datapath_type,
     dpif_flow_flush(dpif);
     dpif_recv_purge(dpif);
 
+    error = dpif_port_get_name(dpif, ODPP_LOCAL,
+                               local_name, sizeof local_name);
+    if (error) {
+        VLOG_ERR("%s: cannot get name of datapath local port (%s)",
+                 datapath, strerror(error));
+        return error;
+    }
+
     /* Initialize settings. */
     p = xzalloc(sizeof *p);
     p->fallback_dpid = pick_fallback_dpid();
@@ -493,30 +401,18 @@ ofproto_create(const char *datapath, const char *datapath_type,
     p->max_ports = dpif_get_max_ports(dpif);
 
     /* Initialize submodules. */
-    p->fail_open = NULL;
     p->netflow = NULL;
     p->sflow = NULL;
 
-    /* Initialize in-band control. */
-    p->in_band = NULL;
-    p->in_band_queue = -1;
-
     /* Initialize flow table. */
     classifier_init(&p->cls);
-    p->next_expiration = time_msec() + 1000;
+    timer_set_duration(&p->next_expiration, 1000);
 
     /* Initialize facet table. */
     hmap_init(&p->facets);
     p->need_revalidate = false;
     tag_set_init(&p->revalidate_set);
 
-    /* Initialize OpenFlow connections. */
-    list_init(&p->all_conns);
-    hmap_init(&p->controllers);
-    hmap_init(&p->services);
-    p->snoops = NULL;
-    p->n_snoops = 0;
-
     /* Initialize hooks. */
     if (ofhooks) {
         p->ofhooks = ofhooks;
@@ -534,6 +430,9 @@ ofproto_create(const char *datapath, const char *datapath_type,
 
     shash_add_once(&all_ofprotos, dpif_name(p->dpif), p);
 
+    /* Initialize OpenFlow connections. */
+    p->connmgr = connmgr_create(p, datapath, local_name);
+
     *ofprotop = p;
     return 0;
 }
@@ -552,212 +451,18 @@ ofproto_set_datapath_id(struct ofproto *p, uint64_t datapath_id)
     }
 }
 
-/* Creates a new controller in 'ofproto'.  Some of the settings are initially
- * drawn from 'c', but update_controller() needs to be called later to finish
- * the new ofconn's configuration. */
-static void
-add_controller(struct ofproto *ofproto, const struct ofproto_controller *c)
-{
-    char *name = ofconn_make_name(ofproto, c->target);
-    struct ofconn *ofconn;
-
-    ofconn = ofconn_create(ofproto, rconn_create(5, 8), OFCONN_PRIMARY);
-    ofconn->pktbuf = pktbuf_create();
-    ofconn->miss_send_len = OFP_DEFAULT_MISS_SEND_LEN;
-    rconn_connect(ofconn->rconn, c->target, name);
-    hmap_insert(&ofproto->controllers, &ofconn->hmap_node,
-                hash_string(c->target, 0));
-
-    free(name);
-}
-
-/* Reconfigures 'ofconn' to match 'c'.  This function cannot update an ofconn's
- * target (this is done by creating new ofconns and deleting old ones), but it
- * can update the rest of an ofconn's settings. */
-static void
-update_controller(struct ofconn *ofconn, const struct ofproto_controller *c)
-{
-    int probe_interval;
-
-    ofconn->band = c->band;
-
-    rconn_set_max_backoff(ofconn->rconn, c->max_backoff);
-
-    probe_interval = c->probe_interval ? MAX(c->probe_interval, 5) : 0;
-    rconn_set_probe_interval(ofconn->rconn, probe_interval);
-
-    ofconn_set_rate_limit(ofconn, c->rate_limit, c->burst_limit);
-}
-
-static const char *
-ofconn_get_target(const struct ofconn *ofconn)
-{
-    return rconn_get_target(ofconn->rconn);
-}
-
-static struct ofconn *
-find_controller_by_target(struct ofproto *ofproto, const char *target)
-{
-    struct ofconn *ofconn;
-
-    HMAP_FOR_EACH_WITH_HASH (ofconn, hmap_node,
-                             hash_string(target, 0), &ofproto->controllers) {
-        if (!strcmp(ofconn_get_target(ofconn), target)) {
-            return ofconn;
-        }
-    }
-    return NULL;
-}
-
-static void
-update_in_band_remotes(struct ofproto *ofproto)
-{
-    const struct ofconn *ofconn;
-    struct sockaddr_in *addrs;
-    size_t max_addrs, n_addrs;
-    size_t i;
-
-    /* Allocate enough memory for as many remotes as we could possibly have. */
-    max_addrs = ofproto->n_extra_remotes + hmap_count(&ofproto->controllers);
-    addrs = xmalloc(max_addrs * sizeof *addrs);
-    n_addrs = 0;
-
-    /* Add all the remotes. */
-    HMAP_FOR_EACH (ofconn, hmap_node, &ofproto->controllers) {
-        struct sockaddr_in *sin = &addrs[n_addrs];
-
-        if (ofconn->band == OFPROTO_OUT_OF_BAND) {
-            continue;
-        }
-
-        sin->sin_addr.s_addr = rconn_get_remote_ip(ofconn->rconn);
-        if (sin->sin_addr.s_addr) {
-            sin->sin_port = rconn_get_remote_port(ofconn->rconn);
-            n_addrs++;
-        }
-    }
-    for (i = 0; i < ofproto->n_extra_remotes; i++) {
-        addrs[n_addrs++] = ofproto->extra_in_band_remotes[i];
-    }
-
-    /* Create or update or destroy in-band. */
-    if (n_addrs) {
-        if (!ofproto->in_band) {
-            in_band_create(ofproto, ofproto->dpif, &ofproto->in_band);
-        }
-        if (ofproto->in_band) {
-            in_band_set_remotes(ofproto->in_band, addrs, n_addrs);
-        }
-        in_band_set_queue(ofproto->in_band, ofproto->in_band_queue);
-        ofproto->next_in_band_update = time_msec() + 1000;
-    } else {
-        in_band_destroy(ofproto->in_band);
-        ofproto->in_band = NULL;
-    }
-
-    /* Clean up. */
-    free(addrs);
-}
-
-static void
-update_fail_open(struct ofproto *p)
-{
-    struct ofconn *ofconn;
-
-    if (!hmap_is_empty(&p->controllers)
-            && p->fail_mode == OFPROTO_FAIL_STANDALONE) {
-        struct rconn **rconns;
-        size_t n;
-
-        if (!p->fail_open) {
-            p->fail_open = fail_open_create(p);
-        }
-
-        n = 0;
-        rconns = xmalloc(hmap_count(&p->controllers) * sizeof *rconns);
-        HMAP_FOR_EACH (ofconn, hmap_node, &p->controllers) {
-            rconns[n++] = ofconn->rconn;
-        }
-
-        fail_open_set_controllers(p->fail_open, rconns, n);
-        /* p->fail_open takes ownership of 'rconns'. */
-    } else {
-        fail_open_destroy(p->fail_open);
-        p->fail_open = NULL;
-    }
-}
-
 void
 ofproto_set_controllers(struct ofproto *p,
                         const struct ofproto_controller *controllers,
                         size_t n_controllers)
 {
-    struct shash new_controllers;
-    struct ofconn *ofconn, *next_ofconn;
-    struct ofservice *ofservice, *next_ofservice;
-    size_t i;
-
-    /* Create newly configured controllers and services.
-     * Create a name to ofproto_controller mapping in 'new_controllers'. */
-    shash_init(&new_controllers);
-    for (i = 0; i < n_controllers; i++) {
-        const struct ofproto_controller *c = &controllers[i];
-
-        if (!vconn_verify_name(c->target)) {
-            if (!find_controller_by_target(p, c->target)) {
-                add_controller(p, c);
-            }
-        } else if (!pvconn_verify_name(c->target)) {
-            if (!ofservice_lookup(p, c->target) && ofservice_create(p, c)) {
-                continue;
-            }
-        } else {
-            VLOG_WARN_RL(&rl, "%s: unsupported controller \"%s\"",
-                         dpif_name(p->dpif), c->target);
-            continue;
-        }
-
-        shash_add_once(&new_controllers, c->target, &controllers[i]);
-    }
-
-    /* Delete controllers that are no longer configured.
-     * Update configuration of all now-existing controllers. */
-    HMAP_FOR_EACH_SAFE (ofconn, next_ofconn, hmap_node, &p->controllers) {
-        struct ofproto_controller *c;
-
-        c = shash_find_data(&new_controllers, ofconn_get_target(ofconn));
-        if (!c) {
-            ofconn_destroy(ofconn);
-        } else {
-            update_controller(ofconn, c);
-        }
-    }
-
-    /* Delete services that are no longer configured.
-     * Update configuration of all now-existing services. */
-    HMAP_FOR_EACH_SAFE (ofservice, next_ofservice, node, &p->services) {
-        struct ofproto_controller *c;
-
-        c = shash_find_data(&new_controllers,
-                            pvconn_get_name(ofservice->pvconn));
-        if (!c) {
-            ofservice_destroy(p, ofservice);
-        } else {
-            ofservice_reconfigure(ofservice, c);
-        }
-    }
-
-    shash_destroy(&new_controllers);
-
-    update_in_band_remotes(p);
-    update_fail_open(p);
+    connmgr_set_controllers(p->connmgr, controllers, n_controllers);
 }
 
 void
 ofproto_set_fail_mode(struct ofproto *p, enum ofproto_fail_mode fail_mode)
 {
-    p->fail_mode = fail_mode;
-    update_fail_open(p);
+    connmgr_set_fail_mode(p->connmgr, fail_mode);
 }
 
 /* Drops the connections between 'ofproto' and all of its controllers, forcing
@@ -765,34 +470,7 @@ ofproto_set_fail_mode(struct ofproto *p, enum ofproto_fail_mode fail_mode)
 void
 ofproto_reconnect_controllers(struct ofproto *ofproto)
 {
-    struct ofconn *ofconn;
-
-    LIST_FOR_EACH (ofconn, node, &ofproto->all_conns) {
-        rconn_reconnect(ofconn->rconn);
-    }
-}
-
-static bool
-any_extras_changed(const struct ofproto *ofproto,
-                   const struct sockaddr_in *extras, size_t n)
-{
-    size_t i;
-
-    if (n != ofproto->n_extra_remotes) {
-        return true;
-    }
-
-    for (i = 0; i < n; i++) {
-        const struct sockaddr_in *old = &ofproto->extra_in_band_remotes[i];
-        const struct sockaddr_in *new = &extras[i];
-
-        if (old->sin_addr.s_addr != new->sin_addr.s_addr ||
-            old->sin_port != new->sin_port) {
-            return true;
-        }
-    }
-
-    return false;
+    connmgr_reconnect(ofproto->connmgr);
 }
 
 /* Sets the 'n' TCP port addresses in 'extras' as ones to which 'ofproto''s
@@ -802,15 +480,7 @@ void
 ofproto_set_extra_in_band_remotes(struct ofproto *ofproto,
                                   const struct sockaddr_in *extras, size_t n)
 {
-    if (!any_extras_changed(ofproto, extras, n)) {
-        return;
-    }
-
-    free(ofproto->extra_in_band_remotes);
-    ofproto->n_extra_remotes = n;
-    ofproto->extra_in_band_remotes = xmemdup(extras, n * sizeof *extras);
-
-    update_in_band_remotes(ofproto);
+    connmgr_set_extra_in_band_remotes(ofproto->connmgr, extras, n);
 }
 
 /* Sets the OpenFlow queue used by flows set up by in-band control on
@@ -819,10 +489,7 @@ ofproto_set_extra_in_band_remotes(struct ofproto *ofproto,
 void
 ofproto_set_in_band_queue(struct ofproto *ofproto, int queue_id)
 {
-    if (queue_id != ofproto->in_band_queue) {
-        ofproto->in_band_queue = queue_id;
-        update_in_band_remotes(ofproto);
-    }
+    connmgr_set_in_band_queue(ofproto->connmgr, queue_id);
 }
 
 void
@@ -876,55 +543,17 @@ ofproto_set_desc(struct ofproto *p,
     }
 }
 
-static int
-set_pvconns(struct pvconn ***pvconnsp, size_t *n_pvconnsp,
-            const struct svec *svec)
-{
-    struct pvconn **pvconns = *pvconnsp;
-    size_t n_pvconns = *n_pvconnsp;
-    int retval = 0;
-    size_t i;
-
-    for (i = 0; i < n_pvconns; i++) {
-        pvconn_close(pvconns[i]);
-    }
-    free(pvconns);
-
-    pvconns = xmalloc(svec->n * sizeof *pvconns);
-    n_pvconns = 0;
-    for (i = 0; i < svec->n; i++) {
-        const char *name = svec->names[i];
-        struct pvconn *pvconn;
-        int error;
-
-        error = pvconn_open(name, &pvconn);
-        if (!error) {
-            pvconns[n_pvconns++] = pvconn;
-        } else {
-            VLOG_ERR("failed to listen on %s: %s", name, strerror(error));
-            if (!retval) {
-                retval = error;
-            }
-        }
-    }
-
-    *pvconnsp = pvconns;
-    *n_pvconnsp = n_pvconns;
-
-    return retval;
-}
-
 int
-ofproto_set_snoops(struct ofproto *ofproto, const struct svec *snoops)
+ofproto_set_snoops(struct ofproto *ofproto, const struct sset *snoops)
 {
-    return set_pvconns(&ofproto->snoops, &ofproto->n_snoops, snoops);
+    return connmgr_set_snoops(ofproto->connmgr, snoops);
 }
 
 int
 ofproto_set_netflow(struct ofproto *ofproto,
                     const struct netflow_options *nf_options)
 {
-    if (nf_options && nf_options->collectors.n) {
+    if (nf_options && !sset_is_empty(&nf_options->collectors)) {
         if (!ofproto->netflow) {
             ofproto->netflow = netflow_create();
         }
@@ -1030,32 +659,31 @@ ofproto_get_datapath_id(const struct ofproto *ofproto)
 bool
 ofproto_has_primary_controller(const struct ofproto *ofproto)
 {
-    return !hmap_is_empty(&ofproto->controllers);
+    return connmgr_has_controllers(ofproto->connmgr);
 }
 
 enum ofproto_fail_mode
 ofproto_get_fail_mode(const struct ofproto *p)
 {
-    return p->fail_mode;
+    return connmgr_get_fail_mode(p->connmgr);
 }
 
-void
-ofproto_get_snoops(const struct ofproto *ofproto, struct svec *snoops)
+bool
+ofproto_has_snoops(const struct ofproto *ofproto)
 {
-    size_t i;
+    return connmgr_has_snoops(ofproto->connmgr);
+}
 
-    for (i = 0; i < ofproto->n_snoops; i++) {
-        svec_add(snoops, pvconn_get_name(ofproto->snoops[i]));
-    }
+void
+ofproto_get_snoops(const struct ofproto *ofproto, struct sset *snoops)
+{
+    connmgr_get_snoops(ofproto->connmgr, snoops);
 }
 
 void
 ofproto_destroy(struct ofproto *p)
 {
-    struct ofservice *ofservice, *next_ofservice;
-    struct ofconn *ofconn, *next_ofconn;
     struct ofport *ofport, *next_ofport;
-    size_t i;
 
     if (!p) {
         return;
@@ -1063,23 +691,11 @@ ofproto_destroy(struct ofproto *p)
 
     shash_find_and_delete(&all_ofprotos, dpif_name(p->dpif));
 
-    /* Destroy fail-open and in-band early, since they touch the classifier. */
-    fail_open_destroy(p->fail_open);
-    p->fail_open = NULL;
-
-    in_band_destroy(p->in_band);
-    p->in_band = NULL;
-    free(p->extra_in_band_remotes);
-
-    ofproto_flush_flows(p);
+    ofproto_flush_flows__(p);
+    connmgr_destroy(p->connmgr);
     classifier_destroy(&p->cls);
     hmap_destroy(&p->facets);
 
-    LIST_FOR_EACH_SAFE (ofconn, next_ofconn, node, &p->all_conns) {
-        ofconn_destroy(ofconn);
-    }
-    hmap_destroy(&p->controllers);
-
     dpif_close(p->dpif);
     netdev_monitor_destroy(p->netdev_monitor);
     HMAP_FOR_EACH_SAFE (ofport, next_ofport, hmap_node, &p->ports) {
@@ -1091,16 +707,6 @@ ofproto_destroy(struct ofproto *p)
     netflow_destroy(p->netflow);
     ofproto_sflow_destroy(p->sflow);
 
-    HMAP_FOR_EACH_SAFE (ofservice, next_ofservice, node, &p->services) {
-        ofservice_destroy(p, ofservice);
-    }
-    hmap_destroy(&p->services);
-
-    for (i = 0; i < p->n_snoops; i++) {
-        pvconn_close(p->snoops[i]);
-    }
-    free(p->snoops);
-
     mac_learning_destroy(p->ml);
 
     free(p->mfr_desc);
@@ -1135,54 +741,9 @@ process_port_change(struct ofproto *ofproto, int error, char *devname)
     }
 }
 
-/* Returns a "preference level" for snooping 'ofconn'.  A higher return value
- * means that 'ofconn' is more interesting for monitoring than a lower return
- * value. */
-static int
-snoop_preference(const struct ofconn *ofconn)
-{
-    switch (ofconn->role) {
-    case NX_ROLE_MASTER:
-        return 3;
-    case NX_ROLE_OTHER:
-        return 2;
-    case NX_ROLE_SLAVE:
-        return 1;
-    default:
-        /* Shouldn't happen. */
-        return 0;
-    }
-}
-
-/* One of ofproto's "snoop" pvconns has accepted a new connection on 'vconn'.
- * Connects this vconn to a controller. */
-static void
-add_snooper(struct ofproto *ofproto, struct vconn *vconn)
-{
-    struct ofconn *ofconn, *best;
-
-    /* Pick a controller for monitoring. */
-    best = NULL;
-    LIST_FOR_EACH (ofconn, node, &ofproto->all_conns) {
-        if (ofconn->type == OFCONN_PRIMARY
-            && (!best || snoop_preference(ofconn) > snoop_preference(best))) {
-            best = ofconn;
-        }
-    }
-
-    if (best) {
-        rconn_add_monitor(best->rconn, vconn);
-    } else {
-        VLOG_INFO_RL(&rl, "no controller connection to snoop");
-        vconn_close(vconn);
-    }
-}
-
 int
 ofproto_run1(struct ofproto *p)
 {
-    struct ofconn *ofconn, *next_ofconn;
-    struct ofservice *ofservice;
     struct ofport *ofport;
     char *devname;
     int error;
@@ -1224,60 +785,11 @@ ofproto_run1(struct ofproto *p)
         ofport_run(p, ofport);
     }
 
-    if (p->in_band) {
-        if (time_msec() >= p->next_in_band_update) {
-            update_in_band_remotes(p);
-        }
-        in_band_run(p->in_band);
-    }
-
-    LIST_FOR_EACH_SAFE (ofconn, next_ofconn, node, &p->all_conns) {
-        ofconn_run(ofconn);
-    }
-
-    /* Fail-open maintenance.  Do this after processing the ofconns since
-     * fail-open checks the status of the controller rconn. */
-    if (p->fail_open) {
-        fail_open_run(p->fail_open);
-    }
-
-    HMAP_FOR_EACH (ofservice, node, &p->services) {
-        struct vconn *vconn;
-        int retval;
-
-        retval = pvconn_accept(ofservice->pvconn, OFP_VERSION, &vconn);
-        if (!retval) {
-            struct rconn *rconn;
-            char *name;
-
-            rconn = rconn_create(ofservice->probe_interval, 0);
-            name = ofconn_make_name(p, vconn_get_name(vconn));
-            rconn_connect_unreliably(rconn, vconn, name);
-            free(name);
-
-            ofconn = ofconn_create(p, rconn, OFCONN_SERVICE);
-            ofconn_set_rate_limit(ofconn, ofservice->rate_limit,
-                                  ofservice->burst_limit);
-        } else if (retval != EAGAIN) {
-            VLOG_WARN_RL(&rl, "accept failed (%s)", strerror(retval));
-        }
-    }
-
-    for (i = 0; i < p->n_snoops; i++) {
-        struct vconn *vconn;
-        int retval;
-
-        retval = pvconn_accept(p->snoops[i], OFP_VERSION, &vconn);
-        if (!retval) {
-            add_snooper(p, vconn);
-        } else if (retval != EAGAIN) {
-            VLOG_WARN_RL(&rl, "accept failed (%s)", strerror(retval));
-        }
-    }
+    connmgr_run(p->connmgr, handle_openflow);
 
-    if (time_msec() >= p->next_expiration) {
+    if (timer_expired(&p->next_expiration)) {
         int delay = ofproto_expire(p);
-        p->next_expiration = time_msec() + delay;
+        timer_set_duration(&p->next_expiration, delay);
         COVERAGE_INC(ofproto_expiration);
     }
 
@@ -1322,27 +834,14 @@ ofproto_run2(struct ofproto *p, bool revalidate_all)
 void
 ofproto_wait(struct ofproto *p)
 {
-    struct ofservice *ofservice;
-    struct ofconn *ofconn;
     struct ofport *ofport;
-    size_t i;
 
-    dpif_recv_wait(p->dpif);
-    dpif_port_poll_wait(p->dpif);
-    netdev_monitor_poll_wait(p->netdev_monitor);
     HMAP_FOR_EACH (ofport, hmap_node, &p->ports) {
         ofport_wait(ofport);
     }
-    LIST_FOR_EACH (ofconn, node, &p->all_conns) {
-        ofconn_wait(ofconn);
-    }
-    if (p->in_band) {
-        poll_timer_wait_until(p->next_in_band_update);
-        in_band_wait(p->in_band);
-    }
-    if (p->fail_open) {
-        fail_open_wait(p->fail_open);
-    }
+    dpif_recv_wait(p->dpif);
+    dpif_port_poll_wait(p->dpif);
+    netdev_monitor_poll_wait(p->netdev_monitor);
     if (p->sflow) {
         ofproto_sflow_wait(p->sflow);
     }
@@ -1353,15 +852,10 @@ ofproto_wait(struct ofproto *p)
         /* Shouldn't happen, but if it does just go around again. */
         VLOG_DBG_RL(&rl, "need revalidate in ofproto_wait_cb()");
         poll_immediate_wake();
-    } else if (p->next_expiration != LLONG_MAX) {
-        poll_timer_wait_until(p->next_expiration);
-    }
-    HMAP_FOR_EACH (ofservice, node, &p->services) {
-        pvconn_wait(ofservice->pvconn);
-    }
-    for (i = 0; i < p->n_snoops; i++) {
-        pvconn_wait(p->snoops[i]);
+    } else {
+        timer_wait(&p->next_expiration);
     }
+    connmgr_wait(p->connmgr);
 }
 
 void
@@ -1379,54 +873,14 @@ ofproto_get_revalidate_set(struct ofproto *ofproto)
 bool
 ofproto_is_alive(const struct ofproto *p)
 {
-    return !hmap_is_empty(&p->controllers);
+    return connmgr_has_controllers(p->connmgr);
 }
 
 void
 ofproto_get_ofproto_controller_info(const struct ofproto *ofproto,
                                     struct shash *info)
 {
-    const struct ofconn *ofconn;
-
-    shash_init(info);
-
-    HMAP_FOR_EACH (ofconn, hmap_node, &ofproto->controllers) {
-        const struct rconn *rconn = ofconn->rconn;
-        time_t now = time_now();
-        time_t last_connection = rconn_get_last_connection(rconn);
-        time_t last_disconnect = rconn_get_last_disconnect(rconn);
-        const int last_error = rconn_get_last_error(rconn);
-        struct ofproto_controller_info *cinfo = xmalloc(sizeof *cinfo);
-
-        shash_add(info, rconn_get_target(rconn), cinfo);
-
-        cinfo->is_connected = rconn_is_connected(rconn);
-        cinfo->role = ofconn->role;
-
-        cinfo->pairs.n = 0;
-
-        if (last_error) {
-            cinfo->pairs.keys[cinfo->pairs.n] = "last_error";
-            cinfo->pairs.values[cinfo->pairs.n++] =
-                xstrdup(ovs_retval_to_string(last_error));
-        }
-
-        cinfo->pairs.keys[cinfo->pairs.n] = "state";
-        cinfo->pairs.values[cinfo->pairs.n++] =
-            xstrdup(rconn_get_state(rconn));
-
-        if (last_connection != TIME_MIN) {
-            cinfo->pairs.keys[cinfo->pairs.n] = "sec_since_connect";
-            cinfo->pairs.values[cinfo->pairs.n++]
-                = xasprintf("%ld", (long int) (now - last_connection));
-        }
-
-        if (last_disconnect != TIME_MIN) {
-            cinfo->pairs.keys[cinfo->pairs.n] = "sec_since_disconnect";
-            cinfo->pairs.values[cinfo->pairs.n++]
-                = xasprintf("%ld", (long int) (now - last_disconnect));
-        }
-    }
+    connmgr_get_controller_info(ofproto->connmgr, info);
 }
 
 void
@@ -1546,8 +1000,8 @@ ofproto_delete_flow(struct ofproto *ofproto, const struct cls_rule *target)
     }
 }
 
-void
-ofproto_flush_flows(struct ofproto *ofproto)
+static void
+ofproto_flush_flows__(struct ofproto *ofproto)
 {
     struct facet *facet, *next_facet;
     struct rule *rule, *next_rule;
@@ -1572,37 +1026,38 @@ ofproto_flush_flows(struct ofproto *ofproto)
     }
 
     dpif_flow_flush(ofproto->dpif);
-    if (ofproto->in_band) {
-        in_band_flushed(ofproto->in_band);
-    }
-    if (ofproto->fail_open) {
-        fail_open_flushed(ofproto->fail_open);
-    }
+}
+
+void
+ofproto_flush_flows(struct ofproto *ofproto)
+{
+    ofproto_flush_flows__(ofproto);
+    connmgr_flushed(ofproto->connmgr);
 }
 \f
 static void
 reinit_ports(struct ofproto *p)
 {
     struct dpif_port_dump dump;
-    struct shash_node *node;
-    struct shash devnames;
+    struct sset devnames;
     struct ofport *ofport;
     struct dpif_port dpif_port;
+    const char *devname;
 
     COVERAGE_INC(ofproto_reinit_ports);
 
-    shash_init(&devnames);
+    sset_init(&devnames);
     HMAP_FOR_EACH (ofport, hmap_node, &p->ports) {
-        shash_add_once (&devnames, ofport->opp.name, NULL);
+        sset_add(&devnames, ofport->opp.name);
     }
     DPIF_PORT_FOR_EACH (&dpif_port, &dump, p->dpif) {
-        shash_add_once (&devnames, dpif_port.name, NULL);
+        sset_add(&devnames, dpif_port.name);
     }
 
-    SHASH_FOR_EACH (node, &devnames) {
-        update_port(p, node->name);
+    SSET_FOR_EACH (devname, &devnames) {
+        update_port(p, devname);
     }
-    shash_destroy(&devnames);
+    sset_destroy(&devnames);
 }
 
 static struct ofport *
@@ -1680,31 +1135,6 @@ ofport_equal(const struct ofport *a_, const struct ofport *b_)
             && a->peer == b->peer);
 }
 
-static void
-send_port_status(struct ofproto *p, const struct ofport *ofport,
-                 uint8_t reason)
-{
-    /* XXX Should limit the number of queued port status change messages. */
-    struct ofconn *ofconn;
-    LIST_FOR_EACH (ofconn, node, &p->all_conns) {
-        struct ofp_port_status *ops;
-        struct ofpbuf *b;
-
-        /* Primary controllers, even slaves, should always get port status
-           updates.  Otherwise obey ofconn_receives_async_msgs(). */
-        if (ofconn->type != OFCONN_PRIMARY
-            && !ofconn_receives_async_msgs(ofconn)) {
-            continue;
-        }
-
-        ops = make_openflow_xid(sizeof *ops, OFPT_PORT_STATUS, 0, &b);
-        ops->reason = reason;
-        ops->desc = ofport->opp;
-        hton_ofp_phy_port(&ops->desc);
-        queue_tx(b, ofconn, NULL);
-    }
-}
-
 static void
 ofport_install(struct ofproto *p, struct ofport *ofport)
 {
@@ -1844,10 +1274,11 @@ update_port(struct ofproto *p, const char *devname)
     if (new_ofport) {
         ofport_install(p, new_ofport);
     }
-    send_port_status(p, new_ofport ? new_ofport : old_ofport,
-                     (!old_ofport ? OFPPR_ADD
-                      : !new_ofport ? OFPPR_DELETE
-                      : OFPPR_MODIFY));
+    connmgr_send_port_status(p->connmgr,
+                             new_ofport ? &new_ofport->opp : &old_ofport->opp,
+                             (!old_ofport ? OFPPR_ADD
+                              : !new_ofport ? OFPPR_DELETE
+                              : OFPPR_MODIFY));
     ofport_free(old_ofport);
 
 exit:
@@ -1872,225 +1303,6 @@ init_ports(struct ofproto *p)
     return 0;
 }
 \f
-static struct ofconn *
-ofconn_create(struct ofproto *p, struct rconn *rconn, enum ofconn_type type)
-{
-    struct ofconn *ofconn = xzalloc(sizeof *ofconn);
-    ofconn->ofproto = p;
-    list_push_back(&p->all_conns, &ofconn->node);
-    ofconn->rconn = rconn;
-    ofconn->type = type;
-    ofconn->flow_format = NXFF_OPENFLOW10;
-    ofconn->role = NX_ROLE_OTHER;
-    ofconn->packet_in_counter = rconn_packet_counter_create ();
-    ofconn->pktbuf = NULL;
-    ofconn->miss_send_len = 0;
-    ofconn->reply_counter = rconn_packet_counter_create ();
-    return ofconn;
-}
-
-static void
-ofconn_destroy(struct ofconn *ofconn)
-{
-    struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
-
-    if (ofconn->type == OFCONN_PRIMARY) {
-        hmap_remove(&ofproto->controllers, &ofconn->hmap_node);
-    }
-
-    list_remove(&ofconn->node);
-    rconn_destroy(ofconn->rconn);
-    rconn_packet_counter_destroy(ofconn->packet_in_counter);
-    rconn_packet_counter_destroy(ofconn->reply_counter);
-    pktbuf_destroy(ofconn->pktbuf);
-    free(ofconn);
-}
-
-static void
-ofconn_run(struct ofconn *ofconn)
-{
-    struct ofproto *p = ofconn_get_ofproto(ofconn);
-    int iteration;
-    size_t i;
-
-    for (i = 0; i < N_SCHEDULERS; i++) {
-        pinsched_run(ofconn->schedulers[i], do_send_packet_in, ofconn);
-    }
-
-    rconn_run(ofconn->rconn);
-
-    if (rconn_packet_counter_read (ofconn->reply_counter) < OFCONN_REPLY_MAX) {
-        /* Limit the number of iterations to prevent other tasks from
-         * starving. */
-        for (iteration = 0; iteration < 50; iteration++) {
-            struct ofpbuf *of_msg = rconn_recv(ofconn->rconn);
-            if (!of_msg) {
-                break;
-            }
-            if (p->fail_open) {
-                fail_open_maybe_recover(p->fail_open);
-            }
-            handle_openflow(ofconn, of_msg);
-            ofpbuf_delete(of_msg);
-        }
-    }
-
-    if (!rconn_is_alive(ofconn->rconn)) {
-        ofconn_destroy(ofconn);
-    }
-}
-
-static void
-ofconn_wait(struct ofconn *ofconn)
-{
-    int i;
-
-    for (i = 0; i < N_SCHEDULERS; i++) {
-        pinsched_wait(ofconn->schedulers[i]);
-    }
-    rconn_run_wait(ofconn->rconn);
-    if (rconn_packet_counter_read (ofconn->reply_counter) < OFCONN_REPLY_MAX) {
-        rconn_recv_wait(ofconn->rconn);
-    } else {
-        COVERAGE_INC(ofproto_ofconn_stuck);
-    }
-}
-
-/* Returns true if 'ofconn' should receive asynchronous messages. */
-static bool
-ofconn_receives_async_msgs(const struct ofconn *ofconn)
-{
-    if (ofconn->type == OFCONN_PRIMARY) {
-        /* Primary controllers always get asynchronous messages unless they
-         * have configured themselves as "slaves".  */
-        return ofconn->role != NX_ROLE_SLAVE;
-    } else {
-        /* Service connections don't get asynchronous messages unless they have
-         * explicitly asked for them by setting a nonzero miss send length. */
-        return ofconn->miss_send_len > 0;
-    }
-}
-
-/* Returns a human-readable name for an OpenFlow connection between 'ofproto'
- * and 'target', suitable for use in log messages for identifying the
- * connection.
- *
- * The name is dynamically allocated.  The caller should free it (with free())
- * when it is no longer needed. */
-static char *
-ofconn_make_name(const struct ofproto *ofproto, const char *target)
-{
-    return xasprintf("%s<->%s", dpif_base_name(ofproto->dpif), target);
-}
-
-static void
-ofconn_set_rate_limit(struct ofconn *ofconn, int rate, int burst)
-{
-    int i;
-
-    for (i = 0; i < N_SCHEDULERS; i++) {
-        struct pinsched **s = &ofconn->schedulers[i];
-
-        if (rate > 0) {
-            if (!*s) {
-                *s = pinsched_create(rate, burst);
-            } else {
-                pinsched_set_limits(*s, rate, burst);
-            }
-        } else {
-            pinsched_destroy(*s);
-            *s = NULL;
-        }
-    }
-}
-
-static struct ofproto *
-ofconn_get_ofproto(struct ofconn *ofconn)
-{
-    return ofconn->ofproto;
-}
-
-static enum nx_flow_format
-ofconn_get_flow_format(struct ofconn *ofconn)
-{
-    return ofconn->flow_format;
-}
-
-static void
-ofconn_set_flow_format(struct ofconn *ofconn, enum nx_flow_format flow_format)
-{
-    ofconn->flow_format = flow_format;
-}
-
-static int
-ofconn_get_miss_send_len(const struct ofconn *ofconn)
-{
-    return ofconn->miss_send_len;
-}
-
-static void
-ofconn_set_miss_send_len(struct ofconn *ofconn, int miss_send_len)
-{
-    ofconn->miss_send_len = miss_send_len;
-}
-\f
-static void
-ofservice_reconfigure(struct ofservice *ofservice,
-                      const struct ofproto_controller *c)
-{
-    ofservice->probe_interval = c->probe_interval;
-    ofservice->rate_limit = c->rate_limit;
-    ofservice->burst_limit = c->burst_limit;
-}
-
-/* Creates a new ofservice in 'ofproto'.  Returns 0 if successful, otherwise a
- * positive errno value. */
-static int
-ofservice_create(struct ofproto *ofproto, const struct ofproto_controller *c)
-{
-    struct ofservice *ofservice;
-    struct pvconn *pvconn;
-    int error;
-
-    error = pvconn_open(c->target, &pvconn);
-    if (error) {
-        return error;
-    }
-
-    ofservice = xzalloc(sizeof *ofservice);
-    hmap_insert(&ofproto->services, &ofservice->node,
-                hash_string(c->target, 0));
-    ofservice->pvconn = pvconn;
-
-    ofservice_reconfigure(ofservice, c);
-
-    return 0;
-}
-
-static void
-ofservice_destroy(struct ofproto *ofproto, struct ofservice *ofservice)
-{
-    hmap_remove(&ofproto->services, &ofservice->node);
-    pvconn_close(ofservice->pvconn);
-    free(ofservice);
-}
-
-/* Finds and returns the ofservice within 'ofproto' that has the given
- * 'target', or a null pointer if none exists. */
-static struct ofservice *
-ofservice_lookup(struct ofproto *ofproto, const char *target)
-{
-    struct ofservice *ofservice;
-
-    HMAP_FOR_EACH_WITH_HASH (ofservice, node, hash_string(target, 0),
-                             &ofproto->services) {
-        if (!strcmp(pvconn_get_name(ofservice->pvconn), target)) {
-            return ofservice;
-        }
-    }
-    return NULL;
-}
-\f
 /* Returns true if 'rule' should be hidden from the controller.
  *
  * Rules with priority higher than UINT16_MAX are set up by ofproto itself
@@ -2403,7 +1615,7 @@ facet_put__(struct ofproto *ofproto, struct facet *facet,
             const struct nlattr *actions, size_t actions_len,
             struct dpif_flow_stats *stats)
 {
-    uint32_t keybuf[ODPUTIL_FLOW_KEY_U32S];
+    struct odputil_keybuf keybuf;
     enum dpif_flow_put_flags flags;
     struct ofpbuf key;
 
@@ -2414,9 +1626,8 @@ facet_put__(struct ofproto *ofproto, struct facet *facet,
         facet->dp_byte_count = 0;
     }
 
-    ofpbuf_use_stack(&key, keybuf, sizeof keybuf);
+    ofpbuf_use_stack(&key, &keybuf, sizeof keybuf);
     odp_flow_key_from_flow(&key, &facet->flow);
-    assert(key.base == keybuf);
 
     return dpif_flow_put(ofproto->dpif, flags, key.data, key.size,
                          actions, actions_len, stats);
@@ -2460,13 +1671,12 @@ static void
 facet_uninstall(struct ofproto *p, struct facet *facet)
 {
     if (facet->installed) {
-        uint32_t keybuf[ODPUTIL_FLOW_KEY_U32S];
+        struct odputil_keybuf keybuf;
         struct dpif_flow_stats stats;
         struct ofpbuf key;
 
-        ofpbuf_use_stack(&key, keybuf, sizeof keybuf);
+        ofpbuf_use_stack(&key, &keybuf, sizeof keybuf);
         odp_flow_key_from_flow(&key, &facet->flow);
-        assert(key.base == keybuf);
 
         if (!dpif_flow_del(p->dpif, key.data, key.size, &stats)) {
             facet_update_stats(p, facet, &stats);
@@ -2650,22 +1860,6 @@ facet_revalidate(struct ofproto *ofproto, struct facet *facet)
     return true;
 }
 \f
-static void
-queue_tx(struct ofpbuf *msg, const struct ofconn *ofconn,
-         struct rconn_packet_counter *counter)
-{
-    update_openflow_length(msg);
-    if (rconn_send(ofconn->rconn, msg, counter)) {
-        ofpbuf_delete(msg);
-    }
-}
-
-static void
-ofconn_send_reply(const struct ofconn *ofconn, struct ofpbuf *msg)
-{
-    queue_tx(msg, ofconn, ofconn->reply_counter);
-}
-
 static void
 send_error_oh(const struct ofconn *ofconn, const struct ofp_header *oh,
               int error)
@@ -2747,7 +1941,8 @@ handle_set_config(struct ofconn *ofconn, const struct ofp_switch_config *osc)
     struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
     uint16_t flags = ntohs(osc->flags);
 
-    if (ofconn->type == OFCONN_PRIMARY && ofconn->role != NX_ROLE_SLAVE) {
+    if (ofconn_get_type(ofconn) == OFCONN_PRIMARY
+        && ofconn_get_role(ofconn) != NX_ROLE_SLAVE) {
         switch (flags & OFPC_FRAG_MASK) {
         case OFPC_FRAG_NORMAL:
             dpif_set_drop_frags(ofproto->dpif, false);
@@ -3264,8 +2459,9 @@ xlate_actions(struct action_xlate_ctx *ctx,
 
     /* Check with in-band control to see if we're allowed to set up this
      * flow. */
-    if (!in_band_rule_check(ctx->ofproto->in_band, &ctx->flow,
-                            ctx->odp_actions->data, ctx->odp_actions->size)) {
+    if (!connmgr_may_set_up_flow(ctx->ofproto->connmgr, &ctx->flow,
+                                 ctx->odp_actions->data,
+                                 ctx->odp_actions->size)) {
         ctx->may_set_up_flow = false;
     }
 
@@ -3280,7 +2476,8 @@ xlate_actions(struct action_xlate_ctx *ctx,
 static int
 reject_slave_controller(struct ofconn *ofconn, const const char *msg_type)
 {
-    if (ofconn->type == OFCONN_PRIMARY && ofconn->role == NX_ROLE_SLAVE) {
+    if (ofconn_get_type(ofconn) == OFCONN_PRIMARY
+        && ofconn_get_role(ofconn) == NX_ROLE_SLAVE) {
         static struct vlog_rate_limit perm_rl = VLOG_RATE_LIMIT_INIT(1, 5);
         VLOG_WARN_RL(&perm_rl, "rejecting %s message from slave controller",
                      msg_type);
@@ -3326,8 +2523,8 @@ handle_packet_out(struct ofconn *ofconn, const struct ofp_header *oh)
 
     /* Get payload. */
     if (opo->buffer_id != htonl(UINT32_MAX)) {
-        error = pktbuf_retrieve(ofconn->pktbuf, ntohl(opo->buffer_id),
-                                &buffer, &in_port);
+        error = ofconn_pktbuf_retrieve(ofconn, ntohl(opo->buffer_id),
+                                       &buffer, &in_port);
         if (error || !buffer) {
             return error;
         }
@@ -4088,8 +3285,8 @@ add_flow(struct ofconn *ofconn, struct flow_mod *fm)
 
     error = 0;
     if (fm->buffer_id != UINT32_MAX) {
-        error = pktbuf_retrieve(ofconn->pktbuf, fm->buffer_id,
-                                &packet, &in_port);
+        error = ofconn_pktbuf_retrieve(ofconn, fm->buffer_id,
+                                       &packet, &in_port);
     } else {
         packet = NULL;
         in_port = UINT16_MAX;
@@ -4124,7 +3321,7 @@ send_buffered_packet(struct ofconn *ofconn,
         return 0;
     }
 
-    error = pktbuf_retrieve(ofconn->pktbuf, buffer_id, &packet, &in_port);
+    error = ofconn_pktbuf_retrieve(ofconn, buffer_id, &packet, &in_port);
     if (error) {
         return error;
     }
@@ -4348,9 +3545,8 @@ handle_role_request(struct ofconn *ofconn, const struct ofp_header *oh)
     struct ofpbuf *buf;
     uint32_t role;
 
-    if (ofconn->type != OFCONN_PRIMARY) {
-        VLOG_WARN_RL(&rl, "ignoring role request on non-controller "
-                     "connection");
+    if (ofconn_get_type(ofconn) != OFCONN_PRIMARY) {
+        VLOG_WARN_RL(&rl, "ignoring role request on service connection");
         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_EPERM);
     }
 
@@ -4363,17 +3559,7 @@ handle_role_request(struct ofconn *ofconn, const struct ofp_header *oh)
         return ofp_mkerr(OFPET_BAD_REQUEST, -1);
     }
 
-    if (role == NX_ROLE_MASTER) {
-        struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
-        struct ofconn *other;
-
-        HMAP_FOR_EACH (other, hmap_node, &ofproto->controllers) {
-            if (other->role == NX_ROLE_MASTER) {
-                other->role = NX_ROLE_SLAVE;
-            }
-        }
-    }
-    ofconn->role = role;
+    ofconn_set_role(ofconn, role);
 
     reply = make_nxmsg_xid(sizeof *reply, NXT_ROLE_REPLY, oh->xid, &buf);
     reply->role = htonl(role);
@@ -4563,7 +3749,7 @@ handle_miss_upcall(struct ofproto *p, struct dpif_upcall *upcall)
 
     /* Check with in-band control to see if this packet should be sent
      * to the local port regardless of the flow table. */
-    if (in_band_msg_in_hook(p->in_band, &flow, upcall->packet)) {
+    if (connmgr_msg_in_hook(p->connmgr, &flow, upcall->packet)) {
         ofproto_send_packet(p, ODPP_LOCAL, 0, upcall->packet);
     }
 
@@ -4924,7 +4110,6 @@ static void
 rule_send_removed(struct ofproto *p, struct rule *rule, uint8_t reason)
 {
     struct ofputil_flow_removed fr;
-    struct ofconn *ofconn;
 
     if (!rule->send_flow_removed) {
         return;
@@ -4938,22 +4123,7 @@ rule_send_removed(struct ofproto *p, struct rule *rule, uint8_t reason)
     fr.packet_count = rule->packet_count;
     fr.byte_count = rule->byte_count;
 
-    LIST_FOR_EACH (ofconn, node, &p->all_conns) {
-        struct ofpbuf *msg;
-
-        if (!rconn_is_connected(ofconn->rconn)
-            || !ofconn_receives_async_msgs(ofconn)) {
-            continue;
-        }
-
-        /* This accounts flow expirations as if they were replies to OpenFlow
-         * requests.  That works because preventing OpenFlow requests from
-         * being processed also prevents new flows from being added (and
-         * expiring).  (It also prevents processing OpenFlow requests that
-         * would not add new flows, so it is imperfect.) */
-        msg = ofputil_encode_flow_removed(&fr, ofconn_get_flow_format(ofconn));
-        ofconn_send_reply(ofconn, msg);
-    }
+    connmgr_send_flow_removed(p->connmgr, &fr);
 }
 
 /* Obtains statistics for 'rule' and stores them in '*packets' and '*bytes'.
@@ -4981,64 +4151,6 @@ rule_get_stats(const struct rule *rule, uint64_t *packets, uint64_t *bytes)
     *bytes = b;
 }
 
-/* pinsched callback for sending 'ofp_packet_in' on 'ofconn'. */
-static void
-do_send_packet_in(struct ofpbuf *ofp_packet_in, void *ofconn_)
-{
-    struct ofconn *ofconn = ofconn_;
-
-    rconn_send_with_limit(ofconn->rconn, ofp_packet_in,
-                          ofconn->packet_in_counter, 100);
-}
-
-/* Takes 'upcall', whose packet has the flow specified by 'flow', composes an
- * OpenFlow packet-in message from it, and passes it to 'ofconn''s packet
- * scheduler for sending.
- *
- * If 'clone' is true, the caller retains ownership of 'upcall->packet'.
- * Otherwise, ownership is transferred to this function. */
-static void
-schedule_packet_in(struct ofconn *ofconn, struct dpif_upcall *upcall,
-                   const struct flow *flow, bool clone)
-{
-    struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
-    struct ofputil_packet_in pin;
-    struct ofpbuf *msg;
-
-    /* Figure out the easy parts. */
-    pin.packet = upcall->packet;
-    pin.in_port = odp_port_to_ofp_port(flow->in_port);
-    pin.reason = upcall->type == DPIF_UC_MISS ? OFPR_NO_MATCH : OFPR_ACTION;
-
-    /* Get OpenFlow buffer_id. */
-    if (upcall->type == DPIF_UC_ACTION) {
-        pin.buffer_id = UINT32_MAX;
-    } else if (ofproto->fail_open && fail_open_is_active(ofproto->fail_open)) {
-        pin.buffer_id = pktbuf_get_null();
-    } else if (!ofconn->pktbuf) {
-        pin.buffer_id = UINT32_MAX;
-    } else {
-        pin.buffer_id = pktbuf_save(ofconn->pktbuf, upcall->packet,
-                                    flow->in_port);
-    }
-
-    /* Figure out how much of the packet to send. */
-    pin.send_len = upcall->packet->size;
-    if (pin.buffer_id != UINT32_MAX) {
-        pin.send_len = MIN(pin.send_len, ofconn->miss_send_len);
-    }
-    if (upcall->type == DPIF_UC_ACTION) {
-        pin.send_len = MIN(pin.send_len, upcall->userdata);
-    }
-
-    /* Make OFPT_PACKET_IN and hand over to packet scheduler.  It might
-     * immediately call into do_send_packet_in() or it might buffer it for a
-     * while (until a later call to pinsched_run()). */
-    msg = ofputil_encode_packet_in(&pin, clone ? NULL : upcall->packet);
-    pinsched_send(ofconn->schedulers[upcall->type == DPIF_UC_MISS ? 0 : 1],
-                  flow->in_port, msg, do_send_packet_in, ofconn);
-}
-
 /* Given 'upcall', of type DPIF_UC_ACTION or DPIF_UC_MISS, sends an
  * OFPT_PACKET_IN message to each OpenFlow controller as necessary according to
  * their individual configurations.
@@ -5049,22 +4161,15 @@ static void
 send_packet_in(struct ofproto *ofproto, struct dpif_upcall *upcall,
                const struct flow *flow, bool clone)
 {
-    struct ofconn *ofconn, *prev;
+    struct ofputil_packet_in pin;
 
-    prev = NULL;
-    LIST_FOR_EACH (ofconn, node, &ofproto->all_conns) {
-        if (ofconn_receives_async_msgs(ofconn)) {
-            if (prev) {
-                schedule_packet_in(prev, upcall, flow, true);
-            }
-            prev = ofconn;
-        }
-    }
-    if (prev) {
-        schedule_packet_in(prev, upcall, flow, clone);
-    } else if (!clone) {
-        ofpbuf_delete(upcall->packet);
-    }
+    pin.packet = upcall->packet;
+    pin.in_port = odp_port_to_ofp_port(flow->in_port);
+    pin.reason = upcall->type == DPIF_UC_MISS ? OFPR_NO_MATCH : OFPR_ACTION;
+    pin.buffer_id = 0;          /* not yet known */
+    pin.send_len = upcall->userdata;
+    connmgr_send_packet_in(ofproto->connmgr, upcall, flow,
+                           clone ? NULL : upcall->packet);
 }
 
 static uint64_t