Merge "master" into "wdp".
[sliver-openvswitch.git] / ofproto / wdp-xflow.c
index 66b245a..26830e6 100644 (file)
 
 #include "coverage.h"
 #include "dhcp.h"
+#include "mac-learning.h"
 #include "netdev.h"
 #include "netflow.h"
 #include "ofp-util.h"
 #include "ofpbuf.h"
+#include "ofproto.h"
 #include "openflow/nicira-ext.h"
 #include "openflow/openflow.h"
 #include "packets.h"
 #include "poll-loop.h"
 #include "port-array.h"
+#include "queue.h"
 #include "shash.h"
-#include "stp.h"
 #include "svec.h"
 #include "timeval.h"
 #include "util.h"
 #include "wdp-provider.h"
 #include "xfif.h"
 #include "xflow-util.h"
+#include "vlog.h"
 #include "xtoxll.h"
 
-#include <linux/types.h>        /* XXX */
-#include <linux/pkt_sched.h>    /* XXX */
+VLOG_DEFINE_THIS_MODULE(wdp_xflow)
 
-#define THIS_MODULE VLM_wdp_xflow
-#include "vlog.h"
+enum {
+    TABLEID_HASH = 0,
+    TABLEID_CLASSIFIER = 1
+};
 
 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
 \f
@@ -55,25 +59,54 @@ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
 #define WX_MAX_WILD     65536   /* Wildcarded rules. */
 #define WX_MAX_EXACT    1048576 /* Exact-match rules. */
 
+struct wx_port {
+    struct hmap_node hmap_node;
+    struct wdp_port wdp_port;
+    uint16_t xflow_port;
+};
+
 struct wx {
     struct list list_node;
     struct wdp wdp;
     struct xfif *xfif;
     struct classifier cls;
     struct netdev_monitor *netdev_monitor;
-    struct port_array ports;    /* Index is xflow port nr;
-                                 * wdp_port->opp.port_no is OFP port nr. */
+    struct hmap ports;          /* Contains "struct wx_port"s. */
     struct shash port_by_name;
-    bool need_revalidate;
     long long int next_expiration;
+    int wdp_listen_mask;
+
+    /* Rules that might need to be revalidated. */
+    bool need_revalidate;      /* Revalidate all subrules? */
+    bool revalidate_all;       /* Revalidate all subrules and other rules? */
+    struct tag_set revalidate_set; /* Tag set of (sub)rules to revalidate. */
+
+    /* Hooks for ovs-vswitchd. */
+    const struct ofhooks *ofhooks;
+    void *aux;
+
+    /* Used by default ofhooks. */
+    struct mac_learning *ml;
+
+    /* List of "struct wdp_packets" queued for the controller by
+     * execute_xflow_actions(). */
+#define MAX_CTL_PACKETS 50
+    struct list ctl_packets;
+    int n_ctl_packets;
 };
 
+static const struct ofhooks default_ofhooks;
+
 static struct list all_wx = LIST_INITIALIZER(&all_wx);
 
 static int wx_port_init(struct wx *);
-static void wx_port_run(struct wx *);
+static struct wx_port *wx_port_get(const struct wx *, uint16_t xflow_port);
+static void wx_port_process_change(struct wx *wx, int error, char *devname,
+                                   wdp_port_poll_cb_func *cb, void *aux);
 static void wx_port_refresh_groups(struct wx *);
 
+static void wx_purge_ctl_packets__(struct wx *);
+
 enum {
     WX_GROUP_FLOOD = 0,
     WX_GROUP_ALL = 1
@@ -88,7 +121,8 @@ wx_cast(const struct wdp *wdp)
 static int
 wx_xlate_actions(struct wx *, const union ofp_action *, size_t n,
                  const flow_t *flow, const struct ofpbuf *packet,
-                 struct xflow_actions *out, bool *may_set_up_flow);
+                 tag_type *tags, struct xflow_actions *out,
+                 bool *may_set_up_flow);
 \f
 struct wx_rule {
     struct wdp_rule wr;
@@ -97,6 +131,7 @@ struct wx_rule {
     uint64_t byte_count;        /* Number of bytes received. */
     uint64_t accounted_bytes;   /* Number of bytes passed to account_cb. */
     long long int used;         /* Last-used time (0 if never used). */
+    tag_type tags;              /* Tags (set only by hooks). */
 
     /* If 'super' is non-NULL, this rule is a subrule, that is, it is an
      * exact-match rule (having cr.wc.wildcards of 0) generated from the
@@ -211,8 +246,7 @@ wx_rule_update_stats(struct wx *wx, struct wx_rule *rule,
         wx_rule_update_time(wx, rule, stats);
         rule->packet_count += stats->n_packets;
         rule->byte_count += stats->n_bytes;
-        /* XXX netflow_flow_update_flags(&rule->nf_flow, stats->ip_tos,
-           stats->tcp_flags); */
+        /* XXX netflow_flow_update_flags(&rule->nf_flow, stats->tcp_flags); */
     }
 }
 
