ofproto: Factor OpenFlow connection management into new "connmgr".
[sliver-openvswitch.git] / ofproto / ofproto.c
index 25a9db6..ce8c99b 100644 (file)
@@ -27,6 +27,7 @@
 #include "byte-order.h"
 #include "cfm.h"
 #include "classifier.h"
+#include "connmgr.h"
 #include "coverage.h"
 #include "dpif.h"
 #include "dynamic-string.h"
@@ -79,7 +80,6 @@ COVERAGE_DEFINE(ofproto_flows_req);
 COVERAGE_DEFINE(ofproto_flush);
 COVERAGE_DEFINE(ofproto_invalidated);
 COVERAGE_DEFINE(ofproto_no_packet_in);
-COVERAGE_DEFINE(ofproto_ofconn_stuck);
 COVERAGE_DEFINE(ofproto_ofp2odp);
 COVERAGE_DEFINE(ofproto_packet_in);
 COVERAGE_DEFINE(ofproto_packet_out);
@@ -270,107 +270,8 @@ static void facet_update_stats(struct ofproto *, struct facet *,
                                const struct dpif_flow_stats *);
 static void facet_push_stats(struct ofproto *, struct facet *);
 
-/* ofproto supports two kinds of OpenFlow connections:
- *
- *   - "Primary" connections to ordinary OpenFlow controllers.  ofproto
- *     maintains persistent connections to these controllers and by default
- *     sends them asynchronous messages such as packet-ins.
- *
- *   - "Service" connections, e.g. from ovs-ofctl.  When these connections
- *     drop, it is the other side's responsibility to reconnect them if
- *     necessary.  ofproto does not send them asynchronous messages by default.
- *
- * Currently, active (tcp, ssl, unix) connections are always "primary"
- * connections and passive (ptcp, pssl, punix) connections are always "service"
- * connections.  There is no inherent reason for this, but it reflects the
- * common case.
- */
-enum ofconn_type {
-    OFCONN_PRIMARY,             /* An ordinary OpenFlow controller. */
-    OFCONN_SERVICE              /* A service connection, e.g. "ovs-ofctl". */
-};
-
-/* A listener for incoming OpenFlow "service" connections. */
-struct ofservice {
-    struct hmap_node node;      /* In struct ofproto's "services" hmap. */
-    struct pvconn *pvconn;      /* OpenFlow connection listener. */
-
-    /* These are not used by ofservice directly.  They are settings for
-     * accepted "struct ofconn"s from the pvconn. */
-    int probe_interval;         /* Max idle time before probing, in seconds. */
-    int rate_limit;             /* Max packet-in rate in packets per second. */
-    int burst_limit;            /* Limit on accumulating packet credits. */
-};
-
-static struct ofservice *ofservice_lookup(struct ofproto *,
-                                          const char *target);
-static int ofservice_create(struct ofproto *,
-                            const struct ofproto_controller *);
-static void ofservice_reconfigure(struct ofservice *,
-                                  const struct ofproto_controller *);
-static void ofservice_destroy(struct ofproto *, struct ofservice *);
-
-/* An OpenFlow connection. */
-struct ofconn {
-    struct ofproto *ofproto;    /* The ofproto that owns this connection. */
-    struct list node;           /* In struct ofproto's "all_conns" list. */
-    struct rconn *rconn;        /* OpenFlow connection. */
-    enum ofconn_type type;      /* Type. */
-    enum nx_flow_format flow_format; /* Currently selected flow format. */
-
-    /* OFPT_PACKET_IN related data. */
-    struct rconn_packet_counter *packet_in_counter; /* # queued on 'rconn'. */
-#define N_SCHEDULERS 2
-    struct pinsched *schedulers[N_SCHEDULERS];
-    struct pktbuf *pktbuf;         /* OpenFlow packet buffers. */
-    int miss_send_len;             /* Bytes to send of buffered packets. */
-
-    /* Number of OpenFlow messages queued on 'rconn' as replies to OpenFlow
-     * requests, and the maximum number before we stop reading OpenFlow
-     * requests.  */
-#define OFCONN_REPLY_MAX 100
-    struct rconn_packet_counter *reply_counter;
-
-    /* type == OFCONN_PRIMARY only. */
-    enum nx_role role;           /* Role. */
-    struct hmap_node hmap_node;  /* In struct ofproto's "controllers" map. */
-    enum ofproto_band band;      /* In-band or out-of-band? */
-};
-
-
-static struct ofconn *ofconn_create(struct ofproto *, struct rconn *,
-                                    enum ofconn_type);
-static void ofconn_destroy(struct ofconn *);
-static void ofconn_run(struct ofconn *);
-static void ofconn_wait(struct ofconn *);
-
-static bool ofconn_receives_async_msgs(const struct ofconn *);
-static char *ofconn_make_name(const struct ofproto *, const char *target);
-static void ofconn_set_rate_limit(struct ofconn *, int rate, int burst);
-
-static struct ofproto *ofconn_get_ofproto(struct ofconn *);
-
-static enum nx_flow_format ofconn_get_flow_format(struct ofconn *);
-static void ofconn_set_flow_format(struct ofconn *, enum nx_flow_format);
-
-static int ofconn_get_miss_send_len(const struct ofconn *);
-static void ofconn_set_miss_send_len(struct ofconn *, int miss_send_len);
-
-static enum ofconn_type ofconn_get_type(const struct ofconn *);
-
-static enum nx_role ofconn_get_role(const struct ofconn *);
-static void ofconn_set_role(struct ofconn *, enum nx_role);
-
-static int ofconn_pktbuf_retrieve(struct ofconn *, uint32_t id,
-                                  struct ofpbuf **bufferp, uint16_t *in_port);
-
-
-static void queue_tx(struct ofpbuf *msg, const struct ofconn *ofconn,
-                     struct rconn_packet_counter *counter);
-
 static void send_packet_in(struct ofproto *, struct dpif_upcall *,
                            const struct flow *, bool clone);
