Merge "master" into "wdp".
[sliver-openvswitch.git] / ofproto / wdp-xflow.c
index 4a78e16..26830e6 100644 (file)
@@ -34,8 +34,8 @@
 #include "packets.h"
 #include "poll-loop.h"
 #include "port-array.h"
+#include "queue.h"
 #include "shash.h"
-#include "stp.h"
 #include "svec.h"
 #include "timeval.h"
 #include "util.h"
@@ -59,16 +59,22 @@ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
 #define WX_MAX_WILD     65536   /* Wildcarded rules. */
 #define WX_MAX_EXACT    1048576 /* Exact-match rules. */
 
+struct wx_port {
+    struct hmap_node hmap_node;
+    struct wdp_port wdp_port;
+    uint16_t xflow_port;
+};
+
 struct wx {
     struct list list_node;
     struct wdp wdp;
     struct xfif *xfif;
     struct classifier cls;
     struct netdev_monitor *netdev_monitor;
-    struct port_array ports;    /* Index is xflow port nr;
-                                 * wdp_port->opp.port_no is OFP port nr. */
+    struct hmap ports;          /* Contains "struct wx_port"s. */
     struct shash port_by_name;
     long long int next_expiration;
+    int wdp_listen_mask;
 
     /* Rules that might need to be revalidated. */
     bool need_revalidate;      /* Revalidate all subrules? */
@@ -81,6 +87,12 @@ struct wx {
 
     /* Used by default ofhooks. */
     struct mac_learning *ml;
+
+    /* List of "struct wdp_packets" queued for the controller by
+     * execute_xflow_actions(). */
+#define MAX_CTL_PACKETS 50
+    struct list ctl_packets;
+    int n_ctl_packets;
 };
 
 static const struct ofhooks default_ofhooks;
@@ -88,10 +100,13 @@ static const struct ofhooks default_ofhooks;
 static struct list all_wx = LIST_INITIALIZER(&all_wx);
 
 static int wx_port_init(struct wx *);
+static struct wx_port *wx_port_get(const struct wx *, uint16_t xflow_port);
 static void wx_port_process_change(struct wx *wx, int error, char *devname,
                                    wdp_port_poll_cb_func *cb, void *aux);
 static void wx_port_refresh_groups(struct wx *);
 
+static void wx_purge_ctl_packets__(struct wx *);
+
 enum {
     WX_GROUP_FLOOD = 0,
     WX_GROUP_ALL = 1
@@ -231,8 +246,7 @@ wx_rule_update_stats(struct wx *wx, struct wx_rule *rule,
         wx_rule_update_time(wx, rule, stats);
         rule->packet_count += stats->n_packets;
         rule->byte_count += stats->n_bytes;
-        /* XXX netflow_flow_update_flags(&rule->nf_flow, stats->ip_tos,
-           stats->tcp_flags); */
+        /* XXX netflow_flow_update_flags(&rule->nf_flow, stats->tcp_flags); */
     }
 }
 
@@ -326,7 +340,7 @@ wx_rule_destroy(struct wx *wx, struct wx_rule *rule)
 {
     if (!rule->super) {
         struct wx_rule *subrule, *next;
-        LIST_FOR_EACH_SAFE (subrule, next, struct wx_rule, list, &rule->list) {
+        LIST_FOR_EACH_SAFE (subrule, next, list, &rule->list) {
             wx_rule_revalidate(wx, subrule);
         }
     } else {
@@ -383,8 +397,48 @@ wx_rule_create(struct wx_rule *super,
     return rule;
 }
 
+/* Executes, within 'wx', the 'n_actions' actions in 'actions' on 'packet',
+ * which arrived on 'in_port'.
+ *
+ * Takes ownership of 'packet'. */
+static bool
+execute_xflow_actions(struct wx *wx, uint16_t in_port,
+                      const union xflow_action *actions, size_t n_actions,
+                      struct ofpbuf *packet)
+{
+    if (n_actions == 1 && actions[0].type == XFLOWAT_CONTROLLER
+        && wx->n_ctl_packets < MAX_CTL_PACKETS) {
+        /* As an optimization, avoid a round-trip from userspace to kernel to
+         * userspace.  This also avoids possibly filling up kernel packet
+         * buffers along the way. */
+        struct wdp_packet *wdp_packet;
+
+        if (!(wx->wdp_listen_mask & WDP_CHAN_ACTION)) {
+            return true;
+        }
+
+        wdp_packet = xmalloc(sizeof *wdp_packet);
+        wdp_packet->channel = WDP_CHAN_ACTION;
+        wdp_packet->tun_id = 0;
+        wdp_packet->in_port = in_port;
+        wdp_packet->send_len = actions[0].controller.arg;
+        wdp_packet->payload = packet;
+
+        list_push_back(&wx->ctl_packets, &wdp_packet->list);
+
+        return true;
+    } else {
+        int error;
+
+        error = xfif_execute(wx->xfif, in_port, actions, n_actions, packet);
+        ofpbuf_delete(packet);
+        return !error;
+    }
+}
+
 /* Executes the actions indicated by 'rule' on 'packet', which is in flow
- * 'flow' and is considered to have arrived on xflow port 'in_port'.
+ * 'flow' and is considered to have arrived on xflow port 'in_port'.  'packet'
+ * must have at least sizeof(struct ofp_packet_in) bytes of headroom.
  *
  * The flow that 'packet' actually contains does not need to actually match
  * 'rule'; the actions in 'rule' will be applied to it either way.  Likewise,
@@ -396,15 +450,20 @@ wx_rule_create(struct wx_rule *super,
  * 'packet' using rule_make_actions().  If 'rule' is a wildcard rule, or if
  * 'rule' is an exact-match rule but 'flow' is not the rule's flow, then this
  * function will compose a set of xflow actions based on 'rule''s OpenFlow
- * actions and apply them to 'packet'. */
+ * actions and apply them to 'packet'.
+ *
+ * Takes ownership of 'packet'. */
 static void
 wx_rule_execute(struct wx *wx, struct wx_rule *rule,
                 struct ofpbuf *packet, const flow_t *flow)
 {
     const union xflow_action *actions;
+    struct xflow_flow_stats stats;
     size_t n_actions;
     struct xflow_actions a;
 
+    assert(ofpbuf_headroom(packet) >= sizeof(struct ofp_packet_in));
+
     /* Grab or compose the xflow actions.
      *
      * The special case for an exact-match 'rule' where 'flow' is not the
@@ -412,11 +471,12 @@ wx_rule_execute(struct wx *wx, struct wx_rule *rule,
      * port simply because the xflow actions were composed for the wrong
      * scenario. */
     if (rule->wr.cr.flow.wildcards
-        || !flow_equal(flow, &rule->wr.cr.flow))
+        || !flow_equal_headers(flow, &rule->wr.cr.flow))
     {
         struct wx_rule *super = rule->super ? rule->super : rule;
         if (wx_xlate_actions(wx, super->wr.actions, super->wr.n_actions, flow,
                              packet, NULL, &a, NULL)) {
+            ofpbuf_delete(packet);
             return;
         }
         actions = a.actions;
@@ -427,16 +487,21 @@ wx_rule_execute(struct wx *wx, struct wx_rule *rule,
     }
 
     /* Execute the xflow actions. */
-    if (!xfif_execute(wx->xfif, flow->in_port,
-                      actions, n_actions, packet)) {
-        struct xflow_flow_stats stats;
-        flow_extract_stats(flow, packet, &stats);
+    flow_extract_stats(flow, packet, &stats);
+    if (!execute_xflow_actions(wx, flow->in_port,
+                               actions, n_actions, packet)) {
         wx_rule_update_stats(wx, rule, &stats);
         rule->used = time_msec();
         //XXX netflow_flow_update_time(wx->netflow, &rule->nf_flow, rule->used);
     }
 }
 
+/* Inserts 'rule' into 'p''s flow table.
+ *
+ * If 'packet' is nonnull, takes ownership of 'packet', executes 'rule''s
+ * actions on it and credits the statistics for sending the packet to 'rule'.
+ * 'packet' must have at least sizeof(struct ofp_packet_in) bytes of
+ * headroom. */
 static void
 wx_rule_insert(struct wx *wx, struct wx_rule *rule, struct ofpbuf *packet,
                uint16_t in_port)
@@ -641,10 +706,10 @@ static void do_xlate_actions(const union ofp_action *in, size_t n_in,
 static void
 add_output_action(struct wx_xlate_ctx *ctx, uint16_t port)
 {
-    const struct wdp_port *wdp_port = port_array_get(&ctx->wx->ports, port);
+    const struct wx_port *wx_port = wx_port_get(ctx->wx, port);
 
-    if (wdp_port) {
-        if (wdp_port->opp.config & OFPPC_NO_FWD) {
+    if (wx_port) {
+        if (wx_port->wdp_port.opp.config & OFPPC_NO_FWD) {
             /* Forwarding disabled on port. */
             return;
         }
@@ -821,12 +886,33 @@ xlate_enqueue_action(struct wx_xlate_ctx *ctx,
     }
 }
 
+static void
+xlate_set_queue_action(struct wx_xlate_ctx *ctx,
+                       const struct nx_action_set_queue *nasq)
+{
+    uint32_t priority;
+    int error;
+
+    error = xfif_queue_to_priority(ctx->wx->xfif, ntohl(nasq->queue_id),
+                                   &priority);
+    if (error) {
+        /* Couldn't translate queue to a priority, so ignore.  A warning
+         * has already been logged. */
+        return;
+    }
+
+    remove_pop_action(ctx);
+    xflow_actions_add(ctx->out, XFLOWAT_SET_PRIORITY)->priority.priority
+        = priority;
+}
+
 static void
 xlate_nicira_action(struct wx_xlate_ctx *ctx,
                     const struct nx_action_header *nah)
 {
     const struct nx_action_resubmit *nar;
     const struct nx_action_set_tunnel *nast;
+    const struct nx_action_set_queue *nasq;
     union xflow_action *oa;
     int subtype = ntohs(nah->subtype);
 
@@ -843,6 +929,21 @@ xlate_nicira_action(struct wx_xlate_ctx *ctx,
         ctx->flow.tun_id = oa->tunnel.tun_id = nast->tun_id;
         break;
 
+    case NXAST_DROP_SPOOFED_ARP:
+        if (ctx->flow.dl_type == htons(ETH_TYPE_ARP)) {
+            xflow_actions_add(ctx->out, XFLOWAT_DROP_SPOOFED_ARP);
+        }
+        break;
+
+    case NXAST_SET_QUEUE:
+        nasq = (const struct nx_action_set_queue *) nah;
+        xlate_set_queue_action(ctx, nasq);
+        break;
+
+    case NXAST_POP_QUEUE:
+        xflow_actions_add(ctx->out, XFLOWAT_POP_PRIORITY);
+        break;
+
     /* If you add a new action here that modifies flow data, don't forget to
      * update the flow key in ctx->flow at the same time. */
 
@@ -858,14 +959,17 @@ do_xlate_actions(const union ofp_action *in, size_t n_in,
 {
     struct actions_iterator iter;
     const union ofp_action *ia;
-    const struct wdp_port *port;
+    const struct wx_port *port;
 
-    port = port_array_get(&ctx->wx->ports, ctx->flow.in_port);
-    if (port && port->opp.config & (OFPPC_NO_RECV | OFPPC_NO_RECV_STP) &&
-        port->opp.config & (eth_addr_equals(ctx->flow.dl_dst, stp_eth_addr)
-                            ? OFPPC_NO_RECV_STP : OFPPC_NO_RECV)) {
-        /* Drop this flow. */
-        return;
+    port = wx_port_get(ctx->wx, ctx->flow.in_port);
+    if (port) {
+        const struct ofp_phy_port *opp = &port->wdp_port.opp;
+        if (opp->config & (OFPPC_NO_RECV | OFPPC_NO_RECV_STP) &&
+            opp->config & (eth_addr_equals(ctx->flow.dl_dst, eth_addr_stp)
+                           ? OFPPC_NO_RECV_STP : OFPPC_NO_RECV)) {
+            /* Drop this flow. */
+            return;
+        }
     }
 
     for (ia = actions_first(&iter, in, n_in); ia; ia = actions_next(&iter)) {
@@ -1015,6 +1119,7 @@ wx_xlate_actions(struct wx *wx, const union ofp_action *in, size_t n_in,
     }
 #endif
     if (xflow_actions_overflow(out)) {
+        COVERAGE_INC(xflow_overflow);
         xflow_actions_init(out);
         return ofp_mkerr(OFPET_BAD_ACTION, OFPBAC_TOO_MANY);
     }
@@ -1066,7 +1171,7 @@ uninstall_idle_flow(struct wx *wx, struct wx_rule *rule)
     }
 }
 
-static void
+static int
 expire_rule(struct cls_rule *cls_rule, void *wx_)
 {
     struct wx *wx = wx_;
@@ -1090,7 +1195,7 @@ expire_rule(struct cls_rule *cls_rule, void *wx_)
             //XXX active_timeout(wx, rule);
         }
 
-        return;
+        return 0;
     }
 
     COVERAGE_INC(wx_expired);
@@ -1099,7 +1204,7 @@ expire_rule(struct cls_rule *cls_rule, void *wx_)
      * due to an idle timeout. */
     if (rule->wr.cr.flow.wildcards) {
         struct wx_rule *subrule, *next;
-        LIST_FOR_EACH_SAFE (subrule, next, struct wx_rule, list, &rule->list) {
+        LIST_FOR_EACH_SAFE (subrule, next, list, &rule->list) {
             wx_rule_remove(wx, subrule);
         }
     } else {
@@ -1114,6 +1219,8 @@ expire_rule(struct cls_rule *cls_rule, void *wx_)
     }
 #endif
     wx_rule_remove(wx, rule);
+
+    return 0;
 }
 
 struct revalidate_cbdata {
@@ -1151,7 +1258,7 @@ revalidate_rule(struct wx *wx, struct wx_rule *rule)
     return true;
 }
 
-static void
+static int
 revalidate_cb(struct cls_rule *sub_, void *cbdata_)
 {
     struct wx_rule *sub = wx_rule_cast(sub_);
@@ -1162,6 +1269,7 @@ revalidate_cb(struct cls_rule *sub_, void *cbdata_)
         || tag_set_intersects(&cbdata->revalidate_set, sub->tags)) {
         revalidate_rule(cbdata->wx, sub);
     }
+    return 0;
 }
 
 static void
@@ -1195,7 +1303,7 @@ wx_run(void)
 {
     struct wx *wx;
 
-    LIST_FOR_EACH (wx, struct wx, list_node, &all_wx) {
+    LIST_FOR_EACH (wx, list_node, &all_wx) {
         wx_run_one(wx);
     }
     xf_run();
@@ -1216,7 +1324,7 @@ wx_wait(void)
 {
     struct wx *wx;
 
-    LIST_FOR_EACH (wx, struct wx, list_node, &all_wx) {
+    LIST_FOR_EACH (wx, list_node, &all_wx) {
         wx_wait_one(wx);
     }
     xf_wait();
@@ -1252,7 +1360,7 @@ wx_open(const struct wdp_class *wdp_class, const char *name, bool create,
         wx->xfif = xfif;
         classifier_init(&wx->cls);
         wx->netdev_monitor = netdev_monitor_create();
-        port_array_init(&wx->ports);
+        hmap_init(&wx->ports);
         shash_init(&wx->port_by_name);
         wx->next_expiration = time_msec() + 1000;
         tag_set_init(&wx->revalidate_set);
@@ -1263,6 +1371,8 @@ wx_open(const struct wdp_class *wdp_class, const char *name, bool create,
         wx->aux = wx;
         wx->ml = mac_learning_create();
 
+        list_init(&wx->ctl_packets);
+
         *wdpp = &wx->wdp;
     }
 
@@ -1280,6 +1390,8 @@ wx_close(struct wdp *wdp)
     netdev_monitor_destroy(wx->netdev_monitor);
     list_remove(&wx->list_node);
     mac_learning_destroy(wx->ml);
+    hmap_destroy(&wx->ports);
+    shash_destroy(&wx->port_by_name);
     free(wx);
 }
 
@@ -1305,8 +1417,7 @@ wx_get_features(const struct wdp *wdp, struct ofpbuf **featuresp)
     struct wx *wx = wx_cast(wdp);
     struct ofp_switch_features *osf;
     struct ofpbuf *buf;
-    unsigned int port_no;
-    struct wdp_port *port;
+    struct wx_port *port;
 
     buf = ofpbuf_new(sizeof *osf);
     osf = ofpbuf_put_zeros(buf, sizeof *osf);
@@ -1325,15 +1436,16 @@ wx_get_features(const struct wdp *wdp, struct ofpbuf **featuresp)
                          (1u << OFPAT_SET_TP_DST) |
                          (1u << OFPAT_ENQUEUE));
 
-    PORT_ARRAY_FOR_EACH (port, &wx->ports, port_no) {
-        hton_ofp_phy_port(ofpbuf_put(buf, &port->opp, sizeof port->opp));
+    HMAP_FOR_EACH (port, hmap_node, &wx->ports) {
+        const struct ofp_phy_port *opp = &port->wdp_port.opp;
+        hton_ofp_phy_port(ofpbuf_put(buf, opp, sizeof *opp));
     }
 
     *featuresp = buf;
     return 0;
 }
 
-static void
+static int
 count_subrules(struct cls_rule *cls_rule, void *n_subrules_)
 {
     struct wx_rule *rule = wx_rule_cast(cls_rule);
@@ -1342,6 +1454,7 @@ count_subrules(struct cls_rule *cls_rule, void *n_subrules_)
     if (rule->super) {
         (*n_subrules)++;
     }
+    return 0;
 }
 
 static int
@@ -1426,10 +1539,10 @@ wx_port_del(struct wdp *wdp, uint16_t port_no)
 }
 
 static int
-wx_answer_port_query(const struct wdp_port *port, struct wdp_port *portp)
+wx_answer_port_query(const struct wx_port *port, struct wdp_port *portp)
 {
     if (port) {
-        wdp_port_copy(portp, port);
+        wdp_port_copy(portp, &port->wdp_port);
         return 0;
     } else {
         return ENOENT;
@@ -1441,10 +1554,9 @@ wx_port_query_by_number(const struct wdp *wdp, uint16_t port_no,
                         struct wdp_port *portp)
 {
     struct wx *wx = wx_cast(wdp);
-    const struct wdp_port *port;
+    struct wx_port *wx_port = wx_port_get(wx, ofp_port_to_xflow_port(port_no));
 
-    port = port_array_get(&wx->ports, ofp_port_to_xflow_port(port_no));
-    return wx_answer_port_query(port, portp);
+    return wx_answer_port_query(wx_port, portp);
 }
 
 static int
@@ -1461,42 +1573,46 @@ static int
 wx_port_set_config(struct wdp *wdp, uint16_t port_no, uint32_t config)
 {
     struct wx *wx = wx_cast(wdp);
-    struct wdp_port *port;
+    struct wx_port *port;
+    struct ofp_phy_port *opp;
     uint32_t changes;
 
-    port = port_array_get(&wx->ports, ofp_port_to_xflow_port(port_no));
+    port = wx_port_get(wx, ofp_port_to_xflow_port(port_no));
     if (!port) {
         return ENOENT;
     }
-    changes = config ^ port->opp.config;
+    opp = &port->wdp_port.opp;
+    changes = config ^ opp->config;
 
     if (changes & OFPPC_PORT_DOWN) {
+        struct netdev *netdev = port->wdp_port.netdev;
         int error;
+
         if (config & OFPPC_PORT_DOWN) {
-            error = netdev_turn_flags_off(port->netdev, NETDEV_UP, true);
+            error = netdev_turn_flags_off(netdev, NETDEV_UP, true);
         } else {
-            error = netdev_turn_flags_on(port->netdev, NETDEV_UP, true);
+            error = netdev_turn_flags_on(netdev, NETDEV_UP, true);
         }
         if (!error) {
-            port->opp.config ^= OFPPC_PORT_DOWN;
+            opp->config ^= OFPPC_PORT_DOWN;
         }
     }
 
 #define REVALIDATE_BITS (OFPPC_NO_RECV | OFPPC_NO_RECV_STP | OFPPC_NO_FWD)
     if (changes & REVALIDATE_BITS) {
         COVERAGE_INC(wx_costly_flags);
-        port->opp.config ^= changes & REVALIDATE_BITS;
+        opp->config ^= changes & REVALIDATE_BITS;
         wx->need_revalidate = true;
     }
 #undef REVALIDATE_BITS
 
     if (changes & OFPPC_NO_FLOOD) {
-        port->opp.config ^= OFPPC_NO_FLOOD;
+        opp->config ^= OFPPC_NO_FLOOD;
         wx_port_refresh_groups(wx);
     }
 
     if (changes & OFPPC_NO_PACKET_IN) {
-        port->opp.config ^= OFPPC_NO_PACKET_IN;
+        opp->config ^= OFPPC_NO_PACKET_IN;
     }
 
     return 0;
@@ -1506,15 +1622,15 @@ static int
 wx_port_list(const struct wdp *wdp, struct wdp_port **portsp, size_t *n_portsp)
 {
     struct wx *wx = wx_cast(wdp);
-    struct wdp_port *ports, *port;
-    unsigned int port_no;
+    struct wdp_port *ports;
+    struct wx_port *port;
     size_t n_ports, i;
 
-    *n_portsp = n_ports = port_array_count(&wx->ports);
+    *n_portsp = n_ports = hmap_count(&wx->ports);
     *portsp = ports = xmalloc(n_ports * sizeof *ports);
     i = 0;
-    PORT_ARRAY_FOR_EACH (port, &wx->ports, port_no) {
-        wdp_port_copy(&ports[i++], port);
+    HMAP_FOR_EACH (port, hmap_node, &wx->ports) {
+        wdp_port_copy(&ports[i++], &port->wdp_port);
     }
     assert(i == n_ports);
 
@@ -1594,18 +1710,19 @@ struct wx_for_each_thunk_aux {
     void *client_aux;
 };
 
-static void
+static int
 wx_for_each_thunk(struct cls_rule *cls_rule, void *aux_)
 {
     struct wx_for_each_thunk_aux *aux = aux_;
     struct wx_rule *rule = wx_rule_cast(cls_rule);
 
     if (!wx_rule_is_hidden(rule)) {
-        aux->client_callback(&rule->wr, aux->client_aux);
+        return aux->client_callback(&rule->wr, aux->client_aux);
     }
+    return 0;
 }
 
-static void
+static int
 wx_flow_for_each_match(const struct wdp *wdp, const flow_t *target,
                        unsigned int include,
                        wdp_flow_cb_func *client_callback, void *client_aux)
@@ -1624,8 +1741,8 @@ wx_flow_for_each_match(const struct wdp *wdp, const flow_t *target,
 
     aux.client_callback = client_callback;
     aux.client_aux = client_aux;
-    classifier_for_each_match(&wx->cls, target, cls_include,
-                              wx_for_each_thunk, &aux);
+    return classifier_for_each_match(&wx->cls, target, cls_include,
+                                     wx_for_each_thunk, &aux);
 }
 
 /* Obtains statistic counters for 'rule' within 'wx' and stores them into
@@ -1659,7 +1776,7 @@ query_stats(struct wx *wx, struct wx_rule *rule, struct wdp_flow_stats *stats)
     xflow_flows = xzalloc(n_xflow_flows * sizeof *xflow_flows);
     if (rule->wr.cr.flow.wildcards) {
         size_t i = 0;
-        LIST_FOR_EACH (subrule, struct wx_rule, list, &rule->list) {
+        LIST_FOR_EACH (subrule, list, &rule->list) {
             xflow_key_from_flow(&xflow_flows[i++].key, &subrule->wr.cr.flow);
             stats->n_packets += subrule->packet_count;
             stats->n_bytes += subrule->byte_count;
@@ -1680,10 +1797,6 @@ query_stats(struct wx *wx, struct wx_rule *rule, struct wdp_flow_stats *stats)
             used = xflow_flow_stats_to_msec(&xflow_flow->stats);
             if (used > stats->used) {
                 stats->used = used;
-                if (xflow_flow->key.dl_type == htons(ETH_TYPE_IP)
-                    && xflow_flow->key.nw_proto == IP_TYPE_TCP) {
-                    stats->ip_tos = xflow_flow->stats.ip_tos;
-                }
             }
             stats->tcp_flags |= xflow_flow->stats.tcp_flags;
         }
@@ -1722,7 +1835,7 @@ wx_flow_put(struct wdp *wdp, const struct wdp_flow_put *put,
 
     ofp_table_id = put->flow->wildcards ? TABLEID_CLASSIFIER : TABLEID_HASH;
     if (put->ofp_table_id != 0xff && put->ofp_table_id != ofp_table_id) {
-        return EINVAL;
+        return ofp_mkerr_nicira(OFPET_FLOW_MOD_FAILED, NXFMFC_BAD_TABLE_ID);
     }
 
     rule = wx_rule_cast(classifier_find_rule_exactly(&wx->cls, put->flow));
@@ -1742,7 +1855,7 @@ wx_flow_put(struct wdp *wdp, const struct wdp_flow_put *put,
              ? classifier_count_wild(&wx->cls) >= WX_MAX_WILD
              : classifier_count_exact(&wx->cls) >= WX_MAX_EXACT)) {
             /* XXX subrules should not count against exact-match limit */
-            return ENOBUFS;
+            return ofp_mkerr(OFPET_FLOW_MOD_FAILED, OFPFMFC_ALL_TABLES_FULL);
         }
     }
 
@@ -1777,7 +1890,7 @@ wx_flow_delete(struct wdp *wdp, struct wdp_rule *wdp_rule,
     return 0;
 }
 
-static void
+static int
 wx_flush_rule(struct cls_rule *cls_rule, void *wx_)
 {
     struct wx_rule *rule = wx_rule_cast(cls_rule);
@@ -1790,6 +1903,8 @@ wx_flush_rule(struct cls_rule *cls_rule, void *wx_)
     rule->installed = false;
 
     wx_rule_remove(wx, rule);
+
+    return 0;
 }
 
 static int
@@ -1819,9 +1934,9 @@ wx_execute(struct wdp *wdp, uint16_t in_port,
     if (error) {
         return error;
     }
-    xfif_execute(wx->xfif, ofp_port_to_xflow_port(in_port),
-                 xflow_actions.actions, xflow_actions.n_actions, packet);
-    return 0;
+    return xfif_execute(wx->xfif, ofp_port_to_xflow_port(in_port),
+                        xflow_actions.actions, xflow_actions.n_actions,
+                        packet);
 }
 
 static int
@@ -1870,12 +1985,16 @@ wx_recv_set_mask(struct wdp *wdp, int listen_mask)
     struct wx *wx = wx_cast(wdp);
     int xflow_listen_mask;
 
+    wx->wdp_listen_mask = listen_mask;
+
     xflow_listen_mask = 0;
     if (listen_mask & (1 << WDP_CHAN_MISS)) {
         xflow_listen_mask |= XFLOWL_MISS;
     }
     if (listen_mask & (1 << WDP_CHAN_ACTION)) {
         xflow_listen_mask |= XFLOWL_ACTION;
+    } else {
+        wx_purge_ctl_packets__(wx);
     }
     if (listen_mask & (1 << WDP_CHAN_SFLOW)) {
         xflow_listen_mask |= XFLOWL_SFLOW;
@@ -1937,8 +2056,8 @@ wx_translate_xflow_msg(struct xflow_msg *msg, struct ofpbuf *payload,
 static const uint8_t *
 get_local_mac(const struct wx *wx)
 {
-    const struct wdp_port *port = port_array_get(&wx->ports, XFLOWP_LOCAL);
-    return port ? port->opp.hw_addr : NULL;
+    const struct wx_port *port = wx_port_get(wx, XFLOWP_LOCAL);
+    return port ? port->wdp_port.opp.hw_addr : NULL;
 }
 
 /* Returns true if 'packet' is a DHCP reply to the local port.  Such a reply
@@ -1964,13 +2083,18 @@ wx_is_local_dhcp_reply(const struct wx *wx,
     return false;
 }
 
+/* Determines whether 'payload' that arrived on 'in_port' is included in any of
+ * the flows in 'wx''s OpenFlow flow table.  If so, then it adds a
+ * corresponding flow to the xfif's exact-match flow table, taking ownership of
+ * 'payload', and returns true.  If not, it returns false and the caller
+ * retains ownership of 'payload'. */
 static bool
-wx_explode_rule(struct wx *wx, struct xflow_msg *msg, struct ofpbuf *payload)
+wx_explode_rule(struct wx *wx, uint16_t in_port, struct ofpbuf *payload)
 {
     struct wx_rule *rule;
     flow_t flow;
 
-    flow_extract(payload, 0, xflow_port_to_ofp_port(msg->port), &flow);
+    flow_extract(payload, 0, xflow_port_to_ofp_port(in_port), &flow);
 
     if (wx_is_local_dhcp_reply(wx, &flow, payload)) {
         union xflow_action action;
@@ -1978,7 +2102,7 @@ wx_explode_rule(struct wx *wx, struct xflow_msg *msg, struct ofpbuf *payload)
         memset(&action, 0, sizeof(action));
         action.output.type = XFLOWAT_OUTPUT;
         action.output.port = XFLOWP_LOCAL;
-        xfif_execute(wx->xfif, msg->port, &action, 1, payload);
+        xfif_execute(wx->xfif, in_port, &action, 1, payload);
     }
 
     rule = wx_rule_lookup_valid(wx, &flow);
@@ -2012,6 +2136,19 @@ wx_recv(struct wdp *wdp, struct wdp_packet *packet)
     struct wx *wx = wx_cast(wdp);
     int i;
 
+    if (wx->n_ctl_packets) {
+        struct wdp_packet *wdp_packet;
+
+        wdp_packet = CONTAINER_OF(list_pop_front(&wx->ctl_packets),
+                                  struct wdp_packet, list);
+        wx->n_ctl_packets--;
+
+        *packet = *wdp_packet;
+        free(wdp_packet);
+
+        return 0;
+    }
+
     /* XXX need to avoid 50*50 potential cost for caller. */
     for (i = 0; i < 50; i++) {
         struct xflow_msg *msg;
@@ -2024,10 +2161,10 @@ wx_recv(struct wdp *wdp, struct wdp_packet *packet)
         }
 
         msg = ofpbuf_pull(buf, sizeof *msg);
-        if (msg->type != _XFLOWL_MISS_NR || !wx_explode_rule(wx, msg, buf)) {
+        if (msg->type != _XFLOWL_MISS_NR
+            || !wx_explode_rule(wx, msg->port, buf)) {
             return wx_translate_xflow_msg(msg, buf, packet);
         }
-        ofpbuf_delete(buf);
     }
     return EAGAIN;
 }
@@ -2052,6 +2189,19 @@ wx_recv_purge_queue__(struct wx *wx, int max, int xflow_listen_mask,
     }
 }
 
+static void
+wx_purge_ctl_packets__(struct wx *wx)
+{
+    struct wdp_packet *this, *next;
+
+    LIST_FOR_EACH_SAFE (this, next, list, &wx->ctl_packets) {
+        list_remove(&this->list);
+        ofpbuf_delete(this->payload);
+        free(this);
+    }
+    wx->n_ctl_packets = 0;
+}
+
 static int
 wx_recv_purge(struct wdp *wdp)
 {
@@ -2074,6 +2224,7 @@ wx_recv_purge(struct wdp *wdp)
     if (xflow_listen_mask & XFLOWL_ACTION) {
         wx_recv_purge_queue__(wx, xflow_stats.max_action_queue, XFLOWL_ACTION,
                               &error);
+        wx_purge_ctl_packets__(wx);
     }
     if (xflow_listen_mask & XFLOWL_SFLOW) {
         wx_recv_purge_queue__(wx, xflow_stats.max_sflow_queue, XFLOWL_SFLOW,
@@ -2090,7 +2241,11 @@ wx_recv_wait(struct wdp *wdp)
 {
     struct wx *wx = wx_cast(wdp);
 
-    xfif_recv_wait(wx->xfif);
+    if (wx->n_ctl_packets) {
+        poll_immediate_wake();
+    } else {
+        xfif_recv_wait(wx->xfif);
+    }
 }
 
 static int
@@ -2145,16 +2300,16 @@ wx_port_refresh_group(struct wx *wx, unsigned int group)
 {
     uint16_t *ports;
     size_t n_ports;
-    struct wdp_port *port;
-    unsigned int port_no;
+    struct wx_port *port;
 
     assert(group == WX_GROUP_ALL || group == WX_GROUP_FLOOD);
 
-    ports = xmalloc(port_array_count(&wx->ports) * sizeof *ports);
+    ports = xmalloc(hmap_count(&wx->ports) * sizeof *ports);
     n_ports = 0;
-    PORT_ARRAY_FOR_EACH (port, &wx->ports, port_no) {
-        if (group == WX_GROUP_ALL || !(port->opp.config & OFPPC_NO_FLOOD)) {
-            ports[n_ports++] = port_no;
+    HMAP_FOR_EACH (port, hmap_node, &wx->ports) {
+        const struct ofp_phy_port *opp = &port->wdp_port.opp;
+        if (group == WX_GROUP_ALL || !(opp->config & OFPPC_NO_FLOOD)) {
+            ports[n_ports++] = port->xflow_port;
         }
     }
     xfif_port_group_set(wx->xfif, group, ports, n_ports);
@@ -2174,15 +2329,14 @@ static void
 wx_port_reinit(struct wx *wx, wdp_port_poll_cb_func *cb, void *aux)
 {
     struct svec devnames;
-    struct wdp_port *wdp_port;
-    unsigned int port_no;
+    struct wx_port *wx_port;
     struct xflow_port *xflow_ports;
     size_t n_xflow_ports;
     size_t i;
 
     svec_init(&devnames);
-    PORT_ARRAY_FOR_EACH (wdp_port, &wx->ports, port_no) {
-        svec_add (&devnames, (char *) wdp_port->opp.name);
+    HMAP_FOR_EACH (wx_port, hmap_node, &wx->ports) {
+        svec_add (&devnames, (char *) wx_port->wdp_port.opp.name);
     }
     xfif_port_list(wx->xfif, &xflow_ports, &n_xflow_ports);
     for (i = 0; i < n_xflow_ports; i++) {
@@ -2199,11 +2353,12 @@ wx_port_reinit(struct wx *wx, wdp_port_poll_cb_func *cb, void *aux)
     wx_port_refresh_groups(wx);
 }
 
-static struct wdp_port *
-make_wdp_port(const struct xflow_port *xflow_port)
+static struct wx_port *
+make_wx_port(const struct xflow_port *xflow_port)
 {
     struct netdev_options netdev_options;
     enum netdev_flags flags;
+    struct wx_port *wx_port;
     struct wdp_port *wdp_port;
     struct netdev *netdev;
     bool carrier;
@@ -2222,7 +2377,9 @@ make_wdp_port(const struct xflow_port *xflow_port)
         return NULL;
     }
 
-    wdp_port = xmalloc(sizeof *wdp_port);
+    wx_port = xmalloc(sizeof *wx_port);
+    wx_port->xflow_port = xflow_port->port;
+    wdp_port = &wx_port->wdp_port;
     wdp_port->netdev = netdev;
     wdp_port->opp.port_no = xflow_port_to_ofp_port(xflow_port->port);
     netdev_get_etheraddr(netdev, wdp_port->opp.hw_addr);
@@ -2242,13 +2399,13 @@ make_wdp_port(const struct xflow_port *xflow_port)
 
     wdp_port->devname = xstrdup(xflow_port->devname);
     wdp_port->internal = (xflow_port->flags & XFLOW_PORT_INTERNAL) != 0;
-    return wdp_port;
+    return wx_port;
 }
 
 static bool
 wx_port_conflicts(const struct wx *wx, const struct xflow_port *xflow_port)
 {
-    if (port_array_get(&wx->ports, xflow_port->port)) {
+    if (wx_port_get(wx, xflow_port->port)) {
         VLOG_WARN_RL(&rl, "ignoring duplicate port %"PRIu16" in datapath",
                      xflow_port->port);
         return true;
@@ -2262,10 +2419,10 @@ wx_port_conflicts(const struct wx *wx, const struct xflow_port *xflow_port)
 }
 
 static int
-wdp_port_equal(const struct wdp_port *a_, const struct wdp_port *b_)
+wx_port_equal(const struct wx_port *a_, const struct wx_port *b_)
 {
-    const struct ofp_phy_port *a = &a_->opp;
-    const struct ofp_phy_port *b = &b_->opp;
+    const struct ofp_phy_port *a = &a_->wdp_port.opp;
+    const struct ofp_phy_port *b = &b_->wdp_port.opp;
 
     BUILD_ASSERT_DECL(sizeof *a == 48); /* Detect ofp_phy_port changes. */
     return (a->port_no == b->port_no
@@ -2280,33 +2437,34 @@ wdp_port_equal(const struct wdp_port *a_, const struct wdp_port *b_)
 }
 
 static void
-wx_port_install(struct wx *wx, struct wdp_port *wdp_port)
+wx_port_install(struct wx *wx, struct wx_port *wx_port)
 {
-    uint16_t xflow_port = ofp_port_to_xflow_port(wdp_port->opp.port_no);
-    const char *netdev_name = (const char *) wdp_port->opp.name;
+    const struct ofp_phy_port *opp = &wx_port->wdp_port.opp;
+    uint16_t xflow_port = ofp_port_to_xflow_port(opp->port_no);
+    const char *name = (const char *) opp->name;
 
-    netdev_monitor_add(wx->netdev_monitor, wdp_port->netdev);
-    port_array_set(&wx->ports, xflow_port, wdp_port);
-    shash_add(&wx->port_by_name, netdev_name, wdp_port);
+    netdev_monitor_add(wx->netdev_monitor, wx_port->wdp_port.netdev);
+    hmap_insert(&wx->ports, &wx_port->hmap_node, hash_int(xflow_port, 0));
+    shash_add(&wx->port_by_name, name, wx_port);
 }
 
 static void
-wx_port_remove(struct wx *wx, struct wdp_port *wdp_port)
+wx_port_remove(struct wx *wx, struct wx_port *wx_port)
 {
-    uint16_t xflow_port = ofp_port_to_xflow_port(wdp_port->opp.port_no);
+    const struct ofp_phy_port *opp = &wx_port->wdp_port.opp;
+    const char *name = (const char *) opp->name;
 
-    netdev_monitor_remove(wx->netdev_monitor, wdp_port->netdev);
-    port_array_delete(&wx->ports, xflow_port);
-    shash_delete(&wx->port_by_name,
-                 shash_find(&wx->port_by_name, (char *) wdp_port->opp.name));
+    netdev_monitor_remove(wx->netdev_monitor, wx_port->wdp_port.netdev);
+    hmap_remove(&wx->ports, &wx_port->hmap_node);
+    shash_delete(&wx->port_by_name, shash_find(&wx->port_by_name, name));
 }
 
 static void
-wx_port_free(struct wdp_port *wdp_port)
+wx_port_free(struct wx_port *wx_port)
 {
-    if (wdp_port) {
-        netdev_close(wdp_port->netdev);
-        free(wdp_port);
+    if (wx_port) {
+        wdp_port_free(&wx_port->wdp_port);
+        free(wx_port);
     }
 }
 
@@ -2315,8 +2473,8 @@ wx_port_update(struct wx *wx, const char *devname,
                wdp_port_poll_cb_func *cb, void *aux)
 {
     struct xflow_port xflow_port;
-    struct wdp_port *old_wdp_port;
-    struct wdp_port *new_wdp_port;
+    struct wx_port *old_wx_port;
+    struct wx_port *new_wx_port;
     int error;
 
     COVERAGE_INC(wx_update_port);
@@ -2324,10 +2482,10 @@ wx_port_update(struct wx *wx, const char *devname,
     /* Query the datapath for port information. */
     error = xfif_port_query_by_name(wx->xfif, devname, &xflow_port);
 
-    /* Find the old wdp_port. */
-    old_wdp_port = shash_find_data(&wx->port_by_name, devname);
+    /* Find the old wx_port. */
+    old_wx_port = shash_find_data(&wx->port_by_name, devname);
     if (!error) {
-        if (!old_wdp_port) {
+        if (!old_wx_port) {
             /* There's no port named 'devname' but there might be a port with
              * the same port number.  This could happen if a port is deleted
              * and then a new one added in its place very quickly, or if a port
@@ -2338,7 +2496,7 @@ wx_port_update(struct wx *wx, const char *devname,
              * reliably but more portably by comparing the old port's MAC
              * against the new port's MAC.  However, this code isn't that smart
              * and always sends an OFPPR_MODIFY (XXX). */
-            old_wdp_port = port_array_get(&wx->ports, xflow_port.port);
+            old_wx_port = wx_port_get(wx, xflow_port.port);
         }
     } else if (error != ENOENT && error != ENODEV) {
         VLOG_WARN_RL(&rl, "xfif_port_query_by_name returned unexpected error "
@@ -2346,48 +2504,50 @@ wx_port_update(struct wx *wx, const char *devname,
         return;
     }
 
-    /* Create a new wdp_port. */
-    new_wdp_port = !error ? make_wdp_port(&xflow_port) : NULL;
+    /* Create a new wx_port. */
+    new_wx_port = !error ? make_wx_port(&xflow_port) : NULL;
 
     /* Eliminate a few pathological cases. */
-    if (!old_wdp_port && !new_wdp_port) {
+    if (!old_wx_port && !new_wx_port) {
         return;
-    } else if (old_wdp_port && new_wdp_port) {
+    } else if (old_wx_port && new_wx_port) {
         /* Most of the 'config' bits are OpenFlow soft state, but
          * OFPPC_PORT_DOWN is maintained by the kernel.  So transfer the
-         * OpenFlow bits from old_wdp_port.  (make_wdp_port() only sets
+         * OpenFlow bits from old_wx_port.  (make_wx_port() only sets
          * OFPPC_PORT_DOWN and leaves the other bits 0.)  */
-        new_wdp_port->opp.config |= old_wdp_port->opp.config & ~OFPPC_PORT_DOWN;
+        struct ofp_phy_port *new_opp = &new_wx_port->wdp_port.opp;
+        struct ofp_phy_port *old_opp = &old_wx_port->wdp_port.opp;
+        new_opp->config |= old_opp->config & ~OFPPC_PORT_DOWN;
 
-        if (wdp_port_equal(old_wdp_port, new_wdp_port)) {
+        if (wx_port_equal(old_wx_port, new_wx_port)) {
             /* False alarm--no change. */
-            wx_port_free(new_wdp_port);
+            wx_port_free(new_wx_port);
             return;
         }
     }
 
     /* Now deal with the normal cases. */
-    if (old_wdp_port) {
-        wx_port_remove(wx, old_wdp_port);
+    if (old_wx_port) {
+        wx_port_remove(wx, old_wx_port);
     }
-    if (new_wdp_port) {
-        wx_port_install(wx, new_wdp_port);
+    if (new_wx_port) {
+        wx_port_install(wx, new_wx_port);
     }
 
     /* Call back. */
-    if (!old_wdp_port) {
-        (*cb)(&new_wdp_port->opp, OFPPR_ADD, aux);
-    } else if (!new_wdp_port) {
-        (*cb)(&old_wdp_port->opp, OFPPR_DELETE, aux);
+    if (!old_wx_port) {
+        (*cb)(&new_wx_port->wdp_port.opp, OFPPR_ADD, aux);
+    } else if (!new_wx_port) {
+        (*cb)(&old_wx_port->wdp_port.opp, OFPPR_DELETE, aux);
     } else {
-        (*cb)(&new_wdp_port->opp, OFPPR_MODIFY, aux);
+        (*cb)(&new_wx_port->wdp_port.opp, OFPPR_MODIFY, aux);
     }
 
     /* Update port groups. */
     wx_port_refresh_groups(wx);
 
     /* Clean up. */
-    wx_port_free(old_wdp_port);
+    wx_port_free(old_wx_port);
 }
 
 static int
@@ -2406,9 +2566,9 @@ wx_port_init(struct wx *wx)
     for (i = 0; i < n_ports; i++) {
         const struct xflow_port *xflow_port = &ports[i];
         if (!wx_port_conflicts(wx, xflow_port)) {
-            struct wdp_port *wdp_port = make_wdp_port(xflow_port);
-            if (wdp_port) {
-                wx_port_install(wx, wdp_port);
+            struct wx_port *wx_port = make_wx_port(xflow_port);
+            if (wx_port) {
+                wx_port_install(wx, wx_port);
             }
         }
     }
@@ -2416,6 +2576,21 @@ wx_port_init(struct wx *wx)
     wx_port_refresh_groups(wx);
     return 0;
 }
+
+/* Returns the port in 'wx' with xflow port number 'xflow_port'. */
+static struct wx_port *
+wx_port_get(const struct wx *wx, uint16_t xflow_port)
+{
+    struct wx_port *port;
+
+    HMAP_FOR_EACH_IN_BUCKET (port, hmap_node, hash_int(xflow_port, 0),
+                             &wx->ports) {
+        if (port->xflow_port == xflow_port) {
+            return port;
+        }
+    }
+    return NULL;
+}
 \f
 void
 wdp_xflow_register(void)