ofproto-dpif: Batch interacting with the dpif on flow miss operations.
[sliver-openvswitch.git] / ofproto / ofproto-dpif.c
index 761b591..ae76e98 100644 (file)
@@ -256,8 +256,7 @@ struct facet {
     struct netflow_flow nf_flow; /* Per-flow NetFlow tracking data. */
 };
 
-static struct facet *facet_create(struct rule_dpif *, const struct flow *,
-                                  const struct ofpbuf *packet);
+static struct facet *facet_create(struct rule_dpif *, const struct flow *);
 static void facet_remove(struct ofproto_dpif *, struct facet *);
 static void facet_free(struct facet *);
 
@@ -266,6 +265,11 @@ static struct facet *facet_lookup_valid(struct ofproto_dpif *,
                                         const struct flow *);
 static bool facet_revalidate(struct ofproto_dpif *, struct facet *);
 
+static bool execute_controller_action(struct ofproto_dpif *,
+                                      const struct flow *,
+                                      const struct nlattr *odp_actions,
+                                      size_t actions_len,
+                                      struct ofpbuf *packet);
 static void facet_execute(struct ofproto_dpif *, struct facet *,
                           struct ofpbuf *packet);
 
@@ -396,7 +400,12 @@ static void update_learning_table(struct ofproto_dpif *,
 static bool is_admissible(struct ofproto_dpif *, const struct flow *,
                           bool have_packet, tag_type *, int *vlanp,
                           struct ofbundle **in_bundlep);
+
+/* Upcalls. */
+#define FLOW_MISS_MAX_BATCH 50
 static void handle_upcall(struct ofproto_dpif *, struct dpif_upcall *);
+static void handle_miss_upcalls(struct ofproto_dpif *,
+                                struct dpif_upcall *, size_t n);
 
 /* Flow expiration. */
 static int expire(struct ofproto_dpif *);
@@ -565,8 +574,10 @@ static int
 run(struct ofproto *ofproto_)
 {
     struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
+    struct dpif_upcall misses[FLOW_MISS_MAX_BATCH];
     struct ofport_dpif *ofport;
     struct ofbundle *bundle;
+    size_t n_misses;
     int i;
 
     if (!clogged) {
@@ -574,22 +585,29 @@ run(struct ofproto *ofproto_)
     }
     dpif_run(ofproto->dpif);
 
-    for (i = 0; i < 50; i++) {
-        struct dpif_upcall packet;
+    n_misses = 0;
+    for (i = 0; i < FLOW_MISS_MAX_BATCH; i++) {
+        struct dpif_upcall *upcall = &misses[n_misses];
         int error;
 
-        error = dpif_recv(ofproto->dpif, &packet);
+        error = dpif_recv(ofproto->dpif, upcall);
         if (error) {
-            if (error == ENODEV) {
-                /* Datapath destroyed. */
+            if (error == ENODEV && n_misses == 0) {
                 return error;
             }
             break;
         }
 
-        handle_upcall(ofproto, &packet);
+        if (upcall->type == DPIF_UC_MISS) {
+            /* Handle it later. */
+            n_misses++;
+        } else {
+            handle_upcall(ofproto, upcall);
+        }
     }
 
+    handle_miss_upcalls(ofproto, misses, n_misses);
+
     if (timer_expired(&ofproto->next_expiration)) {
         int delay = expire(ofproto);
         timer_set_duration(&ofproto->next_expiration, delay);
@@ -710,7 +728,7 @@ static void
 get_tables(struct ofproto *ofproto_, struct ofp_table_stats *ots)
 {
     struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
-    struct ovs_dp_stats s;
+    struct dpif_dp_stats s;
 
     strcpy(ots->name, "classifier");
 
@@ -758,6 +776,7 @@ port_construct(struct ofport *port_)
     struct ofport_dpif *port = ofport_dpif_cast(port_);
     struct ofproto_dpif *ofproto = ofproto_dpif_cast(port->up.ofproto);
 
+    ofproto->need_revalidate = true;
     port->odp_port = ofp_port_to_odp_port(port->up.ofp_port);
     port->bundle = NULL;
     port->cfm = NULL;
@@ -778,6 +797,7 @@ port_destruct(struct ofport *port_)
     struct ofport_dpif *port = ofport_dpif_cast(port_);
     struct ofproto_dpif *ofproto = ofproto_dpif_cast(port->up.ofproto);
 
+    ofproto->need_revalidate = true;
     bundle_remove(port_);
     set_cfm(port_, NULL);
     if (ofproto->sflow) {
@@ -988,6 +1008,7 @@ bundle_add_port(struct ofbundle *bundle, uint32_t ofp_port,
         }
     }
     if (lacp) {
+        port->bundle->ofproto->need_revalidate = true;
         lacp_slave_register(bundle->lacp, port, lacp);
     }
 
@@ -1565,7 +1586,8 @@ port_run(struct ofport_dpif *ofport)
             ofpbuf_uninit(&packet);
         }
 
-        enable = enable && !cfm_get_fault(ofport->cfm);
+        enable = enable && !cfm_get_fault(ofport->cfm)
+            && cfm_get_opup(ofport->cfm);
     }
 
     if (ofport->bundle) {
@@ -1711,28 +1733,74 @@ port_is_lacp_current(const struct ofport *ofport_)
 \f
 /* Upcall handling. */
 
-/* Given 'upcall', of type DPIF_UC_ACTION or DPIF_UC_MISS, sends an
- * OFPT_PACKET_IN message to each OpenFlow controller as necessary according to
- * their individual configurations.
+/* Flow miss batching.
+ *
+ * Some dpifs implement operations faster when you hand them off in a batch.
+ * To allow batching, "struct flow_miss" queues the dpif-related work needed
+ * for a given flow.  Each "struct flow_miss" corresponds to sending one or
+ * more packets, plus possibly installing the flow in the dpif.
+ *
+ * So far we only batch the operations that affect flow setup time the most.
+ * It's possible to batch more than that, but the benefit might be minimal. */
+struct flow_miss {
+    struct hmap_node hmap_node;
+    struct flow flow;
+    const struct nlattr *key;
+    size_t key_len;
+    struct list packets;
+};
+
+struct flow_miss_op {
+    union dpif_op dpif_op;
+    struct facet *facet;
+};
+
+/* Sends an OFPT_PACKET_IN message for 'packet' of type OFPR_NO_MATCH to each
+ * OpenFlow controller as necessary according to their individual
+ * configurations.
+ *
+ * If 'clone' is true, the caller retains ownership of 'packet'.  Otherwise,
+ * ownership is transferred to this function. */
+static void
+send_packet_in_miss(struct ofproto_dpif *ofproto, struct ofpbuf *packet,
+                    const struct flow *flow, bool clone)
+{
+    struct ofputil_packet_in pin;
+
+    pin.packet = packet;
+    pin.in_port = flow->in_port;
+    pin.reason = OFPR_NO_MATCH;
+    pin.buffer_id = 0;          /* not yet known */
+    pin.send_len = 0;           /* not used for flow table misses */
+    connmgr_send_packet_in(ofproto->up.connmgr, &pin, flow,
+                           clone ? NULL : packet);
+}
+
+/* Sends an OFPT_PACKET_IN message for 'packet' of type OFPR_ACTION to each
+ * OpenFlow controller as necessary according to their individual
+ * configurations.
+ *
+ * 'send_len' should be the number of bytes of 'packet' to send to the
+ * controller, as specified in the action that caused the packet to be sent.
  *
  * If 'clone' is true, the caller retains ownership of 'upcall->packet'.
  * Otherwise, ownership is transferred to this function. */
 static void
-send_packet_in(struct ofproto_dpif *ofproto, struct dpif_upcall *upcall,
-               const struct flow *flow, bool clone)
+send_packet_in_action(struct ofproto_dpif *ofproto, struct ofpbuf *packet,
+                      uint64_t userdata, const struct flow *flow, bool clone)
 {
     struct ofputil_packet_in pin;
     struct user_action_cookie cookie;
 
-    pin.packet = upcall->packet;
+    memcpy(&cookie, &userdata, sizeof(cookie));
+
+    pin.packet = packet;
     pin.in_port = flow->in_port;
-    pin.reason = upcall->type == DPIF_UC_MISS ? OFPR_NO_MATCH : OFPR_ACTION;
+    pin.reason = OFPR_ACTION;
     pin.buffer_id = 0;          /* not yet known */
-
-    memcpy(&cookie, &upcall->userdata, sizeof(cookie));
     pin.send_len = cookie.data;
     connmgr_send_packet_in(ofproto->up.connmgr, &pin, flow,
-                           clone ? NULL : upcall->packet);
+                           clone ? NULL : packet);
 }
 
 static bool
@@ -1760,77 +1828,206 @@ process_special(struct ofproto_dpif *ofproto, const struct flow *flow,
     return false;
 }
 
-static void
-handle_miss_upcall(struct ofproto_dpif *ofproto, struct dpif_upcall *upcall)
+static struct flow_miss *
+flow_miss_create(struct hmap *todo, const struct flow *flow,
+                 const struct nlattr *key, size_t key_len)
 {
-    struct facet *facet;
-    struct flow flow;
-
-    /* Obtain in_port and tun_id, at least. */
-    odp_flow_key_to_flow(upcall->key, upcall->key_len, &flow);
+    uint32_t hash = flow_hash(flow, 0);
+    struct flow_miss *miss;
 
-    /* Set header pointers in 'flow'. */
-    flow_extract(upcall->packet, flow.tun_id, flow.in_port, &flow);
-
-    /* Handle 802.1ag and LACP. */
-    if (process_special(ofproto, &flow, upcall->packet)) {
-        ofpbuf_delete(upcall->packet);
-        ofproto->n_matches++;
-        return;
+    HMAP_FOR_EACH_WITH_HASH (miss, hmap_node, hash, todo) {
+        if (flow_equal(&miss->flow, flow)) {
+            return miss;
+        }
     }
 
-    /* Check with in-band control to see if this packet should be sent
-     * to the local port regardless of the flow table. */
-    if (connmgr_msg_in_hook(ofproto->up.connmgr, &flow, upcall->packet)) {
-        send_packet(ofproto, OVSP_LOCAL, upcall->packet);
-    }
+    miss = xmalloc(sizeof *miss);
+    hmap_insert(todo, &miss->hmap_node, hash);
+    miss->flow = *flow;
+    miss->key = key;
+    miss->key_len = key_len;
+    list_init(&miss->packets);
+    return miss;
+}
+
+static void
+handle_flow_miss(struct ofproto_dpif *ofproto, struct flow_miss *miss,
+                 struct flow_miss_op *ops, size_t *n_ops)
+{
+    const struct flow *flow = &miss->flow;
+    struct ofpbuf *packet, *next_packet;
+    struct facet *facet;
 
-    facet = facet_lookup_valid(ofproto, &flow);
+    facet = facet_lookup_valid(ofproto, flow);
     if (!facet) {
-        struct rule_dpif *rule = rule_dpif_lookup(ofproto, &flow, 0);
+        struct rule_dpif *rule;
+
+        rule = rule_dpif_lookup(ofproto, flow, 0);
         if (!rule) {
             /* Don't send a packet-in if OFPPC_NO_PACKET_IN asserted. */
-            struct ofport_dpif *port = get_ofp_port(ofproto, flow.in_port);
+            struct ofport_dpif *port = get_ofp_port(ofproto, flow->in_port);
             if (port) {
                 if (port->up.opp.config & htonl(OFPPC_NO_PACKET_IN)) {
                     COVERAGE_INC(ofproto_dpif_no_packet_in);
                     /* XXX install 'drop' flow entry */
-                    ofpbuf_delete(upcall->packet);
                     return;
                 }
             } else {
                 VLOG_WARN_RL(&rl, "packet-in on unknown port %"PRIu16,
-                             flow.in_port);
+                             flow->in_port);
+            }
+
+            LIST_FOR_EACH_SAFE (packet, next_packet, list_node,
+                                &miss->packets) {
+                list_remove(&packet->list_node);
+                send_packet_in_miss(ofproto, packet, flow, false);
             }
 
-            send_packet_in(ofproto, upcall, &flow, false);
             return;
         }
 
-        facet = facet_create(rule, &flow, upcall->packet);
-    } else if (!facet->may_install) {
-        /* The facet is not installable, that is, we need to process every
-         * packet, so process the current packet's actions into 'facet'. */
-        facet_make_actions(ofproto, facet, upcall->packet);
+        facet = facet_create(rule, flow);
     }
 
-    if (facet->rule->up.cr.priority == FAIL_OPEN_PRIORITY) {
-        /*
-         * Extra-special case for fail-open mode.
-         *
-         * We are in fail-open mode and the packet matched the fail-open rule,
-         * but we are connected to a controller too.  We should send the packet
-         * up to the controller in the hope that it will try to set up a flow
-         * and thereby allow us to exit fail-open.
-         *
-         * See the top-level comment in fail-open.c for more information.
-         */
-        send_packet_in(ofproto, upcall, &flow, true);
+    LIST_FOR_EACH_SAFE (packet, next_packet, list_node, &miss->packets) {
+        list_remove(&packet->list_node);
+        ofproto->n_matches++;
+
+        if (facet->rule->up.cr.priority == FAIL_OPEN_PRIORITY) {
+            /*
+             * Extra-special case for fail-open mode.
+             *
+             * We are in fail-open mode and the packet matched the fail-open
+             * rule, but we are connected to a controller too.  We should send
+             * the packet up to the controller in the hope that it will try to
+             * set up a flow and thereby allow us to exit fail-open.
+             *
+             * See the top-level comment in fail-open.c for more information.
+             */
+            send_packet_in_miss(ofproto, packet, flow, true);
+        }
+
+        if (!facet->may_install) {
+            facet_make_actions(ofproto, facet, packet);
+        }
+        if (!execute_controller_action(ofproto, &facet->flow,
+                                       facet->actions, facet->actions_len,
+                                       packet)) {
+            struct flow_miss_op *op = &ops[(*n_ops)++];
+            struct dpif_execute *execute = &op->dpif_op.execute;
+
+            op->facet = facet;
+            execute->type = DPIF_OP_EXECUTE;
+            execute->key = miss->key;
+            execute->key_len = miss->key_len;
+            execute->actions
+                = (facet->may_install
+                   ? facet->actions
+                   : xmemdup(facet->actions, facet->actions_len));
+            execute->actions_len = facet->actions_len;
+            execute->packet = packet;
+        }
+    }
+
+    if (facet->may_install) {
+        struct flow_miss_op *op = &ops[(*n_ops)++];
+        struct dpif_flow_put *put = &op->dpif_op.flow_put;
+
+        op->facet = facet;
+        put->type = DPIF_OP_FLOW_PUT;
+        put->flags = DPIF_FP_CREATE | DPIF_FP_MODIFY;
+        put->key = miss->key;
+        put->key_len = miss->key_len;
+        put->actions = facet->actions;
+        put->actions_len = facet->actions_len;
+        put->stats = NULL;
+    }
+}
+
+static void
+handle_miss_upcalls(struct ofproto_dpif *ofproto, struct dpif_upcall *upcalls,
+                    size_t n_upcalls)
+{
+    struct dpif_upcall *upcall;
+    struct flow_miss *miss, *next_miss;
+    struct flow_miss_op flow_miss_ops[FLOW_MISS_MAX_BATCH * 2];
+    union dpif_op *dpif_ops[FLOW_MISS_MAX_BATCH * 2];
+    struct hmap todo;
+    size_t n_ops;
+    size_t i;
+
+    if (!n_upcalls) {
+        return;
+    }
+
+    /* Construct the to-do list.
+     *
+     * This just amounts to extracting the flow from each packet and sticking
+     * the packets that have the same flow in the same "flow_miss" structure so
+     * that we can process them together. */
+    hmap_init(&todo);
+    for (upcall = upcalls; upcall < &upcalls[n_upcalls]; upcall++) {
+        struct flow_miss *miss;
+        struct flow flow;
+
+        /* Obtain in_port and tun_id, at least, then set 'flow''s header
+         * pointers. */
+        odp_flow_key_to_flow(upcall->key, upcall->key_len, &flow);
+        flow_extract(upcall->packet, flow.tun_id, flow.in_port, &flow);
+
+        /* Handle 802.1ag and LACP specially. */
+        if (process_special(ofproto, &flow, upcall->packet)) {
+            ofpbuf_delete(upcall->packet);
+            ofproto->n_matches++;
+            continue;
+        }
+
+        /* Add other packets to a to-do list. */
+        miss = flow_miss_create(&todo, &flow, upcall->key, upcall->key_len);
+        list_push_back(&miss->packets, &upcall->packet->list_node);
+    }
+
+    /* Process each element in the to-do list, constructing the set of
+     * operations to batch. */
+    n_ops = 0;
+    HMAP_FOR_EACH_SAFE (miss, next_miss, hmap_node, &todo) {
+        handle_flow_miss(ofproto, miss, flow_miss_ops, &n_ops);
+        ofpbuf_list_delete(&miss->packets);
+        hmap_remove(&todo, &miss->hmap_node);
+        free(miss);
+    }
+    assert(n_ops <= ARRAY_SIZE(flow_miss_ops));
+    hmap_destroy(&todo);
+
+    /* Execute batch. */
+    for (i = 0; i < n_ops; i++) {
+        dpif_ops[i] = &flow_miss_ops[i].dpif_op;
     }
+    dpif_operate(ofproto->dpif, dpif_ops, n_ops);
+
+    /* Free memory and update facets. */
+    for (i = 0; i < n_ops; i++) {
+        struct flow_miss_op *op = &flow_miss_ops[i];
+        struct dpif_execute *execute;
+        struct dpif_flow_put *put;
 
-    facet_execute(ofproto, facet, upcall->packet);
-    facet_install(ofproto, facet, false);
-    ofproto->n_matches++;
+        switch (op->dpif_op.type) {
+        case DPIF_OP_EXECUTE:
+            execute = &op->dpif_op.execute;
+            if (op->facet->actions != execute->actions) {
+                free((struct nlattr *) execute->actions);
+            }
+            ofpbuf_delete((struct ofpbuf *) execute->packet);
+            break;
+
+        case DPIF_OP_FLOW_PUT:
+            put = &op->dpif_op.flow_put;
+            if (!put->error) {
+                op->facet->installed = true;
+            }
+            break;
+        }
+    }
 }
 
 static void
@@ -1852,7 +2049,8 @@ handle_userspace_upcall(struct ofproto_dpif *ofproto,
     } else if (cookie.type == USER_ACTION_COOKIE_CONTROLLER) {
         COVERAGE_INC(ofproto_dpif_ctlr_action);
         odp_flow_key_to_flow(upcall->key, upcall->key_len, &flow);
-        send_packet_in(ofproto, upcall, &flow, false);
+        send_packet_in_action(ofproto, upcall->packet, upcall->userdata,
+                              &flow, false);
     } else {
         VLOG_WARN_RL(&rl, "invalid user cookie : 0x%"PRIx64, upcall->userdata);
     }
@@ -1867,8 +2065,8 @@ handle_upcall(struct ofproto_dpif *ofproto, struct dpif_upcall *upcall)
         break;
 
     case DPIF_UC_MISS:
-        handle_miss_upcall(ofproto, upcall);
-        break;
+        /* The caller handles these. */
+        NOT_REACHED();
 
     case DPIF_N_UC_TYPES:
     default:
@@ -2156,15 +2354,16 @@ rule_expire(struct rule_dpif *rule)
 \f
 /* Facets. */
 
-/* Creates and returns a new facet owned by 'rule', given a 'flow' and an
- * example 'packet' within that flow.
+/* Creates and returns a new facet owned by 'rule', given a 'flow'.
  *
  * The caller must already have determined that no facet with an identical
  * 'flow' exists in 'ofproto' and that 'flow' is the best match for 'rule' in
- * the ofproto's classifier table. */
+ * the ofproto's classifier table.
+ *
+ * The facet will initially have no ODP actions.  The caller should fix that
+ * by calling facet_make_actions(). */
 static struct facet *
-facet_create(struct rule_dpif *rule, const struct flow *flow,
-             const struct ofpbuf *packet)
+facet_create(struct rule_dpif *rule, const struct flow *flow)
 {
     struct ofproto_dpif *ofproto = ofproto_dpif_cast(rule->up.ofproto);
     struct facet *facet;
@@ -2178,8 +2377,6 @@ facet_create(struct rule_dpif *rule, const struct flow *flow,
     netflow_flow_init(&facet->nf_flow);
     netflow_flow_update_time(ofproto->netflow, &facet->nf_flow, facet->used);
 
-    facet_make_actions(ofproto, facet, packet);
-
     return facet;
 }
 
@@ -2190,6 +2387,33 @@ facet_free(struct facet *facet)
     free(facet);
 }
 
+static bool
+execute_controller_action(struct ofproto_dpif *ofproto,
+                          const struct flow *flow,
+                          const struct nlattr *odp_actions, size_t actions_len,
+                          struct ofpbuf *packet)
+{
+    if (actions_len
+        && odp_actions->nla_type == OVS_ACTION_ATTR_USERSPACE
+        && NLA_ALIGN(odp_actions->nla_len) == actions_len) {
+        /* As an optimization, avoid a round-trip from userspace to kernel to
+         * userspace.  This also avoids possibly filling up kernel packet
+         * buffers along the way.
+         *
+         * This optimization will not accidentally catch sFlow
+         * OVS_ACTION_ATTR_USERSPACE actions, since those are encapsulated
+         * inside OVS_ACTION_ATTR_SAMPLE. */
+        const struct nlattr *nla;
+
+        nla = nl_attr_find_nested(odp_actions, OVS_USERSPACE_ATTR_USERDATA);
+        send_packet_in_action(ofproto, packet, nl_attr_get_u64(nla), flow,
+                              false);
+        return true;
+    } else {
+        return false;
+    }
+}
+
 /* Executes, within 'ofproto', the 'n_actions' actions in 'actions' on
  * 'packet', which arrived on 'in_port'.
  *
@@ -2203,29 +2427,9 @@ execute_odp_actions(struct ofproto_dpif *ofproto, const struct flow *flow,
     struct ofpbuf key;
     int error;
 
-    if (actions_len == NLA_ALIGN(NLA_HDRLEN + sizeof(uint64_t))
-        && odp_actions->nla_type == OVS_ACTION_ATTR_USERSPACE) {
-        const struct user_action_cookie *cookie;
-        struct dpif_upcall upcall;
-
-        cookie = nl_attr_get_unspec(odp_actions, sizeof(*cookie));
-        if (cookie->type == USER_ACTION_COOKIE_CONTROLLER) {
-            /* As an optimization, avoid a round-trip from userspace to kernel
-             * to userspace.  This also avoids possibly filling up kernel packet
-             * buffers along the way.
-             * This optimization does not work in case of sFlow is turned ON.
-             * Since first action would be sFlow SAMPLE action followed by
-             * Controller action. */
-
-            upcall.type = DPIF_UC_ACTION;
-            upcall.packet = packet;
-            upcall.key = NULL;
-            upcall.key_len = 0;
-            upcall.userdata = nl_attr_get_u64(odp_actions);
-
-            send_packet_in(ofproto, &upcall, flow, false);
-            return true;
-        }
+    if (execute_controller_action(ofproto, flow, odp_actions, actions_len,
+                                  packet)) {
+        return true;
     }
 
     ofpbuf_use_stack(&key, &keybuf, sizeof keybuf);
@@ -2537,7 +2741,8 @@ facet_lookup_valid(struct ofproto_dpif *ofproto, const struct flow *flow)
     /* The facet we found might not be valid, since we could be in need of
      * revalidation.  If it is not valid, don't return it. */
     if (facet
-        && ofproto->need_revalidate
+        && (ofproto->need_revalidate
+            || tag_set_intersects(&ofproto->revalidate_set, facet->tags))
         && !facet_revalidate(ofproto, facet)) {
         COVERAGE_INC(facet_invalidated);
         return NULL;
@@ -2876,6 +3081,9 @@ rule_execute(struct rule *rule_, struct flow *flow, struct ofpbuf *packet)
     /* First look for a related facet.  If we find one, account it to that. */
     facet = facet_lookup_valid(ofproto, flow);
     if (facet && facet->rule == rule) {
+        if (!facet->may_install) {
+            facet_make_actions(ofproto, facet, packet);
+        }
         facet_execute(ofproto, facet, packet);
         return 0;
     }
@@ -2883,7 +3091,8 @@ rule_execute(struct rule *rule_, struct flow *flow, struct ofpbuf *packet)
     /* Otherwise, if 'rule' is in fact the correct rule for 'packet', then
      * create a new facet for it and use that. */
     if (rule_dpif_lookup(ofproto, flow, 0) == rule) {
-        facet = facet_create(rule, flow, packet);
+        facet = facet_create(rule, flow);
+        facet_make_actions(ofproto, facet, packet);
         facet_execute(ofproto, facet, packet);
         facet_install(ofproto, facet, true);
         return 0;
@@ -2923,7 +3132,7 @@ rule_modify_actions(struct rule *rule_)
     complete_operation(rule);
 }
 \f
-/* Sends 'packet' out of port 'odp_port' within 'p'.
+/* Sends 'packet' out of port 'odp_port' within 'ofproto'.
  * Returns 0 if successful, otherwise a positive errno value. */
 static int
 send_packet(struct ofproto_dpif *ofproto, uint32_t odp_port,
@@ -2961,6 +3170,27 @@ static void do_xlate_actions(const union ofp_action *in, size_t n_in,
                              struct action_xlate_ctx *ctx);
 static void xlate_normal(struct action_xlate_ctx *);
 
+static size_t
+put_userspace_action(const struct ofproto_dpif *ofproto,
+                     struct ofpbuf *odp_actions,
+                     const struct flow *flow,
+                     const struct user_action_cookie *cookie)
+{
+    size_t offset;
+    uint32_t pid;
+
+    pid = dpif_port_get_pid(ofproto->dpif,
+                            ofp_port_to_odp_port(flow->in_port));
+
+    offset = nl_msg_start_nested(odp_actions, OVS_ACTION_ATTR_USERSPACE);
+    nl_msg_put_u32(odp_actions, OVS_USERSPACE_ATTR_PID, pid);
+    nl_msg_put_unspec(odp_actions, OVS_USERSPACE_ATTR_USERDATA,
+                      cookie, sizeof *cookie);
+    nl_msg_end_nested(odp_actions, offset);
+
+    return odp_actions->size - NLA_ALIGN(sizeof *cookie);
+}
+
 /* Compose SAMPLE action for sFlow. */
 static size_t
 compose_sflow_action(const struct ofproto_dpif *ofproto,
@@ -2970,9 +3200,9 @@ compose_sflow_action(const struct ofproto_dpif *ofproto,
 {
     uint32_t port_ifindex;
     uint32_t probability;
-    struct user_action_cookie *cookie;
+    struct user_action_cookie cookie;
     size_t sample_offset, actions_offset;
-    int user_cookie_offset, n_output;
+    int cookie_offset, n_output;
 
     if (!ofproto->sflow || flow->in_port == OFPP_NONE) {
         return 0;
@@ -2994,17 +3224,15 @@ compose_sflow_action(const struct ofproto_dpif *ofproto,
 
     actions_offset = nl_msg_start_nested(odp_actions, OVS_SAMPLE_ATTR_ACTIONS);
 
-    cookie = nl_msg_put_unspec_uninit(odp_actions, OVS_ACTION_ATTR_USERSPACE,
-                                                sizeof(*cookie));
-    cookie->type = USER_ACTION_COOKIE_SFLOW;
-    cookie->data = port_ifindex;
-    cookie->n_output = n_output;
-    cookie->vlan_tci = 0;
-    user_cookie_offset = (char *) cookie - (char *) odp_actions->data;
+    cookie.type = USER_ACTION_COOKIE_SFLOW;
+    cookie.data = port_ifindex;
+    cookie.n_output = n_output;
+    cookie.vlan_tci = 0;
+    cookie_offset = put_userspace_action(ofproto, odp_actions, flow, &cookie);
 
     nl_msg_end_nested(odp_actions, actions_offset);
     nl_msg_end_nested(odp_actions, sample_offset);
-    return user_cookie_offset;
+    return cookie_offset;
 }
 
 /* SAMPLE action must be first action in any given list of actions.
@@ -3249,7 +3477,7 @@ flood_packets(struct action_xlate_ctx *ctx, ovs_be32 mask)
 }
 
 static void
-compose_controller_action(struct ofpbuf *odp_actions, int len)
+compose_controller_action(struct action_xlate_ctx *ctx, int len)
 {
     struct user_action_cookie cookie;
 
@@ -3257,9 +3485,7 @@ compose_controller_action(struct ofpbuf *odp_actions, int len)
     cookie.data = len;
     cookie.n_output = 0;
     cookie.vlan_tci = 0;
-
-    nl_msg_put_unspec(odp_actions, OVS_ACTION_ATTR_USERSPACE,
-                                       &cookie, sizeof(cookie));
+    put_userspace_action(ctx->ofproto, ctx->odp_actions, &ctx->flow, &cookie);
 }
 
 static void
@@ -3288,7 +3514,7 @@ xlate_output_action__(struct action_xlate_ctx *ctx,
         break;
     case OFPP_CONTROLLER:
         commit_odp_actions(ctx);
-        compose_controller_action(ctx->odp_actions, max_len);
+        compose_controller_action(ctx, max_len);
         break;
     case OFPP_LOCAL:
         add_output_action(ctx, OFPP_LOCAL);
@@ -3641,6 +3867,7 @@ xlate_actions(struct action_xlate_ctx *ctx,
     COVERAGE_INC(ofproto_dpif_xlate);
 
     ctx->odp_actions = ofpbuf_new(512);
+    ofpbuf_reserve(ctx->odp_actions, NL_A_U32_SIZE);
     ctx->tags = 0;
     ctx->may_set_up_flow = true;
     ctx->has_learn = false;
@@ -3655,18 +3882,23 @@ xlate_actions(struct action_xlate_ctx *ctx,
 
     if (process_special(ctx->ofproto, &ctx->flow, ctx->packet)) {
         ctx->may_set_up_flow = false;
+        return ctx->odp_actions;
     } else {
         add_sflow_action(ctx);
         do_xlate_actions(in, n_in, ctx);
         fix_sflow_action(ctx);
-    }
 
-    /* Check with in-band control to see if we're allowed to set up this
-     * flow. */
-    if (!connmgr_may_set_up_flow(ctx->ofproto->up.connmgr, &ctx->flow,
-                                 ctx->odp_actions->data,
-                                 ctx->odp_actions->size)) {
-        ctx->may_set_up_flow = false;
+        if (!connmgr_may_set_up_flow(ctx->ofproto->up.connmgr, &ctx->flow,
+                                     ctx->odp_actions->data,
+                                     ctx->odp_actions->size)) {
+            ctx->may_set_up_flow = false;
+            if (ctx->packet
+                && connmgr_msg_in_hook(ctx->ofproto->up.connmgr, &ctx->flow,
+                                       ctx->packet)) {
+                nl_msg_push_u32(ctx->odp_actions, OVS_ACTION_ATTR_OUTPUT,
+                                OVSP_LOCAL);
+            }
+        }
     }
 
     return ctx->odp_actions;