-static void do_send_packet_in(struct ofpbuf *ofp_packet_in, void *ofconn);
 
 struct ofproto {
     /* Settings. */
@@ -390,17 +291,9 @@ struct ofproto {
     uint32_t max_ports;
 
     /* Configuration. */
-    struct fail_open *fail_open;
     struct netflow *netflow;
     struct ofproto_sflow *sflow;
 
-    /* In-band control. */
-    struct in_band *in_band;
-    long long int next_in_band_update;
-    struct sockaddr_in *extra_in_band_remotes;
-    size_t n_extra_remotes;
-    int in_band_queue;
-
     /* Flow table. */
     struct classifier cls;
     long long int next_expiration;
@@ -411,14 +304,7 @@ struct ofproto {
     struct tag_set revalidate_set;
 
     /* OpenFlow connections. */
-    struct hmap controllers;   /* Controller "struct ofconn"s. */
-    struct list all_conns;     /* Contains "struct ofconn"s. */
-    enum ofproto_fail_mode fail_mode;
-
-    /* OpenFlow listeners. */
-    struct hmap services;       /* Contains "struct ofservice"s. */
-    struct pvconn **snoops;
-    size_t n_snoops;
+    struct connmgr *connmgr;
 
     /* Hooks for ovs-vswitchd. */
     const struct ofhooks *ofhooks;
@@ -459,6 +345,7 @@ ofproto_create(const char *datapath, const char *datapath_type,
                const struct ofhooks *ofhooks, void *aux,
                struct ofproto **ofprotop)
 {
+    char local_name[IF_NAMESIZE];
     struct ofproto *p;
     struct dpif *dpif;
     int error;
@@ -486,6 +373,14 @@ ofproto_create(const char *datapath, const char *datapath_type,
     dpif_flow_flush(dpif);
     dpif_recv_purge(dpif);
 
+    error = dpif_port_get_name(dpif, ODPP_LOCAL,
+                               local_name, sizeof local_name);
+    if (error) {
+        VLOG_ERR("%s: cannot get name of datapath local port (%s)",
+                 datapath, strerror(error));
+        return error;
+    }
+
     /* Initialize settings. */
     p = xzalloc(sizeof *p);
     p->fallback_dpid = pick_fallback_dpid();
@@ -504,14 +399,9 @@ ofproto_create(const char *datapath, const char *datapath_type,
     p->max_ports = dpif_get_max_ports(dpif);
 
     /* Initialize submodules. */
-    p->fail_open = NULL;
     p->netflow = NULL;
     p->sflow = NULL;
 
-    /* Initialize in-band control. */
-    p->in_band = NULL;
-    p->in_band_queue = -1;
-
     /* Initialize flow table. */
     classifier_init(&p->cls);
     p->next_expiration = time_msec() + 1000;
@@ -521,13 +411,6 @@ ofproto_create(const char *datapath, const char *datapath_type,
     p->need_revalidate = false;
     tag_set_init(&p->revalidate_set);
 
-    /* Initialize OpenFlow connections. */
-    list_init(&p->all_conns);
-    hmap_init(&p->controllers);
-    hmap_init(&p->services);
-    p->snoops = NULL;
-    p->n_snoops = 0;
-
     /* Initialize hooks. */
     if (ofhooks) {
         p->ofhooks = ofhooks;
@@ -545,6 +428,9 @@ ofproto_create(const char *datapath, const char *datapath_type,
 
     shash_add_once(&all_ofprotos, dpif_name(p->dpif), p);
 
+    /* Initialize OpenFlow connections. */
+    p->connmgr = connmgr_create(p, datapath, local_name);
+
     *ofprotop = p;
     return 0;
 }
@@ -563,212 +449,18 @@ ofproto_set_datapath_id(struct ofproto *p, uint64_t datapath_id)
     }
 }
 
-/* Creates a new controller in 'ofproto'.  Some of the settings are initially
- * drawn from 'c', but update_controller() needs to be called later to finish
- * the new ofconn's configuration. */
-static void
-add_controller(struct ofproto *ofproto, const struct ofproto_controller *c)
-{
-    char *name = ofconn_make_name(ofproto, c->target);
-    struct ofconn *ofconn;
-
-    ofconn = ofconn_create(ofproto, rconn_create(5, 8), OFCONN_PRIMARY);
-    ofconn->pktbuf = pktbuf_create();
-    ofconn->miss_send_len = OFP_DEFAULT_MISS_SEND_LEN;
-    rconn_connect(ofconn->rconn, c->target, name);
-    hmap_insert(&ofproto->controllers, &ofconn->hmap_node,
-                hash_string(c->target, 0));
-
-    free(name);
-}
-
-/* Reconfigures 'ofconn' to match 'c'.  This function cannot update an ofconn's
- * target (this is done by creating new ofconns and deleting old ones), but it
- * can update the rest of an ofconn's settings. */
-static void
-update_controller(struct ofconn *ofconn, const struct ofproto_controller *c)
-{
-    int probe_interval;
-
-    ofconn->band = c->band;
-
-    rconn_set_max_backoff(ofconn->rconn, c->max_backoff);
-
-    probe_interval = c->probe_interval ? MAX(c->probe_interval, 5) : 0;
-    rconn_set_probe_interval(ofconn->rconn, probe_interval);
-
-    ofconn_set_rate_limit(ofconn, c->rate_limit, c->burst_limit);
-}
-
-static const char *
-ofconn_get_target(const struct ofconn *ofconn)
-{
-    return rconn_get_target(ofconn->rconn);
-}
-
-static struct ofconn *
-find_controller_by_target(struct ofproto *ofproto, const char *target)
-{
-    struct ofconn *ofconn;
-
-    HMAP_FOR_EACH_WITH_HASH (ofconn, hmap_node,
-                             hash_string(target, 0), &ofproto->controllers) {
-        if (!strcmp(ofconn_get_target(ofconn), target)) {
-            return ofconn;
-        }
-    }
-    return NULL;
-}
-
-static void
-update_in_band_remotes(struct ofproto *ofproto)
-{
-    const struct ofconn *ofconn;
-    struct sockaddr_in *addrs;
-    size_t max_addrs, n_addrs;
-    size_t i;
-
-    /* Allocate enough memory for as many remotes as we could possibly have. */
-    max_addrs = ofproto->n_extra_remotes + hmap_count(&ofproto->controllers);
-    addrs = xmalloc(max_addrs * sizeof *addrs);
-    n_addrs = 0;
-
-    /* Add all the remotes. */
-    HMAP_FOR_EACH (ofconn, hmap_node, &ofproto->controllers) {
-        struct sockaddr_in *sin = &addrs[n_addrs];
-
-        if (ofconn->band == OFPROTO_OUT_OF_BAND) {
-            continue;
-        }
-
-        sin->sin_addr.s_addr = rconn_get_remote_ip(ofconn->rconn);
-        if (sin->sin_addr.s_addr) {
-            sin->sin_port = rconn_get_remote_port(ofconn->rconn);
-            n_addrs++;
-        }
-    }
-    for (i = 0; i < ofproto->n_extra_remotes; i++) {
-        addrs[n_addrs++] = ofproto->extra_in_band_remotes[i];
-    }
-
-    /* Create or update or destroy in-band. */
-    if (n_addrs) {
-        if (!ofproto->in_band) {
-            in_band_create(ofproto, ofproto->dpif, &ofproto->in_band);
-        }
-        if (ofproto->in_band) {
-            in_band_set_remotes(ofproto->in_band, addrs, n_addrs);
-        }
-        in_band_set_queue(ofproto->in_band, ofproto->in_band_queue);
-        ofproto->next_in_band_update = time_msec() + 1000;
-    } else {
-        in_band_destroy(ofproto->in_band);
-        ofproto->in_band = NULL;
-    }
-
-    /* Clean up. */
-    free(addrs);
-}
-
-static void
-update_fail_open(struct ofproto *p)
-{
-    struct ofconn *ofconn;
-
-    if (!hmap_is_empty(&p->controllers)
-            && p->fail_mode == OFPROTO_FAIL_STANDALONE) {
-        struct rconn **rconns;
-        size_t n;
-
-        if (!p->fail_open) {
-            p->fail_open = fail_open_create(p);
-        }
-
-        n = 0;
-        rconns = xmalloc(hmap_count(&p->controllers) * sizeof *rconns);
-        HMAP_FOR_EACH (ofconn, hmap_node, &p->controllers) {
-            rconns[n++] = ofconn->rconn;
-        }
-
-        fail_open_set_controllers(p->fail_open, rconns, n);
-        /* p->fail_open takes ownership of 'rconns'. */
-    } else {
-        fail_open_destroy(p->fail_open);
-        p->fail_open = NULL;
-    }
-}
-
 void
 ofproto_set_controllers(struct ofproto *p,
                         const struct ofproto_controller *controllers,
                         size_t n_controllers)
 {
-    struct shash new_controllers;
-    struct ofconn *ofconn, *next_ofconn;
-    struct ofservice *ofservice, *next_ofservice;
-    size_t i;
-
-    /* Create newly configured controllers and services.
-     * Create a name to ofproto_controller mapping in 'new_controllers'. */
-    shash_init(&new_controllers);
-    for (i = 0; i < n_controllers; i++) {
-        const struct ofproto_controller *c = &controllers[i];
-
-        if (!vconn_verify_name(c->target)) {
-            if (!find_controller_by_target(p, c->target)) {
-                add_controller(p, c);
-            }
-        } else if (!pvconn_verify_name(c->target)) {
-            if (!ofservice_lookup(p, c->target) && ofservice_create(p, c)) {
-                continue;
-            }
-        } else {
-            VLOG_WARN_RL(&rl, "%s: unsupported controller \"%s\"",
-                         dpif_name(p->dpif), c->target);
-            continue;
-        }
-
-        shash_add_once(&new_controllers, c->target, &controllers[i]);
-    }
-
-    /* Delete controllers that are no longer configured.
-     * Update configuration of all now-existing controllers. */
-    HMAP_FOR_EACH_SAFE (ofconn, next_ofconn, hmap_node, &p->controllers) {
-        struct ofproto_controller *c;
-
-        c = shash_find_data(&new_controllers, ofconn_get_target(ofconn));
-        if (!c) {
-            ofconn_destroy(ofconn);
-        } else {
-            update_controller(ofconn, c);
-        }
-    }
-
-    /* Delete services that are no longer configured.
-     * Update configuration of all now-existing services. */
-    HMAP_FOR_EACH_SAFE (ofservice, next_ofservice, node, &p->services) {
-        struct ofproto_controller *c;
-
-        c = shash_find_data(&new_controllers,
-                            pvconn_get_name(ofservice->pvconn));
-        if (!c) {
-            ofservice_destroy(p, ofservice);
-        } else {
-            ofservice_reconfigure(ofservice, c);
-        }
-    }
-
-    shash_destroy(&new_controllers);
-
-    update_in_band_remotes(p);
-    update_fail_open(p);
+    connmgr_set_controllers(p->connmgr, controllers, n_controllers);
 }
 
 void
 ofproto_set_fail_mode(struct ofproto *p, enum ofproto_fail_mode fail_mode)
 {
-    p->fail_mode = fail_mode;
-    update_fail_open(p);
+    connmgr_set_fail_mode(p->connmgr, fail_mode);
 }
 
 /* Drops the connections between 'ofproto' and all of its controllers, forcing
@@ -776,34 +468,7 @@ ofproto_set_fail_mode(struct ofproto *p, enum ofproto_fail_mode fail_mode)
 void
 ofproto_reconnect_controllers(struct ofproto *ofproto)
 {
-    struct ofconn *ofconn;
-
-    LIST_FOR_EACH (ofconn, node, &ofproto->all_conns) {
-        rconn_reconnect(ofconn->rconn);
-    }
-}
-
-static bool
-any_extras_changed(const struct ofproto *ofproto,
-                   const struct sockaddr_in *extras, size_t n)
-{
-    size_t i;
-
-    if (n != ofproto->n_extra_remotes) {
-        return true;
-    }
-
-    for (i = 0; i < n; i++) {
-        const struct sockaddr_in *old = &ofproto->extra_in_band_remotes[i];
-        const struct sockaddr_in *new = &extras[i];
-
-        if (old->sin_addr.s_addr != new->sin_addr.s_addr ||
-            old->sin_port != new->sin_port) {
-            return true;
-        }
-    }
-
-    return false;
+    connmgr_reconnect(ofproto->connmgr);
 }
 
 /* Sets the 'n' TCP port addresses in 'extras' as ones to which 'ofproto''s
@@ -813,15 +478,7 @@ void
 ofproto_set_extra_in_band_remotes(struct ofproto *ofproto,
                                   const struct sockaddr_in *extras, size_t n)
 {
-    if (!any_extras_changed(ofproto, extras, n)) {
-        return;
-    }
-
-    free(ofproto->extra_in_band_remotes);
-    ofproto->n_extra_remotes = n;
-    ofproto->extra_in_band_remotes = xmemdup(extras, n * sizeof *extras);
-
-    update_in_band_remotes(ofproto);
+    connmgr_set_extra_in_band_remotes(ofproto->connmgr, extras, n);
 }
 
 /* Sets the OpenFlow queue used by flows set up by in-band control on
@@ -830,10 +487,7 @@ ofproto_set_extra_in_band_remotes(struct ofproto *ofproto,
 void
 ofproto_set_in_band_queue(struct ofproto *ofproto, int queue_id)
 {
-    if (queue_id != ofproto->in_band_queue) {
-        ofproto->in_band_queue = queue_id;
-        update_in_band_remotes(ofproto);
-    }
+    connmgr_set_in_band_queue(ofproto->connmgr, queue_id);
 }
 
 void
@@ -887,48 +541,10 @@ ofproto_set_desc(struct ofproto *p,
     }
 }
 
-static int
-set_pvconns(struct pvconn ***pvconnsp, size_t *n_pvconnsp,
-            const struct svec *svec)
-{
-    struct pvconn **pvconns = *pvconnsp;
-    size_t n_pvconns = *n_pvconnsp;
-    int retval = 0;
-    size_t i;
-
-    for (i = 0; i < n_pvconns; i++) {
-        pvconn_close(pvconns[i]);
-    }
-    free(pvconns);
-
-    pvconns = xmalloc(svec->n * sizeof *pvconns);
-    n_pvconns = 0;
-    for (i = 0; i < svec->n; i++) {
-        const char *name = svec->names[i];
-        struct pvconn *pvconn;
-        int error;
-
-        error = pvconn_open(name, &pvconn);
-        if (!error) {
-            pvconns[n_pvconns++] = pvconn;
-        } else {
-            VLOG_ERR("failed to listen on %s: %s", name, strerror(error));
-            if (!retval) {
-                retval = error;
-            }
-        }
-    }
-
-    *pvconnsp = pvconns;
-    *n_pvconnsp = n_pvconns;
-
-    return retval;
-}
-
 int
 ofproto_set_snoops(struct ofproto *ofproto, const struct svec *snoops)
 {
-    return set_pvconns(&ofproto->snoops, &ofproto->n_snoops, snoops);
+    return connmgr_set_snoops(ofproto->connmgr, snoops);
 }
 
 int
@@ -1041,32 +657,25 @@ ofproto_get_datapath_id(const struct ofproto *ofproto)
 bool
 ofproto_has_primary_controller(const struct ofproto *ofproto)
 {
-    return !hmap_is_empty(&ofproto->controllers);
+    return connmgr_has_controllers(ofproto->connmgr);
 }
 
 enum ofproto_fail_mode
 ofproto_get_fail_mode(const struct ofproto *p)
 {
-    return p->fail_mode;
+    return connmgr_get_fail_mode(p->connmgr);
 }
 
 void
 ofproto_get_snoops(const struct ofproto *ofproto, struct svec *snoops)
 {
-    size_t i;
-
-    for (i = 0; i < ofproto->n_snoops; i++) {
-        svec_add(snoops, pvconn_get_name(ofproto->snoops[i]));
-    }
+    connmgr_get_snoops(ofproto->connmgr, snoops);
 }
 
 void
 ofproto_destroy(struct ofproto *p)
 {
-    struct ofservice *ofservice, *next_ofservice;
-    struct ofconn *ofconn, *next_ofconn;
     struct ofport *ofport, *next_ofport;
-    size_t i;
 
     if (!p) {
         return;
@@ -1074,23 +683,13 @@ ofproto_destroy(struct ofproto *p)
 
     shash_find_and_delete(&all_ofprotos, dpif_name(p->dpif));
 
-    /* Destroy fail-open and in-band early, since they touch the classifier. */
-    fail_open_destroy(p->fail_open);
-    p->fail_open = NULL;
-
-    in_band_destroy(p->in_band);
-    p->in_band = NULL;
-    free(p->extra_in_band_remotes);
+    /* Destroy connmgr early, since it touches the classifier. */
+    connmgr_destroy(p->connmgr);
 
     ofproto_flush_flows(p);
     classifier_destroy(&p->cls);
     hmap_destroy(&p->facets);
 
-    LIST_FOR_EACH_SAFE (ofconn, next_ofconn, node, &p->all_conns) {
-        ofconn_destroy(ofconn);
-    }
-    hmap_destroy(&p->controllers);
-
     dpif_close(p->dpif);
     netdev_monitor_destroy(p->netdev_monitor);
     HMAP_FOR_EACH_SAFE (ofport, next_ofport, hmap_node, &p->ports) {
@@ -1102,16 +701,6 @@ ofproto_destroy(struct ofproto *p)
     netflow_destroy(p->netflow);
     ofproto_sflow_destroy(p->sflow);
 
-    HMAP_FOR_EACH_SAFE (ofservice, next_ofservice, node, &p->services) {
-        ofservice_destroy(p, ofservice);
-    }
-    hmap_destroy(&p->services);
-
-    for (i = 0; i < p->n_snoops; i++) {
-        pvconn_close(p->snoops[i]);
-    }
-    free(p->snoops);
-
     mac_learning_destroy(p->ml);
 
     free(p->mfr_desc);
@@ -1146,54 +735,9 @@ process_port_change(struct ofproto *ofproto, int error, char *devname)
     }
 }
 
-/* Returns a "preference level" for snooping 'ofconn'.  A higher return value
- * means that 'ofconn' is more interesting for monitoring than a lower return
- * value. */
-static int
-snoop_preference(const struct ofconn *ofconn)
-{
-    switch (ofconn_get_role(ofconn)) {
-    case NX_ROLE_MASTER:
-        return 3;
-    case NX_ROLE_OTHER:
-        return 2;
-    case NX_ROLE_SLAVE:
-        return 1;
-    default:
-        /* Shouldn't happen. */
-        return 0;
-    }
-}
-
-/* One of ofproto's "snoop" pvconns has accepted a new connection on 'vconn'.
- * Connects this vconn to a controller. */
-static void
-add_snooper(struct ofproto *ofproto, struct vconn *vconn)
-{
-    struct ofconn *ofconn, *best;
-
-    /* Pick a controller for monitoring. */
-    best = NULL;
-    LIST_FOR_EACH (ofconn, node, &ofproto->all_conns) {
-        if (ofconn_get_type(ofconn) == OFCONN_PRIMARY
-            && (!best || snoop_preference(ofconn) > snoop_preference(best))) {
-            best = ofconn;
-        }
-    }
-
-    if (best) {
-        rconn_add_monitor(best->rconn, vconn);
-    } else {
-        VLOG_INFO_RL(&rl, "no controller connection to snoop");
-        vconn_close(vconn);
-    }
-}
-
 int
 ofproto_run1(struct ofproto *p)
 {
-    struct ofconn *ofconn, *next_ofconn;
-    struct ofservice *ofservice;
     struct ofport *ofport;
     char *devname;
     int error;
@@ -1235,56 +779,7 @@ ofproto_run1(struct ofproto *p)
         ofport_run(p, ofport);
     }
 
-    if (p->in_band) {
-        if (time_msec() >= p->next_in_band_update) {
-            update_in_band_remotes(p);
-        }
-        in_band_run(p->in_band);
-    }
-
-    LIST_FOR_EACH_SAFE (ofconn, next_ofconn, node, &p->all_conns) {
-        ofconn_run(ofconn);
-    }
-
-    /* Fail-open maintenance.  Do this after processing the ofconns since
-     * fail-open checks the status of the controller rconn. */
-    if (p->fail_open) {
-        fail_open_run(p->fail_open);
-    }
-
-    HMAP_FOR_EACH (ofservice, node, &p->services) {
-        struct vconn *vconn;
-        int retval;
-
-        retval = pvconn_accept(ofservice->pvconn, OFP_VERSION, &vconn);
-        if (!retval) {
-            struct rconn *rconn;
-            char *name;
-
-            rconn = rconn_create(ofservice->probe_interval, 0);
-            name = ofconn_make_name(p, vconn_get_name(vconn));
-            rconn_connect_unreliably(rconn, vconn, name);
-            free(name);
-
-            ofconn = ofconn_create(p, rconn, OFCONN_SERVICE);
-            ofconn_set_rate_limit(ofconn, ofservice->rate_limit,
-                                  ofservice->burst_limit);
-        } else if (retval != EAGAIN) {
-            VLOG_WARN_RL(&rl, "accept failed (%s)", strerror(retval));
-        }
-    }
-
-    for (i = 0; i < p->n_snoops; i++) {
-        struct vconn *vconn;
-        int retval;
-
-        retval = pvconn_accept(p->snoops[i], OFP_VERSION, &vconn);
-        if (!retval) {
-            add_snooper(p, vconn);
-        } else if (retval != EAGAIN) {
-            VLOG_WARN_RL(&rl, "accept failed (%s)", strerror(retval));
-        }
-    }
+    connmgr_run(p->connmgr, handle_openflow);
 
     if (time_msec() >= p->next_expiration) {
         int delay = ofproto_expire(p);
@@ -1333,27 +828,14 @@ ofproto_run2(struct ofproto *p, bool revalidate_all)
 void
 ofproto_wait(struct ofproto *p)
 {
-    struct ofservice *ofservice;
-    struct ofconn *ofconn;
     struct ofport *ofport;
-    size_t i;
 
-    dpif_recv_wait(p->dpif);
-    dpif_port_poll_wait(p->dpif);
-    netdev_monitor_poll_wait(p->netdev_monitor);
     HMAP_FOR_EACH (ofport, hmap_node, &p->ports) {
         ofport_wait(ofport);
     }
-    LIST_FOR_EACH (ofconn, node, &p->all_conns) {
-        ofconn_wait(ofconn);
-    }
-    if (p->in_band) {
-        poll_timer_wait_until(p->next_in_band_update);
-        in_band_wait(p->in_band);
-    }
-    if (p->fail_open) {
-        fail_open_wait(p->fail_open);
-    }
+    dpif_recv_wait(p->dpif);
+    dpif_port_poll_wait(p->dpif);
+    netdev_monitor_poll_wait(p->netdev_monitor);
     if (p->sflow) {
         ofproto_sflow_wait(p->sflow);
     }
@@ -1367,12 +849,7 @@ ofproto_wait(struct ofproto *p)
     } else if (p->next_expiration != LLONG_MAX) {
         poll_timer_wait_until(p->next_expiration);
     }
-    HMAP_FOR_EACH (ofservice, node, &p->services) {
-        pvconn_wait(ofservice->pvconn);
-    }
-    for (i = 0; i < p->n_snoops; i++) {
-        pvconn_wait(p->snoops[i]);
-    }
+    connmgr_wait(p->connmgr);
 }
 
 void
@@ -1390,54 +867,14 @@ ofproto_get_revalidate_set(struct ofproto *ofproto)
 bool
 ofproto_is_alive(const struct ofproto *p)
 {
-    return !hmap_is_empty(&p->controllers);
+    return connmgr_has_controllers(p->connmgr);
 }
 
 void
 ofproto_get_ofproto_controller_info(const struct ofproto *ofproto,
                                     struct shash *info)
 {
-    const struct ofconn *ofconn;
-
-    shash_init(info);
-
-    HMAP_FOR_EACH (ofconn, hmap_node, &ofproto->controllers) {
-        const struct rconn *rconn = ofconn->rconn;
-        time_t now = time_now();
-        time_t last_connection = rconn_get_last_connection(rconn);
-        time_t last_disconnect = rconn_get_last_disconnect(rconn);
-        const int last_error = rconn_get_last_error(rconn);
-        struct ofproto_controller_info *cinfo = xmalloc(sizeof *cinfo);
-
-        shash_add(info, rconn_get_target(rconn), cinfo);
-
-        cinfo->is_connected = rconn_is_connected(rconn);
-        cinfo->role = ofconn_get_role(ofconn);
-
-        cinfo->pairs.n = 0;
-
-        if (last_error) {
-            cinfo->pairs.keys[cinfo->pairs.n] = "last_error";
-            cinfo->pairs.values[cinfo->pairs.n++] =
-                xstrdup(ovs_retval_to_string(last_error));
-        }
-
-        cinfo->pairs.keys[cinfo->pairs.n] = "state";
-        cinfo->pairs.values[cinfo->pairs.n++] =
-            xstrdup(rconn_get_state(rconn));
-
-        if (last_connection != TIME_MIN) {
-            cinfo->pairs.keys[cinfo->pairs.n] = "sec_since_connect";
-            cinfo->pairs.values[cinfo->pairs.n++]
-                = xasprintf("%ld", (long int) (now - last_connection));
-        }
-
-        if (last_disconnect != TIME_MIN) {
-            cinfo->pairs.keys[cinfo->pairs.n] = "sec_since_disconnect";
-            cinfo->pairs.values[cinfo->pairs.n++]
-                = xasprintf("%ld", (long int) (now - last_disconnect));
-        }
-    }
+    connmgr_get_controller_info(ofproto->connmgr, info);
 }
 
 void
@@ -1583,12 +1020,7 @@ ofproto_flush_flows(struct ofproto *ofproto)
     }
 
     dpif_flow_flush(ofproto->dpif);
-    if (ofproto->in_band) {
-        in_band_flushed(ofproto->in_band);
-    }
-    if (ofproto->fail_open) {
-        fail_open_flushed(ofproto->fail_open);
-    }
+    connmgr_flushed(ofproto->connmgr);
 }
 \f
 static void
@@ -1695,25 +1127,7 @@ static void
 send_port_status(struct ofproto *p, const struct ofport *ofport,
                  uint8_t reason)
 {
-    /* XXX Should limit the number of queued port status change messages. */
-    struct ofconn *ofconn;
-    LIST_FOR_EACH (ofconn, node, &p->all_conns) {
-        struct ofp_port_status *ops;
-        struct ofpbuf *b;
-
-        /* Primary controllers, even slaves, should always get port status
-           updates.  Otherwise obey ofconn_receives_async_msgs(). */
-        if (ofconn_get_type(ofconn) != OFCONN_PRIMARY
-            && !ofconn_receives_async_msgs(ofconn)) {
-            continue;
-        }
-
-        ops = make_openflow_xid(sizeof *ops, OFPT_PORT_STATUS, 0, &b);
-        ops->reason = reason;
-        ops->desc = ofport->opp;
-        hton_ofp_phy_port(&ops->desc);
-        queue_tx(b, ofconn, NULL);
-    }
+    connmgr_send_port_status(p->connmgr, &ofport->opp, reason);
 }
 
 static void
@@ -1883,250 +1297,6 @@ init_ports(struct ofproto *p)
     return 0;
 }
 \f
-static struct ofconn *
-ofconn_create(struct ofproto *p, struct rconn *rconn, enum ofconn_type type)
-{
-    struct ofconn *ofconn = xzalloc(sizeof *ofconn);
-    ofconn->ofproto = p;
-    list_push_back(&p->all_conns, &ofconn->node);
-    ofconn->rconn = rconn;
-    ofconn->type = type;
-    ofconn->flow_format = NXFF_OPENFLOW10;
-    ofconn->role = NX_ROLE_OTHER;
-    ofconn->packet_in_counter = rconn_packet_counter_create ();
-    ofconn->pktbuf = NULL;
-    ofconn->miss_send_len = 0;
-    ofconn->reply_counter = rconn_packet_counter_create ();
-    return ofconn;
-}
-
-static void
-ofconn_destroy(struct ofconn *ofconn)
-{
-    struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
-
-    if (ofconn_get_type(ofconn) == OFCONN_PRIMARY) {
-        hmap_remove(&ofproto->controllers, &ofconn->hmap_node);
-    }
-
-    list_remove(&ofconn->node);
-    rconn_destroy(ofconn->rconn);
-    rconn_packet_counter_destroy(ofconn->packet_in_counter);
-    rconn_packet_counter_destroy(ofconn->reply_counter);
-    pktbuf_destroy(ofconn->pktbuf);
-    free(ofconn);
-}
-
-static void
-ofconn_run(struct ofconn *ofconn)
-{
-    struct ofproto *p = ofconn_get_ofproto(ofconn);
-    int iteration;
-    size_t i;
-
-    for (i = 0; i < N_SCHEDULERS; i++) {
-        pinsched_run(ofconn->schedulers[i], do_send_packet_in, ofconn);
-    }
-
-    rconn_run(ofconn->rconn);
-
-    if (rconn_packet_counter_read (ofconn->reply_counter) < OFCONN_REPLY_MAX) {
-        /* Limit the number of iterations to prevent other tasks from
-         * starving. */
-        for (iteration = 0; iteration < 50; iteration++) {
-            struct ofpbuf *of_msg = rconn_recv(ofconn->rconn);
-            if (!of_msg) {
-                break;
-            }
-            if (p->fail_open) {
-                fail_open_maybe_recover(p->fail_open);
-            }
-            handle_openflow(ofconn, of_msg);
-            ofpbuf_delete(of_msg);
-        }
-    }
-
-    if (!rconn_is_alive(ofconn->rconn)) {
-        ofconn_destroy(ofconn);
-    }
-}
-
-static void
-ofconn_wait(struct ofconn *ofconn)
-{
-    int i;
-
-    for (i = 0; i < N_SCHEDULERS; i++) {
-        pinsched_wait(ofconn->schedulers[i]);
-    }
-    rconn_run_wait(ofconn->rconn);
-    if (rconn_packet_counter_read (ofconn->reply_counter) < OFCONN_REPLY_MAX) {
-        rconn_recv_wait(ofconn->rconn);
-    } else {
-        COVERAGE_INC(ofproto_ofconn_stuck);
-    }
-}
-
-/* Returns true if 'ofconn' should receive asynchronous messages. */
-static bool
-ofconn_receives_async_msgs(const struct ofconn *ofconn)
-{
-    if (ofconn_get_type(ofconn) == OFCONN_PRIMARY) {
-        /* Primary controllers always get asynchronous messages unless they
-         * have configured themselves as "slaves".  */
-        return ofconn_get_role(ofconn) != NX_ROLE_SLAVE;
-    } else {
-        /* Service connections don't get asynchronous messages unless they have
-         * explicitly asked for them by setting a nonzero miss send length. */
-        return ofconn->miss_send_len > 0;
-    }
-}
-
-/* Returns a human-readable name for an OpenFlow connection between 'ofproto'
- * and 'target', suitable for use in log messages for identifying the
- * connection.
- *
- * The name is dynamically allocated.  The caller should free it (with free())
- * when it is no longer needed. */
-static char *
-ofconn_make_name(const struct ofproto *ofproto, const char *target)
-{
-    return xasprintf("%s<->%s", dpif_base_name(ofproto->dpif), target);
-}
-
-static void
-ofconn_set_rate_limit(struct ofconn *ofconn, int rate, int burst)
-{
-    int i;
-
-    for (i = 0; i < N_SCHEDULERS; i++) {
-        struct pinsched **s = &ofconn->schedulers[i];
-
-        if (rate > 0) {
-            if (!*s) {
-                *s = pinsched_create(rate, burst);
-            } else {
-                pinsched_set_limits(*s, rate, burst);
-            }
-        } else {
-            pinsched_destroy(*s);
-            *s = NULL;
-        }
-    }
-}
-
-static struct ofproto *
-ofconn_get_ofproto(struct ofconn *ofconn)
-{
-    return ofconn->ofproto;
-}
-
-static enum nx_flow_format
-ofconn_get_flow_format(struct ofconn *ofconn)
-{
-    return ofconn->flow_format;
-}
-
-static void
-ofconn_set_flow_format(struct ofconn *ofconn, enum nx_flow_format flow_format)
-{
-    ofconn->flow_format = flow_format;
-}
-
-static int
-ofconn_get_miss_send_len(const struct ofconn *ofconn)
-{
-    return ofconn->miss_send_len;
-}
-
-static void
-ofconn_set_miss_send_len(struct ofconn *ofconn, int miss_send_len)
-{
-    ofconn->miss_send_len = miss_send_len;
-}
-
-static enum ofconn_type
-ofconn_get_type(const struct ofconn *ofconn)
-{
-    return ofconn->type;
-}
-
-static enum nx_role
-ofconn_get_role(const struct ofconn *ofconn)
-{
-    return ofconn->role;
-}
-
-static void
-ofconn_set_role(struct ofconn *ofconn, enum nx_role role)
-{
-    ofconn->role = role;
-}
-
-static int
-ofconn_pktbuf_retrieve(struct ofconn *ofconn, uint32_t id,
-                       struct ofpbuf **bufferp, uint16_t *in_port)
-{
-    return pktbuf_retrieve(ofconn->pktbuf, id, bufferp, in_port);
-}
-\f
-static void
-ofservice_reconfigure(struct ofservice *ofservice,
-                      const struct ofproto_controller *c)
-{
-    ofservice->probe_interval = c->probe_interval;
-    ofservice->rate_limit = c->rate_limit;
-    ofservice->burst_limit = c->burst_limit;
-}
-
-/* Creates a new ofservice in 'ofproto'.  Returns 0 if successful, otherwise a
- * positive errno value. */
-static int
-ofservice_create(struct ofproto *ofproto, const struct ofproto_controller *c)
-{
-    struct ofservice *ofservice;
-    struct pvconn *pvconn;
-    int error;
-
-    error = pvconn_open(c->target, &pvconn);
-    if (error) {
-        return error;
-    }
-
-    ofservice = xzalloc(sizeof *ofservice);
-    hmap_insert(&ofproto->services, &ofservice->node,
-                hash_string(c->target, 0));
-    ofservice->pvconn = pvconn;
-
-    ofservice_reconfigure(ofservice, c);
-
-    return 0;
-}
-
-static void
-ofservice_destroy(struct ofproto *ofproto, struct ofservice *ofservice)
-{
-    hmap_remove(&ofproto->services, &ofservice->node);
-    pvconn_close(ofservice->pvconn);
-    free(ofservice);
-}
-
-/* Finds and returns the ofservice within 'ofproto' that has the given
- * 'target', or a null pointer if none exists. */
-static struct ofservice *
-ofservice_lookup(struct ofproto *ofproto, const char *target)
-{
-    struct ofservice *ofservice;
-
-    HMAP_FOR_EACH_WITH_HASH (ofservice, node, hash_string(target, 0),
-                             &ofproto->services) {
-        if (!strcmp(pvconn_get_name(ofservice->pvconn), target)) {
-            return ofservice;
-        }
-    }
-    return NULL;
-}
-\f
 /* Returns true if 'rule' should be hidden from the controller.
  *
  * Rules with priority higher than UINT16_MAX are set up by ofproto itself
@@ -2686,22 +1856,6 @@ facet_revalidate(struct ofproto *ofproto, struct facet *facet)
     return true;
 }
 \f
-static void
-queue_tx(struct ofpbuf *msg, const struct ofconn *ofconn,
-         struct rconn_packet_counter *counter)
-{
-    update_openflow_length(msg);
-    if (rconn_send(ofconn->rconn, msg, counter)) {
-        ofpbuf_delete(msg);
-    }
-}
-
-static void
-ofconn_send_reply(const struct ofconn *ofconn, struct ofpbuf *msg)
-{
-    queue_tx(msg, ofconn, ofconn->reply_counter);
-}
-
 static void
 send_error_oh(const struct ofconn *ofconn, const struct ofp_header *oh,
               int error)
@@ -3301,8 +2455,9 @@ xlate_actions(struct action_xlate_ctx *ctx,
 
     /* Check with in-band control to see if we're allowed to set up this
      * flow. */
-    if (!in_band_rule_check(ctx->ofproto->in_band, &ctx->flow,
-                            ctx->odp_actions->data, ctx->odp_actions->size)) {
+    if (!connmgr_may_set_up_flow(ctx->ofproto->connmgr, &ctx->flow,
+                                 ctx->odp_actions->data,
+                                 ctx->odp_actions->size)) {
         ctx->may_set_up_flow = false;
     }
 
@@ -4387,8 +3542,7 @@ handle_role_request(struct ofconn *ofconn, const struct ofp_header *oh)
     uint32_t role;
 
     if (ofconn_get_type(ofconn) != OFCONN_PRIMARY) {
-        VLOG_WARN_RL(&rl, "ignoring role request on non-controller "
-                     "connection");
+        VLOG_WARN_RL(&rl, "ignoring role request on service connection");
         return ofp_mkerr(OFPET_BAD_REQUEST, OFPBRC_EPERM);
     }
 
@@ -4401,16 +3555,6 @@ handle_role_request(struct ofconn *ofconn, const struct ofp_header *oh)
         return ofp_mkerr(OFPET_BAD_REQUEST, -1);
     }
 
-    if (role == NX_ROLE_MASTER) {
-        struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
-        struct ofconn *other;
-
-        HMAP_FOR_EACH (other, hmap_node, &ofproto->controllers) {
-            if (ofconn_get_role(other) == NX_ROLE_MASTER) {
-                ofconn_set_role(other, NX_ROLE_SLAVE);
-            }
-        }
-    }
     ofconn_set_role(ofconn, role);
 
     reply = make_nxmsg_xid(sizeof *reply, NXT_ROLE_REPLY, oh->xid, &buf);
@@ -4601,7 +3745,7 @@ handle_miss_upcall(struct ofproto *p, struct dpif_upcall *upcall)
 
     /* Check with in-band control to see if this packet should be sent
      * to the local port regardless of the flow table. */
-    if (in_band_msg_in_hook(p->in_band, &flow, upcall->packet)) {
+    if (connmgr_msg_in_hook(p->connmgr, &flow, upcall->packet)) {
         ofproto_send_packet(p, ODPP_LOCAL, 0, upcall->packet);
     }
 
@@ -4962,7 +4106,6 @@ static void
 rule_send_removed(struct ofproto *p, struct rule *rule, uint8_t reason)
 {
     struct ofputil_flow_removed fr;
-    struct ofconn *ofconn;
 
     if (!rule->send_flow_removed) {
         return;
@@ -4976,22 +4119,7 @@ rule_send_removed(struct ofproto *p, struct rule *rule, uint8_t reason)
     fr.packet_count = rule->packet_count;
     fr.byte_count = rule->byte_count;
 
-    LIST_FOR_EACH (ofconn, node, &p->all_conns) {
-        struct ofpbuf *msg;
-
-        if (!rconn_is_connected(ofconn->rconn)
-            || !ofconn_receives_async_msgs(ofconn)) {
-            continue;
-        }
-
-        /* This accounts flow expirations as if they were replies to OpenFlow
-         * requests.  That works because preventing OpenFlow requests from
-         * being processed also prevents new flows from being added (and
-         * expiring).  (It also prevents processing OpenFlow requests that
-         * would not add new flows, so it is imperfect.) */
-        msg = ofputil_encode_flow_removed(&fr, ofconn_get_flow_format(ofconn));
-        ofconn_send_reply(ofconn, msg);
-    }
+    connmgr_send_flow_removed(p->connmgr, &fr);
 }
 
 /* Obtains statistics for 'rule' and stores them in '*packets' and '*bytes'.
@@ -5019,64 +4147,6 @@ rule_get_stats(const struct rule *rule, uint64_t *packets, uint64_t *bytes)
     *bytes = b;
 }
 
-/* pinsched callback for sending 'ofp_packet_in' on 'ofconn'. */
-static void
-do_send_packet_in(struct ofpbuf *ofp_packet_in, void *ofconn_)
-{
-    struct ofconn *ofconn = ofconn_;
-
-    rconn_send_with_limit(ofconn->rconn, ofp_packet_in,
-                          ofconn->packet_in_counter, 100);
-}
-
-/* Takes 'upcall', whose packet has the flow specified by 'flow', composes an
- * OpenFlow packet-in message from it, and passes it to 'ofconn''s packet
- * scheduler for sending.
- *
- * If 'clone' is true, the caller retains ownership of 'upcall->packet'.
- * Otherwise, ownership is transferred to this function. */
-static void
-schedule_packet_in(struct ofconn *ofconn, struct dpif_upcall *upcall,
-                   const struct flow *flow, bool clone)
-{
-    struct ofproto *ofproto = ofconn_get_ofproto(ofconn);
-    struct ofputil_packet_in pin;
-    struct ofpbuf *msg;
-
-    /* Figure out the easy parts. */
-    pin.packet = upcall->packet;
-    pin.in_port = odp_port_to_ofp_port(flow->in_port);
-    pin.reason = upcall->type == DPIF_UC_MISS ? OFPR_NO_MATCH : OFPR_ACTION;
-
-    /* Get OpenFlow buffer_id. */
-    if (upcall->type == DPIF_UC_ACTION) {
-        pin.buffer_id = UINT32_MAX;
-    } else if (ofproto->fail_open && fail_open_is_active(ofproto->fail_open)) {
-        pin.buffer_id = pktbuf_get_null();
-    } else if (!ofconn->pktbuf) {
-        pin.buffer_id = UINT32_MAX;
-    } else {
-        pin.buffer_id = pktbuf_save(ofconn->pktbuf, upcall->packet,
-                                    flow->in_port);
-    }
-
-    /* Figure out how much of the packet to send. */
-    pin.send_len = upcall->packet->size;
-    if (pin.buffer_id != UINT32_MAX) {
-        pin.send_len = MIN(pin.send_len, ofconn->miss_send_len);
-    }
-    if (upcall->type == DPIF_UC_ACTION) {
-        pin.send_len = MIN(pin.send_len, upcall->userdata);
-    }
-
-    /* Make OFPT_PACKET_IN and hand over to packet scheduler.  It might
-     * immediately call into do_send_packet_in() or it might buffer it for a
-     * while (until a later call to pinsched_run()). */
-    msg = ofputil_encode_packet_in(&pin, clone ? NULL : upcall->packet);
-    pinsched_send(ofconn->schedulers[upcall->type == DPIF_UC_MISS ? 0 : 1],
-                  flow->in_port, msg, do_send_packet_in, ofconn);
-}
-
 /* Given 'upcall', of type DPIF_UC_ACTION or DPIF_UC_MISS, sends an
  * OFPT_PACKET_IN message to each OpenFlow controller as necessary according to
  * their individual configurations.
@@ -5087,22 +4157,15 @@ static void
 send_packet_in(struct ofproto *ofproto, struct dpif_upcall *upcall,
                const struct flow *flow, bool clone)
 {
-    struct ofconn *ofconn, *prev;
+    struct ofputil_packet_in pin;
 
-    prev = NULL;
-    LIST_FOR_EACH (ofconn, node, &ofproto->all_conns) {
-        if (ofconn_receives_async_msgs(ofconn)) {
-            if (prev) {
-                schedule_packet_in(prev, upcall, flow, true);
-            }
-            prev = ofconn;
-        }
-    }
-    if (prev) {
-        schedule_packet_in(prev, upcall, flow, clone);
-    } else if (!clone) {
-        ofpbuf_delete(upcall->packet);
-    }
+    pin.packet = upcall->packet;
+    pin.in_port = odp_port_to_ofp_port(flow->in_port);
+    pin.reason = upcall->type == DPIF_UC_MISS ? OFPR_NO_MATCH : OFPR_ACTION;
+    pin.buffer_id = 0;          /* not yet known */
+    pin.send_len = upcall->userdata;
+    connmgr_send_packet_in(ofproto->connmgr, upcall, flow,
+                           clone ? NULL : upcall->packet);
 }
 
 static uint64_t