@@ -306,7 +340,7 @@ wx_rule_destroy(struct wx *wx, struct wx_rule *rule)
 {
     if (!rule->super) {
         struct wx_rule *subrule, *next;
-        LIST_FOR_EACH_SAFE (subrule, next, struct wx_rule, list, &rule->list) {
+        LIST_FOR_EACH_SAFE (subrule, next, list, &rule->list) {
             wx_rule_revalidate(wx, subrule);
         }
     } else {
@@ -337,8 +371,8 @@ wx_rule_has_out_port(const struct wx_rule *rule, uint16_t out_port)
 }
 #endif
 
-/* Caller is responsible for initializing the 'cr' member of the returned
- * rule. */
+/* Caller is responsible for initializing the 'cr' and ofp_table_id members of
+ * the returned rule. */
 static struct wx_rule *
 wx_rule_create(struct wx_rule *super,
                const union ofp_action *actions, size_t n_actions,
@@ -363,8 +397,48 @@ wx_rule_create(struct wx_rule *super,
     return rule;
 }
 
+/* Executes, within 'wx', the 'n_actions' actions in 'actions' on 'packet',
+ * which arrived on 'in_port'.
+ *
+ * Takes ownership of 'packet'. */
+static bool
+execute_xflow_actions(struct wx *wx, uint16_t in_port,
+                      const union xflow_action *actions, size_t n_actions,
+                      struct ofpbuf *packet)
+{
+    if (n_actions == 1 && actions[0].type == XFLOWAT_CONTROLLER
+        && wx->n_ctl_packets < MAX_CTL_PACKETS) {
+        /* As an optimization, avoid a round-trip from userspace to kernel to
+         * userspace.  This also avoids possibly filling up kernel packet
+         * buffers along the way. */
+        struct wdp_packet *wdp_packet;
+
+        if (!(wx->wdp_listen_mask & WDP_CHAN_ACTION)) {
+            return true;
+        }
+
+        wdp_packet = xmalloc(sizeof *wdp_packet);
+        wdp_packet->channel = WDP_CHAN_ACTION;
+        wdp_packet->tun_id = 0;
+        wdp_packet->in_port = in_port;
+        wdp_packet->send_len = actions[0].controller.arg;
+        wdp_packet->payload = packet;
+
+        list_push_back(&wx->ctl_packets, &wdp_packet->list);
+
+        return true;
+    } else {
+        int error;
+
+        error = xfif_execute(wx->xfif, in_port, actions, n_actions, packet);
+        ofpbuf_delete(packet);
+        return !error;
+    }
+}
+
 /* Executes the actions indicated by 'rule' on 'packet', which is in flow
- * 'flow' and is considered to have arrived on xflow port 'in_port'.
+ * 'flow' and is considered to have arrived on xflow port 'in_port'.  'packet'
+ * must have at least sizeof(struct ofp_packet_in) bytes of headroom.
  *
  * The flow that 'packet' actually contains does not need to actually match
  * 'rule'; the actions in 'rule' will be applied to it either way.  Likewise,
@@ -376,15 +450,20 @@ wx_rule_create(struct wx_rule *super,
  * 'packet' using rule_make_actions().  If 'rule' is a wildcard rule, or if
  * 'rule' is an exact-match rule but 'flow' is not the rule's flow, then this
  * function will compose a set of xflow actions based on 'rule''s OpenFlow
- * actions and apply them to 'packet'. */
+ * actions and apply them to 'packet'.
+ *
+ * Takes ownership of 'packet'. */
 static void
 wx_rule_execute(struct wx *wx, struct wx_rule *rule,
                 struct ofpbuf *packet, const flow_t *flow)
 {
     const union xflow_action *actions;
+    struct xflow_flow_stats stats;
     size_t n_actions;
     struct xflow_actions a;
 
+    assert(ofpbuf_headroom(packet) >= sizeof(struct ofp_packet_in));
+
     /* Grab or compose the xflow actions.
      *
      * The special case for an exact-match 'rule' where 'flow' is not the
@@ -392,11 +471,12 @@ wx_rule_execute(struct wx *wx, struct wx_rule *rule,
      * port simply because the xflow actions were composed for the wrong
      * scenario. */
     if (rule->wr.cr.flow.wildcards
-        || !flow_equal(flow, &rule->wr.cr.flow))
+        || !flow_equal_headers(flow, &rule->wr.cr.flow))
     {
         struct wx_rule *super = rule->super ? rule->super : rule;
         if (wx_xlate_actions(wx, super->wr.actions, super->wr.n_actions, flow,
-                             packet, &a, NULL)) {
+                             packet, NULL, &a, NULL)) {
+            ofpbuf_delete(packet);
             return;
         }
         actions = a.actions;
@@ -407,16 +487,21 @@ wx_rule_execute(struct wx *wx, struct wx_rule *rule,
     }
 
     /* Execute the xflow actions. */
-    if (!xfif_execute(wx->xfif, flow->in_port,
-                      actions, n_actions, packet)) {
-        struct xflow_flow_stats stats;
-        flow_extract_stats(flow, packet, &stats);
+    flow_extract_stats(flow, packet, &stats);
+    if (!execute_xflow_actions(wx, flow->in_port,
+                               actions, n_actions, packet)) {
         wx_rule_update_stats(wx, rule, &stats);
         rule->used = time_msec();
         //XXX netflow_flow_update_time(wx->netflow, &rule->nf_flow, rule->used);
     }
 }
 
+/* Inserts 'rule' into 'p''s flow table.
+ *
+ * If 'packet' is nonnull, takes ownership of 'packet', executes 'rule''s
+ * actions on it and credits the statistics for sending the packet to 'rule'.
+ * 'packet' must have at least sizeof(struct ofp_packet_in) bytes of
+ * headroom. */
 static void
 wx_rule_insert(struct wx *wx, struct wx_rule *rule, struct ofpbuf *packet,
                uint16_t in_port)
@@ -460,6 +545,8 @@ wx_rule_create_subrule(struct wx *wx, struct wx_rule *rule, const flow_t *flow)
     subrule = wx_rule_create(rule, NULL, 0,
                              rule->wr.idle_timeout,
                              rule->wr.hard_timeout);
+    /* Subrules aren't really in any OpenFlow table, so don't bother with
+     * subrule->wr.ofp_table_id. */
     COVERAGE_INC(wx_subrule_create);
     cls_rule_from_flow(flow, &subrule->wr.cr);
     classifier_insert_exact(&wx->cls, &subrule->wr.cr);
@@ -480,7 +567,8 @@ wx_rule_make_actions(struct wx *wx, struct wx_rule *rule,
 
     super = rule->super ? rule->super : rule;
     wx_xlate_actions(wx, super->wr.actions, super->wr.n_actions,
-                     &rule->wr.cr.flow, packet, &a, &rule->may_install);
+                     &rule->wr.cr.flow, packet,
+                     &rule->tags, &a, &rule->may_install);
 
     actions_len = a.n_actions * sizeof *a.actions;
     if (rule->n_xflow_actions != a.n_actions
@@ -589,11 +677,10 @@ add_output_group_action(struct xflow_actions *actions, uint16_t group,
 }
 
 static void
-add_controller_action(struct xflow_actions *actions,
-                      const struct ofp_action_output *oao)
+add_controller_action(struct xflow_actions *actions, uint16_t max_len)
 {
     union xflow_action *a = xflow_actions_add(actions, XFLOWAT_CONTROLLER);
-    a->controller.arg = ntohs(oao->max_len);
+    a->controller.arg = max_len;
 }
 
 struct wx_xlate_ctx {
@@ -607,7 +694,7 @@ struct wx_xlate_ctx {
 
     /* Output. */
     struct xflow_actions *out;    /* Datapath actions. */
-    //tag_type *tags;             /* Tags associated with OFPP_NORMAL actions. */
+    tag_type *tags;             /* Tags associated with OFPP_NORMAL actions. */
     bool may_set_up_flow;       /* True ordinarily; false if the actions must
                                  * be reassessed for every packet. */
     uint16_t nf_output_iface;   /* Output interface index for NetFlow. */
@@ -619,10 +706,10 @@ static void do_xlate_actions(const union ofp_action *in, size_t n_in,
 static void
 add_output_action(struct wx_xlate_ctx *ctx, uint16_t port)
 {
-    const struct wdp_port *wdp_port = port_array_get(&ctx->wx->ports, port);
+    const struct wx_port *wx_port = wx_port_get(ctx->wx, port);
 
-    if (wdp_port) {
-        if (wdp_port->opp.config & OFPPC_NO_FWD) {
+    if (wx_port) {
+        if (wx_port->wdp_port.opp.config & OFPPC_NO_FWD) {
             /* Forwarding disabled on port. */
             return;
         }
@@ -684,15 +771,15 @@ xlate_table_action(struct wx_xlate_ctx *ctx, uint16_t in_port)
 }
 
 static void
-xlate_output_action(struct wx_xlate_ctx *ctx,
-                    const struct ofp_action_output *oao)
+xlate_output_action__(struct wx_xlate_ctx *ctx,
+                      uint16_t port, uint16_t max_len)
 {
     uint16_t xflow_port;
     uint16_t prev_nf_output_iface = ctx->nf_output_iface;
 
     ctx->nf_output_iface = NF_OUT_DROP;
 
-    switch (ntohs(oao->port)) {
+    switch (port) {
     case OFPP_IN_PORT:
         add_output_action(ctx, ctx->flow.in_port);
         break;
@@ -700,8 +787,7 @@ xlate_output_action(struct wx_xlate_ctx *ctx,
         xlate_table_action(ctx, ctx->flow.in_port);
         break;
     case OFPP_NORMAL:
-#if 0
-        if (!ctx->wx->ofhooks->normal_cb(ctx->flow, ctx->packet,
+        if (!ctx->wx->ofhooks->normal_cb(&ctx->flow, ctx->packet,
                                          ctx->out, ctx->tags,
                                          &ctx->nf_output_iface,
                                          ctx->wx->aux)) {
@@ -709,9 +795,7 @@ xlate_output_action(struct wx_xlate_ctx *ctx,
             ctx->may_set_up_flow = false;
         }
         break;
-#else
-        /* fall through to flood for now */
-#endif
+
     case OFPP_FLOOD:
         add_output_group_action(ctx->out, WX_GROUP_FLOOD,
                                 &ctx->nf_output_iface);
@@ -720,13 +804,13 @@ xlate_output_action(struct wx_xlate_ctx *ctx,
         add_output_group_action(ctx->out, WX_GROUP_ALL, &ctx->nf_output_iface);
         break;
     case OFPP_CONTROLLER:
-        add_controller_action(ctx->out, oao);
+        add_controller_action(ctx->out, max_len);
         break;
     case OFPP_LOCAL:
         add_output_action(ctx, XFLOWP_LOCAL);
         break;
     default:
-        xflow_port = ofp_port_to_xflow_port(ntohs(oao->port));
+        xflow_port = ofp_port_to_xflow_port(port);
         if (xflow_port != ctx->flow.in_port) {
             add_output_action(ctx, xflow_port);
         }
@@ -743,6 +827,13 @@ xlate_output_action(struct wx_xlate_ctx *ctx,
     }
 }
 
+static void
+xlate_output_action(struct wx_xlate_ctx *ctx,
+                    const struct ofp_action_output *oao)
+{
+    xlate_output_action__(ctx, ntohs(oao->port), ntohs(oao->max_len));
+}
+
 /* If the final xflow action in 'ctx' is "pop priority", drop it, as an
  * optimization, because we're going to add another action that sets the
  * priority immediately after, or because there are no actions following the
@@ -761,6 +852,16 @@ xlate_enqueue_action(struct wx_xlate_ctx *ctx,
                      const struct ofp_action_enqueue *oae)
 {
     uint16_t ofp_port, xflow_port;
+    uint32_t priority;
+    int error;
+
+    error = xfif_queue_to_priority(ctx->wx->xfif, ntohl(oae->queue_id),
+                                   &priority);
+    if (error) {
+        /* Fall back to ordinary output action. */
+        xlate_output_action__(ctx, ntohs(oae->port), 0);
+        return;
+    }
 
     /* Figure out xflow output port. */
     ofp_port = ntohs(oae->port);
@@ -773,7 +874,7 @@ xlate_enqueue_action(struct wx_xlate_ctx *ctx,
     /* Add xflow actions. */
     remove_pop_action(ctx);
     xflow_actions_add(ctx->out, XFLOWAT_SET_PRIORITY)->priority.priority
-        = TC_H_MAKE(1, ntohl(oae->queue_id)); /* XXX */
+        = priority;
     add_output_action(ctx, xflow_port);
     xflow_actions_add(ctx->out, XFLOWAT_POP_PRIORITY);
 
@@ -785,12 +886,33 @@ xlate_enqueue_action(struct wx_xlate_ctx *ctx,
     }
 }
 
+static void
+xlate_set_queue_action(struct wx_xlate_ctx *ctx,
+                       const struct nx_action_set_queue *nasq)
+{
+    uint32_t priority;
+    int error;
+
+    error = xfif_queue_to_priority(ctx->wx->xfif, ntohl(nasq->queue_id),
+                                   &priority);
+    if (error) {
+        /* Couldn't translate queue to a priority, so ignore.  A warning
+         * has already been logged. */
+        return;
+    }
+
+    remove_pop_action(ctx);
+    xflow_actions_add(ctx->out, XFLOWAT_SET_PRIORITY)->priority.priority
+        = priority;
+}
+
 static void
 xlate_nicira_action(struct wx_xlate_ctx *ctx,
                     const struct nx_action_header *nah)
 {
     const struct nx_action_resubmit *nar;
     const struct nx_action_set_tunnel *nast;
+    const struct nx_action_set_queue *nasq;
     union xflow_action *oa;
     int subtype = ntohs(nah->subtype);
 
@@ -807,6 +929,21 @@ xlate_nicira_action(struct wx_xlate_ctx *ctx,
         ctx->flow.tun_id = oa->tunnel.tun_id = nast->tun_id;
         break;
 
+    case NXAST_DROP_SPOOFED_ARP:
+        if (ctx->flow.dl_type == htons(ETH_TYPE_ARP)) {
+            xflow_actions_add(ctx->out, XFLOWAT_DROP_SPOOFED_ARP);
+        }
+        break;
+
+    case NXAST_SET_QUEUE:
+        nasq = (const struct nx_action_set_queue *) nah;
+        xlate_set_queue_action(ctx, nasq);
+        break;
+
+    case NXAST_POP_QUEUE:
+        xflow_actions_add(ctx->out, XFLOWAT_POP_PRIORITY);
+        break;
+
     /* If you add a new action here that modifies flow data, don't forget to
      * update the flow key in ctx->flow at the same time. */
 
@@ -822,14 +959,17 @@ do_xlate_actions(const union ofp_action *in, size_t n_in,
 {
     struct actions_iterator iter;
     const union ofp_action *ia;
-    const struct wdp_port *port;
+    const struct wx_port *port;
 
-    port = port_array_get(&ctx->wx->ports, ctx->flow.in_port);
-    if (port && port->opp.config & (OFPPC_NO_RECV | OFPPC_NO_RECV_STP) &&
-        port->opp.config & (eth_addr_equals(ctx->flow.dl_dst, stp_eth_addr)
-                            ? OFPPC_NO_RECV_STP : OFPPC_NO_RECV)) {
-        /* Drop this flow. */
-        return;
+    port = wx_port_get(ctx->wx, ctx->flow.in_port);
+    if (port) {
+        const struct ofp_phy_port *opp = &port->wdp_port.opp;
+        if (opp->config & (OFPPC_NO_RECV | OFPPC_NO_RECV_STP) &&
+            opp->config & (eth_addr_equals(ctx->flow.dl_dst, eth_addr_stp)
+                           ? OFPPC_NO_RECV_STP : OFPPC_NO_RECV)) {
+            /* Drop this flow. */
+            return;
+        }
     }
 
     for (ia = actions_first(&iter, in, n_in); ia; ia = actions_next(&iter)) {
@@ -952,9 +1092,10 @@ wx_may_set_up(const flow_t *flow, const struct xflow_actions *actions)
 static int
 wx_xlate_actions(struct wx *wx, const union ofp_action *in, size_t n_in,
                  const flow_t *flow, const struct ofpbuf *packet,
-                 struct xflow_actions *out, bool *may_set_up_flow)
+                 tag_type *tags, struct xflow_actions *out,
+                 bool *may_set_up_flow)
 {
-    //tag_type no_tags = 0;
+    tag_type no_tags = 0;
     struct wx_xlate_ctx ctx;
     COVERAGE_INC(wx_ofp2xflow);
     xflow_actions_init(out);
@@ -963,7 +1104,7 @@ wx_xlate_actions(struct wx *wx, const union ofp_action *in, size_t n_in,
     ctx.wx = wx;
     ctx.packet = packet;
     ctx.out = out;
-    //ctx.tags = tags ? tags : &no_tags;
+    ctx.tags = tags ? tags : &no_tags;
     ctx.may_set_up_flow = true;
     ctx.nf_output_iface = NF_OUT_DROP;
     do_xlate_actions(in, n_in, &ctx);
@@ -978,6 +1119,7 @@ wx_xlate_actions(struct wx *wx, const union ofp_action *in, size_t n_in,
     }
 #endif
     if (xflow_actions_overflow(out)) {
+        COVERAGE_INC(xflow_overflow);
         xflow_actions_init(out);
         return ofp_mkerr(OFPET_BAD_ACTION, OFPBAC_TOO_MANY);
     }
@@ -1029,7 +1171,7 @@ uninstall_idle_flow(struct wx *wx, struct wx_rule *rule)
     }
 }
 
-static void
+static int
 expire_rule(struct cls_rule *cls_rule, void *wx_)
 {
     struct wx *wx = wx_;
@@ -1053,7 +1195,7 @@ expire_rule(struct cls_rule *cls_rule, void *wx_)
             //XXX active_timeout(wx, rule);
         }
 
-        return;
+        return 0;
     }
 
     COVERAGE_INC(wx_expired);
@@ -1062,7 +1204,7 @@ expire_rule(struct cls_rule *cls_rule, void *wx_)
      * due to an idle timeout. */
     if (rule->wr.cr.flow.wildcards) {
         struct wx_rule *subrule, *next;
-        LIST_FOR_EACH_SAFE (subrule, next, struct wx_rule, list, &rule->list) {
+        LIST_FOR_EACH_SAFE (subrule, next, list, &rule->list) {
             wx_rule_remove(wx, subrule);
         }
     } else {
@@ -1077,13 +1219,15 @@ expire_rule(struct cls_rule *cls_rule, void *wx_)
     }
 #endif
     wx_rule_remove(wx, rule);
+
+    return 0;
 }
 
 struct revalidate_cbdata {
     struct wx *wx;
     bool revalidate_all;        /* Revalidate all exact-match rules? */
     bool revalidate_subrules;   /* Revalidate all exact-match subrules? */
-    //struct tag_set revalidate_set; /* Set of tags to revalidate. */
+    struct tag_set revalidate_set; /* Set of tags to revalidate. */
 };
 
 static bool
@@ -1114,7 +1258,7 @@ revalidate_rule(struct wx *wx, struct wx_rule *rule)
     return true;
 }
 
-static void
+static int
 revalidate_cb(struct cls_rule *sub_, void *cbdata_)
 {
     struct wx_rule *sub = wx_rule_cast(sub_);
@@ -1122,16 +1266,15 @@ revalidate_cb(struct cls_rule *sub_, void *cbdata_)
 
     if (cbdata->revalidate_all
         || (cbdata->revalidate_subrules && sub->super)
-        /*|| (tag_set_intersects(&cbdata->revalidate_set, sub->tags))*/) {
+        || tag_set_intersects(&cbdata->revalidate_set, sub->tags)) {
         revalidate_rule(cbdata->wx, sub);
     }
+    return 0;
 }
 
 static void
 wx_run_one(struct wx *wx)
 {
-    wx_port_run(wx);
-
     if (time_msec() >= wx->next_expiration) {
         COVERAGE_INC(wx_expiration);
         wx->next_expiration = time_msec() + 1000;
@@ -1142,13 +1285,13 @@ wx_run_one(struct wx *wx)
         /* XXX account_checkpoint_cb */
     }
 
-    if (wx->need_revalidate /*|| !tag_set_is_empty(&p->revalidate_set)*/) {
+    if (wx->need_revalidate || !tag_set_is_empty(&wx->revalidate_set)) {
         struct revalidate_cbdata cbdata;
         cbdata.wx = wx;
-        cbdata.revalidate_all = false;
+        cbdata.revalidate_all = wx->revalidate_all;
         cbdata.revalidate_subrules = wx->need_revalidate;
-        //cbdata.revalidate_set = wx->revalidate_set;
-        //tag_set_init(&wx->revalidate_set);
+        cbdata.revalidate_set = wx->revalidate_set;
+        tag_set_init(&wx->revalidate_set);
         COVERAGE_INC(wx_revalidate);
         classifier_for_each(&wx->cls, CLS_INC_EXACT, revalidate_cb, &cbdata);
         wx->need_revalidate = false;
@@ -1160,7 +1303,7 @@ wx_run(void)
 {
     struct wx *wx;
 
-    LIST_FOR_EACH (wx, struct wx, list_node, &all_wx) {
+    LIST_FOR_EACH (wx, list_node, &all_wx) {
         wx_run_one(wx);
     }
     xf_run();
@@ -1169,9 +1312,7 @@ wx_run(void)
 static void
 wx_wait_one(struct wx *wx)
 {
-    xfif_port_poll_wait(wx->xfif);
-    netdev_monitor_poll_wait(wx->netdev_monitor);
-    if (wx->need_revalidate /*|| !tag_set_is_empty(&p->revalidate_set)*/) {
+    if (wx->need_revalidate || !tag_set_is_empty(&wx->revalidate_set)) {
         poll_immediate_wake();
     } else if (wx->next_expiration != LLONG_MAX) {
         poll_timer_wait_until(wx->next_expiration);
@@ -1183,7 +1324,7 @@ wx_wait(void)
 {
     struct wx *wx;
 
-    LIST_FOR_EACH (wx, struct wx, list_node, &all_wx) {
+    LIST_FOR_EACH (wx, list_node, &all_wx) {
         wx_wait_one(wx);
     }
     xf_wait();
@@ -1219,12 +1360,19 @@ wx_open(const struct wdp_class *wdp_class, const char *name, bool create,
         wx->xfif = xfif;
         classifier_init(&wx->cls);
         wx->netdev_monitor = netdev_monitor_create();
-        port_array_init(&wx->ports);
+        hmap_init(&wx->ports);
         shash_init(&wx->port_by_name);
         wx->next_expiration = time_msec() + 1000;
+        tag_set_init(&wx->revalidate_set);
 
         wx_port_init(wx);
 
+        wx->ofhooks = &default_ofhooks;
+        wx->aux = wx;
+        wx->ml = mac_learning_create();
+
+        list_init(&wx->ctl_packets);
+
         *wdpp = &wx->wdp;
     }
 
@@ -1241,6 +1389,9 @@ wx_close(struct wdp *wdp)
     classifier_destroy(&wx->cls);
     netdev_monitor_destroy(wx->netdev_monitor);
     list_remove(&wx->list_node);
+    mac_learning_destroy(wx->ml);
+    hmap_destroy(&wx->ports);
+    shash_destroy(&wx->port_by_name);
     free(wx);
 }
 
@@ -1260,26 +1411,13 @@ wx_destroy(struct wdp *wdp)
     return xfif_delete(wx->xfif);
 }
 
-static void
-hton_ofp_phy_port(struct ofp_phy_port *opp)
-{
-    opp->port_no = htons(opp->port_no);
-    opp->config = htonl(opp->config);
-    opp->state = htonl(opp->state);
-    opp->curr = htonl(opp->curr);
-    opp->advertised = htonl(opp->advertised);
-    opp->supported = htonl(opp->supported);
-    opp->peer = htonl(opp->peer);
-}
-
 static int
 wx_get_features(const struct wdp *wdp, struct ofpbuf **featuresp)
 {
     struct wx *wx = wx_cast(wdp);
     struct ofp_switch_features *osf;
     struct ofpbuf *buf;
-    unsigned int port_no;
-    struct wdp_port *port;
+    struct wx_port *port;
 
     buf = ofpbuf_new(sizeof *osf);
     osf = ofpbuf_put_zeros(buf, sizeof *osf);
@@ -1298,15 +1436,16 @@ wx_get_features(const struct wdp *wdp, struct ofpbuf **featuresp)
                          (1u << OFPAT_SET_TP_DST) |
                          (1u << OFPAT_ENQUEUE));
 
-    PORT_ARRAY_FOR_EACH (port, &wx->ports, port_no) {
-        hton_ofp_phy_port(ofpbuf_put(buf, &port->opp, sizeof port->opp));
+    HMAP_FOR_EACH (port, hmap_node, &wx->ports) {
+        const struct ofp_phy_port *opp = &port->wdp_port.opp;
+        hton_ofp_phy_port(ofpbuf_put(buf, opp, sizeof *opp));
     }
 
     *featuresp = buf;
     return 0;
 }
 
-static void
+static int
 count_subrules(struct cls_rule *cls_rule, void *n_subrules_)
 {
     struct wx_rule *rule = wx_rule_cast(cls_rule);
@@ -1315,6 +1454,7 @@ count_subrules(struct cls_rule *cls_rule, void *n_subrules_)
     if (rule->super) {
         (*n_subrules)++;
     }
+    return 0;
 }
 
 static int
@@ -1322,38 +1462,47 @@ wx_get_stats(const struct wdp *wdp, struct wdp_stats *stats)
 {
     struct wx *wx = wx_cast(wdp);
     struct xflow_stats xflow_stats;
-    int n_subrules;
     int error;
 
     error = xfif_get_xf_stats(wx->xfif, &xflow_stats);
+    stats->max_ports = xflow_stats.max_ports;
+    return error;
+}
 
-    n_subrules = 0;
-    classifier_for_each(&wx->cls, CLS_INC_EXACT, count_subrules, &n_subrules);
+static int
+wx_get_table_stats(const struct wdp *wdp, struct ofpbuf *stats)
+{
+    struct wx *wx = wx_cast(wdp);
+    struct xflow_stats xflow_stats;
+    struct ofp_table_stats *exact, *wild;
+    int n_subrules;
 
-    stats->exact.n_flows = classifier_count_exact(&wx->cls) - n_subrules;
-    stats->exact.cur_capacity = xflow_stats.cur_capacity;
-    stats->exact.max_capacity = MIN(WX_MAX_EXACT, xflow_stats.max_capacity);
-    stats->exact.n_hit = xflow_stats.n_hit;
-    stats->exact.n_missed = xflow_stats.n_missed;
-    stats->exact.n_lost = xflow_stats.n_lost;
-
-    stats->wild.n_flows = classifier_count_wild(&wx->cls);
-    stats->wild.cur_capacity = WX_MAX_WILD;
-    stats->wild.max_capacity = WX_MAX_WILD;
-    stats->wild.n_hit = 0;      /* XXX */
-    stats->wild.n_missed = 0;   /* XXX */
-    stats->wild.n_lost = 0;     /* XXX */
-
-    stats->n_ports = xflow_stats.n_ports;
-    stats->max_ports = xflow_stats.max_ports;
+    xfif_get_xf_stats(wx->xfif, &xflow_stats);
+    /* XXX should pass up errors, but there are no appropriate OpenFlow error
+     * codes. */
 
-    stats->n_frags = xflow_stats.n_frags;
+    n_subrules = 0;
+    classifier_for_each(&wx->cls, CLS_INC_EXACT, count_subrules, &n_subrules);
 
-    stats->max_miss_queue = xflow_stats.max_miss_queue;
-    stats->max_action_queue = xflow_stats.max_action_queue;
-    stats->max_sflow_queue = xflow_stats.max_sflow_queue;
+    exact = ofpbuf_put_zeros(stats, sizeof *exact);
+    exact->table_id = TABLEID_HASH;
+    strcpy(exact->name, "exact");
+    exact->wildcards = htonl(0);
+    exact->max_entries = htonl(MIN(WX_MAX_EXACT, xflow_stats.max_capacity));
+    exact->active_count = htonl(classifier_count_exact(&wx->cls) - n_subrules);
+    exact->lookup_count = htonll(xflow_stats.n_hit + xflow_stats.n_missed);
+    exact->matched_count = htonll(xflow_stats.n_hit);
+
+    wild = ofpbuf_put_zeros(stats, sizeof *exact);
+    wild->table_id = TABLEID_CLASSIFIER;
+    strcpy(wild->name, "classifier");
+    wild->wildcards = htonl(OVSFW_ALL);
+    wild->max_entries = htonl(WX_MAX_WILD);
+    wild->active_count = htonl(classifier_count_wild(&wx->cls));
+    wild->lookup_count = htonll(0);  /* XXX */
+    wild->matched_count = htonll(0); /* XXX */
 
-    return error;
+    return 0;
 }
 
 static int
@@ -1390,10 +1539,10 @@ wx_port_del(struct wdp *wdp, uint16_t port_no)
 }
 
 static int
-wx_answer_port_query(const struct wdp_port *port, struct wdp_port *portp)
+wx_answer_port_query(const struct wx_port *port, struct wdp_port *portp)
 {
     if (port) {
-        wdp_port_copy(portp, port);
+        wdp_port_copy(portp, &port->wdp_port);
         return 0;
     } else {
         return ENOENT;
@@ -1405,10 +1554,9 @@ wx_port_query_by_number(const struct wdp *wdp, uint16_t port_no,
                         struct wdp_port *portp)
 {
     struct wx *wx = wx_cast(wdp);
-    const struct wdp_port *port;
+    struct wx_port *wx_port = wx_port_get(wx, ofp_port_to_xflow_port(port_no));
 
-    port = port_array_get(&wx->ports, ofp_port_to_xflow_port(port_no));
-    return wx_answer_port_query(port, portp);
+    return wx_answer_port_query(wx_port, portp);
 }
 
 static int
@@ -1425,42 +1573,46 @@ static int
 wx_port_set_config(struct wdp *wdp, uint16_t port_no, uint32_t config)
 {
     struct wx *wx = wx_cast(wdp);
-    struct wdp_port *port;
+    struct wx_port *port;
+    struct ofp_phy_port *opp;
     uint32_t changes;
 
-    port = port_array_get(&wx->ports, ofp_port_to_xflow_port(port_no));
+    port = wx_port_get(wx, ofp_port_to_xflow_port(port_no));
     if (!port) {
         return ENOENT;
     }
-    changes = config ^ port->opp.config;
+    opp = &port->wdp_port.opp;
+    changes = config ^ opp->config;
 
     if (changes & OFPPC_PORT_DOWN) {
+        struct netdev *netdev = port->wdp_port.netdev;
         int error;
+
         if (config & OFPPC_PORT_DOWN) {
-            error = netdev_turn_flags_off(port->netdev, NETDEV_UP, true);
+            error = netdev_turn_flags_off(netdev, NETDEV_UP, true);
         } else {
-            error = netdev_turn_flags_on(port->netdev, NETDEV_UP, true);
+            error = netdev_turn_flags_on(netdev, NETDEV_UP, true);
         }
         if (!error) {
-            port->opp.config ^= OFPPC_PORT_DOWN;
+            opp->config ^= OFPPC_PORT_DOWN;
         }
     }
 
 #define REVALIDATE_BITS (OFPPC_NO_RECV | OFPPC_NO_RECV_STP | OFPPC_NO_FWD)
     if (changes & REVALIDATE_BITS) {
         COVERAGE_INC(wx_costly_flags);
-        port->opp.config ^= changes & REVALIDATE_BITS;
+        opp->config ^= changes & REVALIDATE_BITS;
         wx->need_revalidate = true;
     }
 #undef REVALIDATE_BITS
 
     if (changes & OFPPC_NO_FLOOD) {
-        port->opp.config ^= OFPPC_NO_FLOOD;
+        opp->config ^= OFPPC_NO_FLOOD;
         wx_port_refresh_groups(wx);
     }
 
     if (changes & OFPPC_NO_PACKET_IN) {
-        port->opp.config ^= OFPPC_NO_PACKET_IN;
+        opp->config ^= OFPPC_NO_PACKET_IN;
     }
 
     return 0;
@@ -1470,15 +1622,15 @@ static int
 wx_port_list(const struct wdp *wdp, struct wdp_port **portsp, size_t *n_portsp)
 {
     struct wx *wx = wx_cast(wdp);
-    struct wdp_port *ports, *port;
-    unsigned int port_no;
+    struct wdp_port *ports;
+    struct wx_port *port;
     size_t n_ports, i;
 
-    *n_portsp = n_ports = port_array_count(&wx->ports);
+    *n_portsp = n_ports = hmap_count(&wx->ports);
     *portsp = ports = xmalloc(n_ports * sizeof *ports);
     i = 0;
-    PORT_ARRAY_FOR_EACH (port, &wx->ports, port_no) {
-        wdp_port_copy(&ports[i++], port);
+    HMAP_FOR_EACH (port, hmap_node, &wx->ports) {
+        wdp_port_copy(&ports[i++], &port->wdp_port);
     }
     assert(i == n_ports);
 
@@ -1486,26 +1638,51 @@ wx_port_list(const struct wdp *wdp, struct wdp_port **portsp, size_t *n_portsp)
 }
 
 static int
-wx_port_poll(const struct wdp *wdp, char **devnamep)
+wx_port_poll(struct wdp *wdp, wdp_port_poll_cb_func *cb, void *aux)
 {
     struct wx *wx = wx_cast(wdp);
+    char *devname;
+    int retval;
+    int error;
 
-    return xfif_port_poll(wx->xfif, devnamep);
+    retval = 0;
+    while ((error = xfif_port_poll(wx->xfif, &devname)) != EAGAIN) {
+        wx_port_process_change(wx, error, devname, cb, aux);
+        if (error && error != ENOBUFS) {
+            retval = error;
+        }
+    }
+    while ((error = netdev_monitor_poll(wx->netdev_monitor,
+                                        &devname)) != EAGAIN) {
+        wx_port_process_change(wx, error, devname, cb, aux);
+        if (error && error != ENOBUFS) {
+            retval = error;
+        }
+    }
+    return retval;
 }
 
-static void
+static int
 wx_port_poll_wait(const struct wdp *wdp)
 {
     struct wx *wx = wx_cast(wdp);
 
     xfif_port_poll_wait(wx->xfif);
+    netdev_monitor_poll_wait(wx->netdev_monitor);
+    return 0;
 }
 
 static struct wdp_rule *
-wx_flow_get(const struct wdp *wdp, const flow_t *flow)
+wx_flow_get(const struct wdp *wdp, const flow_t *flow, unsigned int include)
 {
     struct wx *wx = wx_cast(wdp);
     struct wx_rule *rule;
+    int table_id;
+
+    table_id = flow->wildcards ? TABLEID_CLASSIFIER : TABLEID_HASH;
+    if (!(include & (1u << table_id))) {
+        return NULL;
+    }
 
     rule = wx_rule_cast(classifier_find_rule_exactly(&wx->cls, flow));
     return rule && !wx_rule_is_hidden(rule) ? &rule->wr : NULL;
@@ -1533,29 +1710,39 @@ struct wx_for_each_thunk_aux {
     void *client_aux;
 };
 
-static void
+static int
 wx_for_each_thunk(struct cls_rule *cls_rule, void *aux_)
 {
     struct wx_for_each_thunk_aux *aux = aux_;
     struct wx_rule *rule = wx_rule_cast(cls_rule);
 
     if (!wx_rule_is_hidden(rule)) {
-        aux->client_callback(&rule->wr, aux->client_aux);
+        return aux->client_callback(&rule->wr, aux->client_aux);
     }
+    return 0;
 }
 
-static void
+static int
 wx_flow_for_each_match(const struct wdp *wdp, const flow_t *target,
-                       int include,
+                       unsigned int include,
                        wdp_flow_cb_func *client_callback, void *client_aux)
 {
     struct wx *wx = wx_cast(wdp);
     struct wx_for_each_thunk_aux aux;
+    int cls_include;
+
+    cls_include = 0;
+    if (include & (1u << TABLEID_HASH)) {
+        cls_include |= CLS_INC_EXACT;
+    }
+    if (include & (1u << TABLEID_CLASSIFIER)) {
+        cls_include |= CLS_INC_WILD;
+    }
 
     aux.client_callback = client_callback;
     aux.client_aux = client_aux;
-    classifier_for_each_match(&wx->cls, target, include,
-                              wx_for_each_thunk, &aux);
+    return classifier_for_each_match(&wx->cls, target, cls_include,
+                                     wx_for_each_thunk, &aux);
 }
 
 /* Obtains statistic counters for 'rule' within 'wx' and stores them into
@@ -1589,7 +1776,7 @@ query_stats(struct wx *wx, struct wx_rule *rule, struct wdp_flow_stats *stats)
     xflow_flows = xzalloc(n_xflow_flows * sizeof *xflow_flows);
     if (rule->wr.cr.flow.wildcards) {
         size_t i = 0;
-        LIST_FOR_EACH (subrule, struct wx_rule, list, &rule->list) {
+        LIST_FOR_EACH (subrule, list, &rule->list) {
             xflow_key_from_flow(&xflow_flows[i++].key, &subrule->wr.cr.flow);
             stats->n_packets += subrule->packet_count;
             stats->n_bytes += subrule->byte_count;
@@ -1610,10 +1797,6 @@ query_stats(struct wx *wx, struct wx_rule *rule, struct wdp_flow_stats *stats)
             used = xflow_flow_stats_to_msec(&xflow_flow->stats);
             if (used > stats->used) {
                 stats->used = used;
-                if (xflow_flow->key.dl_type == htons(ETH_TYPE_IP)
-                    && xflow_flow->key.nw_proto == IP_TYPE_TCP) {
-                    stats->ip_tos = xflow_flow->stats.ip_tos;
-                }
             }
             stats->tcp_flags |= xflow_flow->stats.tcp_flags;
         }
@@ -1648,6 +1831,12 @@ wx_flow_put(struct wdp *wdp, const struct wdp_flow_put *put,
 {
     struct wx *wx = wx_cast(wdp);
     struct wx_rule *rule;
+    uint8_t ofp_table_id;
+
+    ofp_table_id = put->flow->wildcards ? TABLEID_CLASSIFIER : TABLEID_HASH;
+    if (put->ofp_table_id != 0xff && put->ofp_table_id != ofp_table_id) {
+        return ofp_mkerr_nicira(OFPET_FLOW_MOD_FAILED, NXFMFC_BAD_TABLE_ID);
+    }
 
     rule = wx_rule_cast(classifier_find_rule_exactly(&wx->cls, put->flow));
     if (rule && wx_rule_is_hidden(rule)) {
@@ -1666,13 +1855,14 @@ wx_flow_put(struct wdp *wdp, const struct wdp_flow_put *put,
              ? classifier_count_wild(&wx->cls) >= WX_MAX_WILD
              : classifier_count_exact(&wx->cls) >= WX_MAX_EXACT)) {
             /* XXX subrules should not count against exact-match limit */
-            return ENOBUFS;
+            return ofp_mkerr(OFPET_FLOW_MOD_FAILED, OFPFMFC_ALL_TABLES_FULL);
         }
     }
 
     rule = wx_rule_create(NULL, put->actions, put->n_actions,
                           put->idle_timeout, put->hard_timeout);
     cls_rule_from_flow(put->flow, &rule->wr.cr);
+    rule->wr.ofp_table_id = ofp_table_id;
     wx_rule_insert(wx, rule, NULL, 0);
 
     if (old_stats) {
@@ -1700,7 +1890,7 @@ wx_flow_delete(struct wdp *wdp, struct wdp_rule *wdp_rule,
     return 0;
 }
 
-static void
+static int
 wx_flush_rule(struct cls_rule *cls_rule, void *wx_)
 {
     struct wx_rule *rule = wx_rule_cast(cls_rule);
@@ -1713,6 +1903,8 @@ wx_flush_rule(struct cls_rule *cls_rule, void *wx_)
     rule->installed = false;
 
     wx_rule_remove(wx, rule);
+
+    return 0;
 }
 
 static int
@@ -1738,13 +1930,13 @@ wx_execute(struct wdp *wdp, uint16_t in_port,
 
     flow_extract((struct ofpbuf *) packet, 0, in_port, &flow);
     error = wx_xlate_actions(wx, actions, n_actions, &flow, packet,
-                             &xflow_actions, NULL);
+                             NULL, &xflow_actions, NULL);
     if (error) {
         return error;
     }
-    xfif_execute(wx->xfif, ofp_port_to_xflow_port(in_port),
-                 xflow_actions.actions, xflow_actions.n_actions, packet);
-    return 0;
+    return xfif_execute(wx->xfif, ofp_port_to_xflow_port(in_port),
+                        xflow_actions.actions, xflow_actions.n_actions,
+                        packet);
 }
 
 static int
@@ -1793,12 +1985,16 @@ wx_recv_set_mask(struct wdp *wdp, int listen_mask)
     struct wx *wx = wx_cast(wdp);
     int xflow_listen_mask;
 
+    wx->wdp_listen_mask = listen_mask;
+
     xflow_listen_mask = 0;
     if (listen_mask & (1 << WDP_CHAN_MISS)) {
         xflow_listen_mask |= XFLOWL_MISS;
     }
     if (listen_mask & (1 << WDP_CHAN_ACTION)) {
         xflow_listen_mask |= XFLOWL_ACTION;
+    } else {
+        wx_purge_ctl_packets__(wx);
     }
     if (listen_mask & (1 << WDP_CHAN_SFLOW)) {
         xflow_listen_mask |= XFLOWL_SFLOW;
@@ -1860,8 +2056,8 @@ wx_translate_xflow_msg(struct xflow_msg *msg, struct ofpbuf *payload,
 static const uint8_t *
 get_local_mac(const struct wx *wx)
 {
-    const struct wdp_port *port = port_array_get(&wx->ports, XFLOWP_LOCAL);
-    return port ? port->opp.hw_addr : NULL;
+    const struct wx_port *port = wx_port_get(wx, XFLOWP_LOCAL);
+    return port ? port->wdp_port.opp.hw_addr : NULL;
 }
 
 /* Returns true if 'packet' is a DHCP reply to the local port.  Such a reply
@@ -1887,13 +2083,18 @@ wx_is_local_dhcp_reply(const struct wx *wx,
     return false;
 }
 
+/* Determines whether 'payload' that arrived on 'in_port' is included in any of
+ * the flows in 'wx''s OpenFlow flow table.  If so, then it adds a
+ * corresponding flow to the xfif's exact-match flow table, taking ownership of
+ * 'payload', and returns true.  If not, it returns false and the caller
+ * retains ownership of 'payload'. */
 static bool
-wx_explode_rule(struct wx *wx, struct xflow_msg *msg, struct ofpbuf *payload)
+wx_explode_rule(struct wx *wx, uint16_t in_port, struct ofpbuf *payload)
 {
     struct wx_rule *rule;
     flow_t flow;
 
-    flow_extract(payload, 0, xflow_port_to_ofp_port(msg->port), &flow);
+    flow_extract(payload, 0, xflow_port_to_ofp_port(in_port), &flow);
 
     if (wx_is_local_dhcp_reply(wx, &flow, payload)) {
         union xflow_action action;
@@ -1901,7 +2102,7 @@ wx_explode_rule(struct wx *wx, struct xflow_msg *msg, struct ofpbuf *payload)
         memset(&action, 0, sizeof(action));
         action.output.type = XFLOWAT_OUTPUT;
         action.output.port = XFLOWP_LOCAL;
-        xfif_execute(wx->xfif, msg->port, &action, 1, payload);
+        xfif_execute(wx->xfif, in_port, &action, 1, payload);
     }
 
     rule = wx_rule_lookup_valid(wx, &flow);
@@ -1935,6 +2136,19 @@ wx_recv(struct wdp *wdp, struct wdp_packet *packet)
     struct wx *wx = wx_cast(wdp);
     int i;
 
+    if (wx->n_ctl_packets) {
+        struct wdp_packet *wdp_packet;
+
+        wdp_packet = CONTAINER_OF(list_pop_front(&wx->ctl_packets),
+                                  struct wdp_packet, list);
+        wx->n_ctl_packets--;
+
+        *packet = *wdp_packet;
+        free(wdp_packet);
+
+        return 0;
+    }
+
     /* XXX need to avoid 50*50 potential cost for caller. */
     for (i = 0; i < 50; i++) {
         struct xflow_msg *msg;
@@ -1947,48 +2161,137 @@ wx_recv(struct wdp *wdp, struct wdp_packet *packet)
         }
 
         msg = ofpbuf_pull(buf, sizeof *msg);
-        if (msg->type != _XFLOWL_MISS_NR || !wx_explode_rule(wx, msg, buf)) {
+        if (msg->type != _XFLOWL_MISS_NR
+            || !wx_explode_rule(wx, msg->port, buf)) {
             return wx_translate_xflow_msg(msg, buf, packet);
         }
-        ofpbuf_delete(buf);
     }
     return EAGAIN;
 }
 
 static void
-wx_recv_wait(struct wdp *wdp)
+wx_recv_purge_queue__(struct wx *wx, int max, int xflow_listen_mask,
+                      int *errorp)
 {
-    struct wx *wx = wx_cast(wdp);
+    int error;
 
-    xfif_recv_wait(wx->xfif);
+    error = xfif_recv_set_mask(wx->xfif, xflow_listen_mask);
+    if (!error) {
+        struct ofpbuf *buf;
+
+        while (max > 0 && (error = xfif_recv(wx->xfif, &buf)) == 0) {
+            ofpbuf_delete(buf);
+            max--;
+        }
+    }
+    if (error && error != EAGAIN) {
+        *errorp = error;
+    }
 }
-\f
-static void wx_port_update(struct wx *, const char *devname);
-static void wx_port_reinit(struct wx *);
 
 static void
-wx_port_process_change(struct wx *wx, int error, char *devname)
+wx_purge_ctl_packets__(struct wx *wx)
 {
-    if (error == ENOBUFS) {
-        wx_port_reinit(wx);
-    } else if (!error) {
-        wx_port_update(wx, devname);
-        free(devname);
+    struct wdp_packet *this, *next;
+
+    LIST_FOR_EACH_SAFE (this, next, list, &wx->ctl_packets) {
+        list_remove(&this->list);
+        ofpbuf_delete(this->payload);
+        free(this);
+    }
+    wx->n_ctl_packets = 0;
+}
+
+static int
+wx_recv_purge(struct wdp *wdp)
+{
+    struct wx *wx = wx_cast(wdp);
+    struct xflow_stats xflow_stats;
+    int xflow_listen_mask;
+    int retval, error;
+
+    xfif_get_xf_stats(wx->xfif, &xflow_stats);
+
+    error = xfif_recv_get_mask(wx->xfif, &xflow_listen_mask);
+    if (error || !(xflow_listen_mask & XFLOWL_ALL)) {
+        return error;
+    }
+
+    if (xflow_listen_mask & XFLOWL_MISS) {
+        wx_recv_purge_queue__(wx, xflow_stats.max_miss_queue, XFLOWL_MISS,
+                              &error);
+    }
+    if (xflow_listen_mask & XFLOWL_ACTION) {
+        wx_recv_purge_queue__(wx, xflow_stats.max_action_queue, XFLOWL_ACTION,
+                              &error);
+        wx_purge_ctl_packets__(wx);
     }
+    if (xflow_listen_mask & XFLOWL_SFLOW) {
+        wx_recv_purge_queue__(wx, xflow_stats.max_sflow_queue, XFLOWL_SFLOW,
+                              &error);
+    }
+
+    retval = xfif_recv_set_mask(wx->xfif, xflow_listen_mask);
+    return retval ? retval : error;
 }
 
+
 static void
-wx_port_run(struct wx *wx)
+wx_recv_wait(struct wdp *wdp)
 {
-    char *devname;
-    int error;
+    struct wx *wx = wx_cast(wdp);
 
-    while ((error = xfif_port_poll(wx->xfif, &devname)) != EAGAIN) {
-        wx_port_process_change(wx, error, devname);
+    if (wx->n_ctl_packets) {
+        poll_immediate_wake();
+    } else {
+        xfif_recv_wait(wx->xfif);
     }
-    while ((error = netdev_monitor_poll(wx->netdev_monitor,
-                                        &devname)) != EAGAIN) {
-        wx_port_process_change(wx, error, devname);
+}
+
+static int
+wx_set_ofhooks(struct wdp *wdp, const struct ofhooks *ofhooks, void *aux)
+{
+    struct wx *wx = wx_cast(wdp);
+
+    if (wx->ofhooks == &default_ofhooks) {
+        mac_learning_destroy(wx->ml);
+        wx->ml = NULL;
+    }
+
+    wx->ofhooks = ofhooks;
+    wx->aux = aux;
+    return 0;
+}
+
+static void
+wx_revalidate(struct wdp *wdp, tag_type tag)
+{
+    struct wx *wx = wx_cast(wdp);
+
+    tag_set_add(&wx->revalidate_set, tag);
+}
+
+static void
+wx_revalidate_all(struct wdp *wdp)
+{
+    struct wx *wx = wx_cast(wdp);
+
+    wx->revalidate_all = true;
+}
+\f
+static void wx_port_update(struct wx *, const char *devname,
+                           wdp_port_poll_cb_func *cb, void *aux);
+static void wx_port_reinit(struct wx *, wdp_port_poll_cb_func *cb, void *aux);
+
+static void
+wx_port_process_change(struct wx *wx, int error, char *devname,
+                       wdp_port_poll_cb_func *cb, void *aux)
+{
+    if (error == ENOBUFS) {
+        wx_port_reinit(wx, cb, aux);
+    } else if (!error) {
+        wx_port_update(wx, devname, cb, aux);
+        free(devname);
     }
 }
 
@@ -1997,16 +2300,16 @@ wx_port_refresh_group(struct wx *wx, unsigned int group)
 {
     uint16_t *ports;
     size_t n_ports;
-    struct wdp_port *port;
-    unsigned int port_no;
+    struct wx_port *port;
 
     assert(group == WX_GROUP_ALL || group == WX_GROUP_FLOOD);
 
-    ports = xmalloc(port_array_count(&wx->ports) * sizeof *ports);
+    ports = xmalloc(hmap_count(&wx->ports) * sizeof *ports);
     n_ports = 0;
-    PORT_ARRAY_FOR_EACH (port, &wx->ports, port_no) {
-        if (group == WX_GROUP_ALL || !(port->opp.config & OFPPC_NO_FLOOD)) {
-            ports[n_ports++] = port_no;
+    HMAP_FOR_EACH (port, hmap_node, &wx->ports) {
+        const struct ofp_phy_port *opp = &port->wdp_port.opp;
+        if (group == WX_GROUP_ALL || !(opp->config & OFPPC_NO_FLOOD)) {
+            ports[n_ports++] = port->xflow_port;
         }
     }
     xfif_port_group_set(wx->xfif, group, ports, n_ports);
@@ -2023,18 +2326,17 @@ wx_port_refresh_groups(struct wx *wx)
 }
 
 static void
-wx_port_reinit(struct wx *wx)
+wx_port_reinit(struct wx *wx, wdp_port_poll_cb_func *cb, void *aux)
 {
     struct svec devnames;
-    struct wdp_port *wdp_port;
-    unsigned int port_no;
+    struct wx_port *wx_port;
     struct xflow_port *xflow_ports;
     size_t n_xflow_ports;
     size_t i;
 
     svec_init(&devnames);
-    PORT_ARRAY_FOR_EACH (wdp_port, &wx->ports, port_no) {
-        svec_add (&devnames, (char *) wdp_port->opp.name);
+    HMAP_FOR_EACH (wx_port, hmap_node, &wx->ports) {
+        svec_add (&devnames, (char *) wx_port->wdp_port.opp.name);
     }
     xfif_port_list(wx->xfif, &xflow_ports, &n_xflow_ports);
     for (i = 0; i < n_xflow_ports; i++) {
@@ -2044,18 +2346,19 @@ wx_port_reinit(struct wx *wx)
 
     svec_sort_unique(&devnames);
     for (i = 0; i < devnames.n; i++) {
-        wx_port_update(wx, devnames.names[i]);
+        wx_port_update(wx, devnames.names[i], cb, aux);
     }
     svec_destroy(&devnames);
 
     wx_port_refresh_groups(wx);
 }
 
-static struct wdp_port *
-make_wdp_port(const struct xflow_port *xflow_port)
+static struct wx_port *
+make_wx_port(const struct xflow_port *xflow_port)
 {
     struct netdev_options netdev_options;
     enum netdev_flags flags;
+    struct wx_port *wx_port;
     struct wdp_port *wdp_port;
     struct netdev *netdev;
     bool carrier;
@@ -2074,7 +2377,9 @@ make_wdp_port(const struct xflow_port *xflow_port)
         return NULL;
     }
 
-    wdp_port = xmalloc(sizeof *wdp_port);
+    wx_port = xmalloc(sizeof *wx_port);
+    wx_port->xflow_port = xflow_port->port;
+    wdp_port = &wx_port->wdp_port;
     wdp_port->netdev = netdev;
     wdp_port->opp.port_no = xflow_port_to_ofp_port(xflow_port->port);
     netdev_get_etheraddr(netdev, wdp_port->opp.hw_addr);
@@ -2094,13 +2399,13 @@ make_wdp_port(const struct xflow_port *xflow_port)
 
     wdp_port->devname = xstrdup(xflow_port->devname);
     wdp_port->internal = (xflow_port->flags & XFLOW_PORT_INTERNAL) != 0;
-    return wdp_port;
+    return wx_port;
 }
 
 static bool
 wx_port_conflicts(const struct wx *wx, const struct xflow_port *xflow_port)
 {
-    if (port_array_get(&wx->ports, xflow_port->port)) {
+    if (wx_port_get(wx, xflow_port->port)) {
         VLOG_WARN_RL(&rl, "ignoring duplicate port %"PRIu16" in datapath",
                      xflow_port->port);
         return true;
@@ -2114,10 +2419,10 @@ wx_port_conflicts(const struct wx *wx, const struct xflow_port *xflow_port)
 }
 
 static int
-wdp_port_equal(const struct wdp_port *a_, const struct wdp_port *b_)
+wx_port_equal(const struct wx_port *a_, const struct wx_port *b_)
 {
-    const struct ofp_phy_port *a = &a_->opp;
-    const struct ofp_phy_port *b = &b_->opp;
+    const struct ofp_phy_port *a = &a_->wdp_port.opp;
+    const struct ofp_phy_port *b = &b_->wdp_port.opp;
 
     BUILD_ASSERT_DECL(sizeof *a == 48); /* Detect ofp_phy_port changes. */
     return (a->port_no == b->port_no
@@ -2132,42 +2437,44 @@ wdp_port_equal(const struct wdp_port *a_, const struct wdp_port *b_)
 }
 
 static void
-wx_port_install(struct wx *wx, struct wdp_port *wdp_port)
+wx_port_install(struct wx *wx, struct wx_port *wx_port)
 {
-    uint16_t xflow_port = ofp_port_to_xflow_port(wdp_port->opp.port_no);
-    const char *netdev_name = (const char *) wdp_port->opp.name;
+    const struct ofp_phy_port *opp = &wx_port->wdp_port.opp;
+    uint16_t xflow_port = ofp_port_to_xflow_port(opp->port_no);
+    const char *name = (const char *) opp->name;
 
-    netdev_monitor_add(wx->netdev_monitor, wdp_port->netdev);
-    port_array_set(&wx->ports, xflow_port, wdp_port);
-    shash_add(&wx->port_by_name, netdev_name, wdp_port);
+    netdev_monitor_add(wx->netdev_monitor, wx_port->wdp_port.netdev);
+    hmap_insert(&wx->ports, &wx_port->hmap_node, hash_int(xflow_port, 0));
+    shash_add(&wx->port_by_name, name, wx_port);
 }
 
 static void
-wx_port_remove(struct wx *wx, struct wdp_port *wdp_port)
+wx_port_remove(struct wx *wx, struct wx_port *wx_port)
 {
-    uint16_t xflow_port = ofp_port_to_xflow_port(wdp_port->opp.port_no);
+    const struct ofp_phy_port *opp = &wx_port->wdp_port.opp;
+    const char *name = (const char *) opp->name;
 
-    netdev_monitor_remove(wx->netdev_monitor, wdp_port->netdev);
-    port_array_delete(&wx->ports, xflow_port);
-    shash_delete(&wx->port_by_name,
-                 shash_find(&wx->port_by_name, (char *) wdp_port->opp.name));
+    netdev_monitor_remove(wx->netdev_monitor, wx_port->wdp_port.netdev);
+    hmap_remove(&wx->ports, &wx_port->hmap_node);
+    shash_delete(&wx->port_by_name, shash_find(&wx->port_by_name, name));
 }
 
 static void
-wx_port_free(struct wdp_port *wdp_port)
+wx_port_free(struct wx_port *wx_port)
 {
-    if (wdp_port) {
-        netdev_close(wdp_port->netdev);
-        free(wdp_port);
+    if (wx_port) {
+        wdp_port_free(&wx_port->wdp_port);
+        free(wx_port);
     }
 }
 
 static void
-wx_port_update(struct wx *wx, const char *devname)
+wx_port_update(struct wx *wx, const char *devname,
+               wdp_port_poll_cb_func *cb, void *aux)
 {
     struct xflow_port xflow_port;
-    struct wdp_port *old_wdp_port;
-    struct wdp_port *new_wdp_port;
+    struct wx_port *old_wx_port;
+    struct wx_port *new_wx_port;
     int error;
 
     COVERAGE_INC(wx_update_port);
@@ -2175,10 +2482,10 @@ wx_port_update(struct wx *wx, const char *devname)
     /* Query the datapath for port information. */
     error = xfif_port_query_by_name(wx->xfif, devname, &xflow_port);
 
-    /* Find the old wdp_port. */
-    old_wdp_port = shash_find_data(&wx->port_by_name, devname);
+    /* Find the old wx_port. */
+    old_wx_port = shash_find_data(&wx->port_by_name, devname);
     if (!error) {
-        if (!old_wdp_port) {
+        if (!old_wx_port) {
             /* There's no port named 'devname' but there might be a port with
              * the same port number.  This could happen if a port is deleted
              * and then a new one added in its place very quickly, or if a port
@@ -2189,7 +2496,7 @@ wx_port_update(struct wx *wx, const char *devname)
              * reliably but more portably by comparing the old port's MAC
              * against the new port's MAC.  However, this code isn't that smart
              * and always sends an OFPPR_MODIFY (XXX). */
-            old_wdp_port = port_array_get(&wx->ports, xflow_port.port);
+            old_wx_port = wx_port_get(wx, xflow_port.port);
         }
     } else if (error != ENOENT && error != ENODEV) {
         VLOG_WARN_RL(&rl, "xfif_port_query_by_name returned unexpected error "
@@ -2197,37 +2504,50 @@ wx_port_update(struct wx *wx, const char *devname)
         return;
     }
 
-    /* Create a new wdp_port. */
-    new_wdp_port = !error ? make_wdp_port(&xflow_port) : NULL;
+    /* Create a new wx_port. */
+    new_wx_port = !error ? make_wx_port(&xflow_port) : NULL;
 
     /* Eliminate a few pathological cases. */
-    if (!old_wdp_port && !new_wdp_port) {
+    if (!old_wx_port && !new_wx_port) {
         return;
-    } else if (old_wdp_port && new_wdp_port) {
+    } else if (old_wx_port && new_wx_port) {
         /* Most of the 'config' bits are OpenFlow soft state, but
          * OFPPC_PORT_DOWN is maintained by the kernel.  So transfer the
-         * OpenFlow bits from old_wdp_port.  (make_wdp_port() only sets
+         * OpenFlow bits from old_wx_port.  (make_wx_port() only sets
          * OFPPC_PORT_DOWN and leaves the other bits 0.)  */
-        new_wdp_port->opp.config |= old_wdp_port->opp.config & ~OFPPC_PORT_DOWN;
+        struct ofp_phy_port *new_opp = &new_wx_port->wdp_port.opp;
+        struct ofp_phy_port *old_opp = &old_wx_port->wdp_port.opp;
+        new_opp->config |= old_opp->config & ~OFPPC_PORT_DOWN;
 
-        if (wdp_port_equal(old_wdp_port, new_wdp_port)) {
+        if (wx_port_equal(old_wx_port, new_wx_port)) {
             /* False alarm--no change. */
-            wx_port_free(new_wdp_port);
+            wx_port_free(new_wx_port);
             return;
         }
     }
 
     /* Now deal with the normal cases. */
-    if (old_wdp_port) {
-        wx_port_remove(wx, old_wdp_port);
+    if (old_wx_port) {
+        wx_port_remove(wx, old_wx_port);
     }
-    if (new_wdp_port) {
-        wx_port_install(wx, new_wdp_port);
+    if (new_wx_port) {
+        wx_port_install(wx, new_wx_port);
+    }
+
+    /* Call back. */
+    if (!old_wx_port) {
+        (*cb)(&new_wx_port->wdp_port.opp, OFPPR_ADD, aux);
+    } else if (!new_wx_port) {
+        (*cb)(&old_wx_port->wdp_port.opp, OFPPR_DELETE, aux);
+    } else {
+        (*cb)(&new_wx_port->wdp_port.opp, OFPPR_MODIFY, aux);
     }
-    wx_port_free(old_wdp_port);
 
     /* Update port groups. */
     wx_port_refresh_groups(wx);
+
+    /* Clean up. */
+    wx_port_free(old_wx_port);
 }
 
 static int
@@ -2246,9 +2566,9 @@ wx_port_init(struct wx *wx)
     for (i = 0; i < n_ports; i++) {
         const struct xflow_port *xflow_port = &ports[i];
         if (!wx_port_conflicts(wx, xflow_port)) {
-            struct wdp_port *wdp_port = make_wdp_port(xflow_port);
-            if (wdp_port) {
-                wx_port_install(wx, wdp_port);
+            struct wx_port *wx_port = make_wx_port(xflow_port);
+            if (wx_port) {
+                wx_port_install(wx, wx_port);
             }
         }
     }
@@ -2256,6 +2576,21 @@ wx_port_init(struct wx *wx)
     wx_port_refresh_groups(wx);
     return 0;
 }
+
+/* Returns the port in 'wx' with xflow port number 'xflow_port'. */
+static struct wx_port *
+wx_port_get(const struct wx *wx, uint16_t xflow_port)
+{
+    struct wx_port *port;
+
+    HMAP_FOR_EACH_IN_BUCKET (port, hmap_node, hash_int(xflow_port, 0),
+                             &wx->ports) {
+        if (port->xflow_port == xflow_port) {
+            return port;
+        }
+    }
+    return NULL;
+}
 \f
 void
 wdp_xflow_register(void)
@@ -2271,6 +2606,7 @@ wdp_xflow_register(void)
         wx_destroy,
         wx_get_features,
         wx_get_stats,
+        wx_get_table_stats,
         wx_get_drop_frags,
         wx_set_drop_frags,
         wx_port_add,
@@ -2296,7 +2632,11 @@ wdp_xflow_register(void)
         wx_get_sflow_probability,
         wx_set_sflow_probability,
         wx_recv,
+        wx_recv_purge,
         wx_recv_wait,
+        wx_set_ofhooks,
+        wx_revalidate,
+        wx_revalidate_all,
     };
 
     static bool inited = false;
@@ -2332,3 +2672,53 @@ wdp_xflow_register(void)
 
     svec_destroy(&types);
 }
+\f
+static bool
+default_normal_ofhook_cb(const flow_t *flow, const struct ofpbuf *packet,
+                         struct xflow_actions *actions, tag_type *tags,
+                         uint16_t *nf_output_iface, void *wx_)
+{
+    struct wx *wx = wx_;
+    int out_port;
+
+    /* Drop frames for reserved multicast addresses. */
+    if (eth_addr_is_reserved(flow->dl_dst)) {
+        return true;
+    }
+
+    /* Learn source MAC (but don't try to learn from revalidation). */
+    if (packet != NULL) {
+        tag_type rev_tag = mac_learning_learn(wx->ml, flow->dl_src,
+                                              0, flow->in_port,
+                                              GRAT_ARP_LOCK_NONE);
+        if (rev_tag) {
+            /* The log messages here could actually be useful in debugging,
+             * so keep the rate limit relatively high. */
+            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(30, 300);
+            VLOG_DBG_RL(&rl, "learned that "ETH_ADDR_FMT" is on port %"PRIu16,
+                        ETH_ADDR_ARGS(flow->dl_src), flow->in_port);
+            tag_set_add(&wx->revalidate_set, rev_tag);
+        }
+    }
+
+    /* Determine output port. */
+    out_port = mac_learning_lookup_tag(wx->ml, flow->dl_dst, 0, tags,
+                                       NULL);
+    if (out_port < 0) {
+        add_output_group_action(actions, WX_GROUP_FLOOD, nf_output_iface);
+    } else if (out_port != flow->in_port) {
+        xflow_actions_add(actions, XFLOWAT_OUTPUT)->output.port = out_port;
+        *nf_output_iface = out_port;
+    } else {
+        /* Drop. */
+    }
+
+    return true;
+}
+
+static const struct ofhooks default_ofhooks = {
+    NULL,
+    default_normal_ofhook_cb,
+    NULL,
+    NULL
+};