ofproto: Handle flow installation and eviction in upcall.
[sliver-openvswitch.git] / ofproto / ofproto-dpif-upcall.c
index f0088f0..78424fd 100644 (file)
@@ -31,6 +31,7 @@
 #include "ofpbuf.h"
 #include "ofproto-dpif-ipfix.h"
 #include "ofproto-dpif-sflow.h"
+#include "ofproto-dpif-xlate.h"
 #include "packets.h"
 #include "poll-loop.h"
 #include "seq.h"
 #include "vlog.h"
 
 #define MAX_QUEUE_LENGTH 512
+#define FLOW_MISS_MAX_BATCH 50
+#define REVALIDATE_MAX_BATCH 50
 
 VLOG_DEFINE_THIS_MODULE(ofproto_dpif_upcall);
 
-COVERAGE_DEFINE(drop_queue_overflow);
 COVERAGE_DEFINE(upcall_queue_overflow);
-COVERAGE_DEFINE(fmb_queue_overflow);
-COVERAGE_DEFINE(fmb_queue_revalidated);
 
 /* A thread that processes each upcall handed to it by the dispatcher thread,
- * forwards the upcall's packet, and then queues it to the main ofproto_dpif
- * to possibly set up a kernel flow as a cache. */
+ * forwards the upcall's packet, and possibly sets up a kernel flow as a
+ * cache. */
 struct handler {
     struct udpif *udpif;               /* Parent udpif. */
     pthread_t thread;                  /* Thread ID. */
@@ -66,13 +66,34 @@ struct handler {
                                           'mutex'. */
 };
 
+/* A thread that processes each kernel flow handed to it by the flow_dumper
+ * thread, updates OpenFlow statistics, and updates or removes the kernel flow
+ * as necessary. */
+struct revalidator {
+    struct udpif *udpif;               /* Parent udpif. */
+    char *name;                        /* Thread name. */
+
+    pthread_t thread;                  /* Thread ID. */
+    struct hmap ukeys;                 /* Datapath flow keys. */
+
+    uint64_t dump_seq;
+
+    struct ovs_mutex mutex;            /* Mutex guarding the following. */
+    pthread_cond_t wake_cond;
+    struct list udumps OVS_GUARDED;    /* Unprocessed udumps. */
+    size_t n_udumps OVS_GUARDED;       /* Number of unprocessed udumps. */
+};
+
 /* An upcall handler for ofproto_dpif.
  *
- * udpif is implemented as a "dispatcher" thread that reads upcalls from the
- * kernel.  It processes each upcall just enough to figure out its next
- * destination.  For a "miss" upcall (MISS_UPCALL), this is one of several
- * "handler" threads (see struct handler).  Other upcalls are queued to the
- * main ofproto_dpif. */
+ * udpif has two logically separate pieces:
+ *
+ *    - A "dispatcher" thread that reads upcalls from the kernel and dispatches
+ *      them to one of several "handler" threads (see struct handler).
+ *
+ *    - A "flow_dumper" thread that reads the kernel flow table and dispatches
+ *      flows to one of several "revalidator" threads (see struct
+ *      revalidator). */
 struct udpif {
     struct list list_node;             /* In all_udpifs list. */
 
@@ -82,18 +103,30 @@ struct udpif {
     uint32_t secret;                   /* Random seed for upcall hash. */
 
     pthread_t dispatcher;              /* Dispatcher thread ID. */
+    pthread_t flow_dumper;             /* Flow dumper thread ID. */
 
     struct handler *handlers;          /* Upcall handlers. */
     size_t n_handlers;
 
-    /* Queues to pass up to ofproto-dpif. */
-    struct guarded_list drop_keys; /* "struct drop key"s. */
-    struct guarded_list fmbs;      /* "struct flow_miss_batch"es. */
+    struct revalidator *revalidators;  /* Flow revalidators. */
+    size_t n_revalidators;
+
+    uint64_t last_reval_seq;           /* 'reval_seq' at last revalidation. */
+    struct seq *reval_seq;             /* Incremented to force revalidation. */
+
+    struct seq *dump_seq;              /* Increments each dump iteration. */
+
+    struct latch exit_latch;           /* Tells child threads to exit. */
+
+    long long int dump_duration;       /* Duration of the last flow dump. */
 
-    struct seq *wait_seq;
-    struct seq *reval_seq;
+    /* Datapath flow statistics. */
+    unsigned int max_n_flows;
+    unsigned int avg_n_flows;
 
-    struct latch exit_latch; /* Tells child threads to exit. */
+    /* Following fields are accessed and modified by different threads. */
+    atomic_llong max_idle;             /* Maximum datapath flow idle time. */
+    atomic_uint flow_limit;            /* Datapath flow hard limit. */
 };
 
 enum upcall_type {
@@ -114,18 +147,92 @@ struct upcall {
     uint64_t upcall_stub[512 / 8];  /* Buffer to reduce need for malloc(). */
 };
 
+/* 'udpif_key's are responsible for tracking the little bit of state udpif
+ * needs to do flow expiration which can't be pulled directly from the
+ * datapath.  They are owned, created by, maintained, and destroyed by a single
+ * revalidator making them easy to efficiently handle with multiple threads. */
+struct udpif_key {
+    struct hmap_node hmap_node;     /* In parent revalidator 'ukeys' map. */
+
+    struct nlattr *key;            /* Datapath flow key. */
+    size_t key_len;                /* Length of 'key'. */
+
+    struct dpif_flow_stats stats;  /* Stats at most recent flow dump. */
+    long long int created;         /* Estimation of creation time. */
+
+    bool mark;                     /* Used by mark and sweep GC algorithm. */
+
+    struct odputil_keybuf key_buf; /* Memory for 'key'. */
+};
+
+/* 'udpif_flow_dump's hold the state associated with one iteration in a flow
+ * dump operation.  This is created by the flow_dumper thread and handed to the
+ * appropriate revalidator thread to be processed. */
+struct udpif_flow_dump {
+    struct list list_node;
+
+    struct nlattr *key;            /* Datapath flow key. */
+    size_t key_len;                /* Length of 'key'. */
+    uint32_t key_hash;             /* Hash of 'key'. */
+
+    struct odputil_keybuf mask_buf;
+    struct nlattr *mask;           /* Datapath mask for 'key'. */
+    size_t mask_len;               /* Length of 'mask'. */
+
+    struct dpif_flow_stats stats;  /* Stats pulled from the datapath. */
+
+    bool need_revalidate;          /* Key needs revalidation? */
+
+    struct odputil_keybuf key_buf;
+};
+
+/* Flow miss batching.
+ *
+ * Some dpifs implement operations faster when you hand them off in a batch.
+ * To allow batching, "struct flow_miss" queues the dpif-related work needed
+ * for a given flow.  Each "struct flow_miss" corresponds to sending one or
+ * more packets, plus possibly installing the flow in the dpif. */
+struct flow_miss {
+    struct hmap_node hmap_node;
+    struct ofproto_dpif *ofproto;
+
+    struct flow flow;
+    enum odp_key_fitness key_fitness;
+    const struct nlattr *key;
+    size_t key_len;
+    enum dpif_upcall_type upcall_type;
+    struct dpif_flow_stats stats;
+    odp_port_t odp_in_port;
+
+    uint64_t slow_path_buf[128 / 8];
+    struct odputil_keybuf mask_buf;
+
+    struct xlate_out xout;
+};
+
 static void upcall_destroy(struct upcall *);
 
 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
 static struct list all_udpifs = LIST_INITIALIZER(&all_udpifs);
 
 static void recv_upcalls(struct udpif *);
-static void handle_upcalls(struct udpif *, struct list *upcalls);
-static void miss_destroy(struct flow_miss *);
+static void handle_upcalls(struct handler *handler, struct list *upcalls);
+static void *udpif_flow_dumper(void *);
 static void *udpif_dispatcher(void *);
 static void *udpif_upcall_handler(void *);
+static void *udpif_revalidator(void *);
+static uint64_t udpif_get_n_flows(const struct udpif *);
+static void revalidate_udumps(struct revalidator *, struct list *udumps);
+static void revalidator_sweep(struct revalidator *);
 static void upcall_unixctl_show(struct unixctl_conn *conn, int argc,
                                 const char *argv[], void *aux);
+static void upcall_unixctl_disable_megaflows(struct unixctl_conn *, int argc,
+                                             const char *argv[], void *aux);
+static void upcall_unixctl_enable_megaflows(struct unixctl_conn *, int argc,
+                                            const char *argv[], void *aux);
+static void ukey_delete(struct revalidator *, struct udpif_key *);
+
+static atomic_bool enable_megaflows = ATOMIC_VAR_INIT(true);
 
 struct udpif *
 udpif_create(struct dpif_backer *backer, struct dpif *dpif)
@@ -136,17 +243,21 @@ udpif_create(struct dpif_backer *backer, struct dpif *dpif)
     if (ovsthread_once_start(&once)) {
         unixctl_command_register("upcall/show", "", 0, 0, upcall_unixctl_show,
                                  NULL);
+        unixctl_command_register("upcall/disable-megaflows", "", 0, 0,
+                                 upcall_unixctl_disable_megaflows, NULL);
+        unixctl_command_register("upcall/enable-megaflows", "", 0, 0,
+                                 upcall_unixctl_enable_megaflows, NULL);
         ovsthread_once_done(&once);
     }
 
     udpif->dpif = dpif;
     udpif->backer = backer;
+    atomic_init(&udpif->max_idle, 5000);
+    atomic_init(&udpif->flow_limit, MIN(ofproto_flow_limit, 10000));
     udpif->secret = random_uint32();
-    udpif->wait_seq = seq_create();
     udpif->reval_seq = seq_create();
+    udpif->dump_seq = seq_create();
     latch_init(&udpif->exit_latch);
-    guarded_list_init(&udpif->drop_keys);
-    guarded_list_init(&udpif->fmbs);
     list_push_back(&all_udpifs, &udpif->list_node);
 
     return udpif;
@@ -155,62 +266,82 @@ udpif_create(struct dpif_backer *backer, struct dpif *dpif)
 void
 udpif_destroy(struct udpif *udpif)
 {
-    struct flow_miss_batch *fmb;
-    struct drop_key *drop_key;
+    udpif_set_threads(udpif, 0, 0);
+    udpif_flush();
 
-    udpif_set_threads(udpif, 0);
     list_remove(&udpif->list_node);
-
-    while ((drop_key = drop_key_next(udpif))) {
-        drop_key_destroy(drop_key);
-    }
-
-    while ((fmb = flow_miss_batch_next(udpif))) {
-        flow_miss_batch_destroy(fmb);
-    }
-
-    guarded_list_destroy(&udpif->drop_keys);
-    guarded_list_destroy(&udpif->fmbs);
     latch_destroy(&udpif->exit_latch);
-    seq_destroy(udpif->wait_seq);
     seq_destroy(udpif->reval_seq);
+    seq_destroy(udpif->dump_seq);
     free(udpif);
 }
 
 /* Tells 'udpif' how many threads it should use to handle upcalls.  Disables
- * all threads if 'n_handlers' is zero.  'udpif''s datapath handle must have
- * packet reception enabled before starting threads. */
+ * all threads if 'n_handlers' and 'n_revalidators' is zero.  'udpif''s
+ * datapath handle must have packet reception enabled before starting threads.
+ */
 void
-udpif_set_threads(struct udpif *udpif, size_t n_handlers)
+udpif_set_threads(struct udpif *udpif, size_t n_handlers,
+                  size_t n_revalidators)
 {
     /* Stop the old threads (if any). */
-    if (udpif->handlers && udpif->n_handlers != n_handlers) {
+    if (udpif->handlers &&
+        (udpif->n_handlers != n_handlers
+         || udpif->n_revalidators != n_revalidators)) {
         size_t i;
 
         latch_set(&udpif->exit_latch);
 
-        /* Wake the handlers so they can exit. */
         for (i = 0; i < udpif->n_handlers; i++) {
             struct handler *handler = &udpif->handlers[i];
 
             ovs_mutex_lock(&handler->mutex);
             xpthread_cond_signal(&handler->wake_cond);
             ovs_mutex_unlock(&handler->mutex);
+            xpthread_join(handler->thread, NULL);
+        }
+
+        for (i = 0; i < udpif->n_revalidators; i++) {
+            struct revalidator *revalidator = &udpif->revalidators[i];
+
+            ovs_mutex_lock(&revalidator->mutex);
+            xpthread_cond_signal(&revalidator->wake_cond);
+            ovs_mutex_unlock(&revalidator->mutex);
+            xpthread_join(revalidator->thread, NULL);
         }
 
+        xpthread_join(udpif->flow_dumper, NULL);
         xpthread_join(udpif->dispatcher, NULL);
+
+        for (i = 0; i < udpif->n_revalidators; i++) {
+            struct revalidator *revalidator = &udpif->revalidators[i];
+            struct udpif_flow_dump *udump, *next_udump;
+            struct udpif_key *ukey, *next_ukey;
+
+            LIST_FOR_EACH_SAFE (udump, next_udump, list_node,
+                                &revalidator->udumps) {
+                list_remove(&udump->list_node);
+                free(udump);
+            }
+
+            HMAP_FOR_EACH_SAFE (ukey, next_ukey, hmap_node,
+                                &revalidator->ukeys) {
+                ukey_delete(revalidator, ukey);
+            }
+            hmap_destroy(&revalidator->ukeys);
+            ovs_mutex_destroy(&revalidator->mutex);
+
+            free(revalidator->name);
+        }
+
         for (i = 0; i < udpif->n_handlers; i++) {
             struct handler *handler = &udpif->handlers[i];
             struct upcall *miss, *next;
 
-            xpthread_join(handler->thread, NULL);
-
-            ovs_mutex_lock(&handler->mutex);
             LIST_FOR_EACH_SAFE (miss, next, list_node, &handler->upcalls) {
                 list_remove(&miss->list_node);
                 upcall_destroy(miss);
             }
-            ovs_mutex_unlock(&handler->mutex);
             ovs_mutex_destroy(&handler->mutex);
 
             xpthread_cond_destroy(&handler->wake_cond);
@@ -218,6 +349,10 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers)
         }
         latch_poll(&udpif->exit_latch);
 
+        free(udpif->revalidators);
+        udpif->revalidators = NULL;
+        udpif->n_revalidators = 0;
+
         free(udpif->handlers);
         udpif->handlers = NULL;
         udpif->n_handlers = 0;
@@ -228,6 +363,8 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers)
         size_t i;
 
         udpif->n_handlers = n_handlers;
+        udpif->n_revalidators = n_revalidators;
+
         udpif->handlers = xzalloc(udpif->n_handlers * sizeof *udpif->handlers);
         for (i = 0; i < udpif->n_handlers; i++) {
             struct handler *handler = &udpif->handlers[i];
@@ -240,19 +377,22 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers)
             xpthread_create(&handler->thread, NULL, udpif_upcall_handler,
                             handler);
         }
-        xpthread_create(&udpif->dispatcher, NULL, udpif_dispatcher, udpif);
-    }
-}
 
-void
-udpif_wait(struct udpif *udpif)
-{
-    uint64_t seq = seq_read(udpif->wait_seq);
-    if (!guarded_list_is_empty(&udpif->drop_keys) ||
-        !guarded_list_is_empty(&udpif->fmbs)) {
-        poll_immediate_wake();
-    } else {
-        seq_wait(udpif->wait_seq, seq);
+        udpif->revalidators = xzalloc(udpif->n_revalidators
+                                      * sizeof *udpif->revalidators);
+        for (i = 0; i < udpif->n_revalidators; i++) {
+            struct revalidator *revalidator = &udpif->revalidators[i];
+
+            revalidator->udpif = udpif;
+            list_init(&revalidator->udumps);
+            hmap_init(&revalidator->ukeys);
+            ovs_mutex_init(&revalidator->mutex);
+            xpthread_cond_init(&revalidator->wake_cond, NULL);
+            xpthread_create(&revalidator->thread, NULL, udpif_revalidator,
+                            revalidator);
+        }
+        xpthread_create(&udpif->dispatcher, NULL, udpif_dispatcher, udpif);
+        xpthread_create(&udpif->flow_dumper, NULL, udpif_flow_dumper, udpif);
     }
 }
 
@@ -261,22 +401,16 @@ udpif_wait(struct udpif *udpif)
 void
 udpif_revalidate(struct udpif *udpif)
 {
-    struct flow_miss_batch *fmb, *next_fmb;
-    struct list fmbs;
-
-    /* Since we remove each miss on revalidation, their statistics won't be
-     * accounted to the appropriate 'facet's in the upper layer.  In most
-     * cases, this is alright because we've already pushed the stats to the
-     * relevant rules. */
     seq_change(udpif->reval_seq);
+}
 
-    guarded_list_pop_all(&udpif->fmbs, &fmbs);
-    LIST_FOR_EACH_SAFE (fmb, next_fmb, list_node, &fmbs) {
-        list_remove(&fmb->list_node);
-        flow_miss_batch_destroy(fmb);
-    }
-
-    udpif_drop_key_clear(udpif);
+/* Returns a seq which increments every time 'udpif' pulls stats from the
+ * datapath.  Callers can use this to get a sense of when might be a good time
+ * to do periodic work which relies on relatively up to date statistics. */
+struct seq *
+udpif_dump_seq(struct udpif *udpif)
+{
+    return udpif->dump_seq;
 }
 
 void
@@ -294,8 +428,32 @@ udpif_get_memory_usage(struct udpif *udpif, struct simap *usage)
         simap_increase(usage, "handler upcalls",  handler->n_upcalls);
         ovs_mutex_unlock(&handler->mutex);
     }
+
+    simap_increase(usage, "revalidators", udpif->n_revalidators);
+    for (i = 0; i < udpif->n_revalidators; i++) {
+        struct revalidator *revalidator = &udpif->revalidators[i];
+        ovs_mutex_lock(&revalidator->mutex);
+        simap_increase(usage, "revalidator dumps", revalidator->n_udumps);
+
+        /* XXX: This isn't technically thread safe because the revalidator
+         * ukeys maps isn't protected by a mutex since it's per thread. */
+        simap_increase(usage, "revalidator keys",
+                       hmap_count(&revalidator->ukeys));
+        ovs_mutex_unlock(&revalidator->mutex);
+    }
 }
 
+/* Removes all flows from all datapaths. */
+void
+udpif_flush(void)
+{
+    struct udpif *udpif;
+
+    LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
+        dpif_flow_flush(udpif->dpif);
+    }
+}
+\f
 /* Destroys and deallocates 'upcall'. */
 static void
 upcall_destroy(struct upcall *upcall)
@@ -307,103 +465,148 @@ upcall_destroy(struct upcall *upcall)
     }
 }
 
-/* Retrieves the next batch of processed flow misses for 'udpif' to install.
- * The caller is responsible for destroying it with flow_miss_batch_destroy().
- */
-struct flow_miss_batch *
-flow_miss_batch_next(struct udpif *udpif)
+static uint64_t
+udpif_get_n_flows(const struct udpif *udpif)
 {
-    int i;
+    struct dpif_dp_stats stats;
 
-    for (i = 0; i < 50; i++) {
-        struct flow_miss_batch *next;
-        struct list *next_node;
-
-        next_node = guarded_list_pop_front(&udpif->fmbs);
-        if (!next_node) {
-            break;
-        }
+    dpif_get_dp_stats(udpif->dpif, &stats);
+    return stats.n_flows;
+}
 
-        next = CONTAINER_OF(next_node, struct flow_miss_batch, list_node);
-        if (next->reval_seq == seq_read(udpif->reval_seq)) {
-            return next;
-        }
+/* The dispatcher thread is responsible for receiving upcalls from the kernel,
+ * assigning them to a upcall_handler thread. */
+static void *
+udpif_dispatcher(void *arg)
+{
+    struct udpif *udpif = arg;
 
-        flow_miss_batch_destroy(next);
+    set_subprogram_name("dispatcher");
+    while (!latch_is_set(&udpif->exit_latch)) {
+        recv_upcalls(udpif);
+        dpif_recv_wait(udpif->dpif);
+        latch_wait(&udpif->exit_latch);
+        poll_block();
     }
 
     return NULL;
 }
 
-/* Destroys and deallocates 'fmb'. */
-void
-flow_miss_batch_destroy(struct flow_miss_batch *fmb)
+static void *
+udpif_flow_dumper(void *arg)
 {
-    struct flow_miss *miss, *next;
-    struct upcall *upcall, *next_upcall;
-
-    if (!fmb) {
-        return;
-    }
-
-    HMAP_FOR_EACH_SAFE (miss, next, hmap_node, &fmb->misses) {
-        hmap_remove(&fmb->misses, &miss->hmap_node);
-        miss_destroy(miss);
-    }
-
-    LIST_FOR_EACH_SAFE (upcall, next_upcall, list_node, &fmb->upcalls) {
-        list_remove(&upcall->list_node);
-        upcall_destroy(upcall);
-    }
+    struct udpif *udpif = arg;
 
-    hmap_destroy(&fmb->misses);
-    free(fmb);
-}
+    set_subprogram_name("flow_dumper");
+    while (!latch_is_set(&udpif->exit_latch)) {
+        const struct dpif_flow_stats *stats;
+        long long int start_time, duration;
+        const struct nlattr *key, *mask;
+        struct dpif_flow_dump dump;
+        size_t key_len, mask_len;
+        unsigned int flow_limit;
+        long long int max_idle;
+        bool need_revalidate;
+        uint64_t reval_seq;
+        size_t n_flows, i;
+
+        reval_seq = seq_read(udpif->reval_seq);
+        need_revalidate = udpif->last_reval_seq != reval_seq;
+        udpif->last_reval_seq = reval_seq;
+
+        n_flows = udpif_get_n_flows(udpif);
+        udpif->max_n_flows = MAX(n_flows, udpif->max_n_flows);
+        udpif->avg_n_flows = (udpif->avg_n_flows + n_flows) / 2;
+
+        atomic_read(&udpif->flow_limit, &flow_limit);
+        if (n_flows < flow_limit / 8) {
+            max_idle = 5000;
+        } else if (n_flows < flow_limit / 4) {
+            max_idle = 2000;
+        } else if (n_flows < flow_limit / 2) {
+            max_idle = 1000;
+        } else {
+            max_idle = 500;
+        }
+        atomic_store(&udpif->max_idle, max_idle);
+
+        start_time = time_msec();
+        dpif_flow_dump_start(&dump, udpif->dpif);
+        while (dpif_flow_dump_next(&dump, &key, &key_len, &mask, &mask_len,
+                                   NULL, NULL, &stats)
+               && !latch_is_set(&udpif->exit_latch)) {
+            struct udpif_flow_dump *udump = xmalloc(sizeof *udump);
+            struct revalidator *revalidator;
+
+            udump->key_hash = hash_bytes(key, key_len, udpif->secret);
+            memcpy(&udump->key_buf, key, key_len);
+            udump->key = (struct nlattr *) &udump->key_buf;
+            udump->key_len = key_len;
+
+            memcpy(&udump->mask_buf, mask, mask_len);
+            udump->mask = (struct nlattr *) &udump->mask_buf;
+            udump->mask_len = mask_len;
+
+            udump->stats = *stats;
+            udump->need_revalidate = need_revalidate;
+
+            revalidator = &udpif->revalidators[udump->key_hash
+                % udpif->n_revalidators];
+
+            ovs_mutex_lock(&revalidator->mutex);
+            while (revalidator->n_udumps >= REVALIDATE_MAX_BATCH * 3
+                   && !latch_is_set(&udpif->exit_latch)) {
+                ovs_mutex_cond_wait(&revalidator->wake_cond,
+                                    &revalidator->mutex);
+            }
+            list_push_back(&revalidator->udumps, &udump->list_node);
+            revalidator->n_udumps++;
+            xpthread_cond_signal(&revalidator->wake_cond);
+            ovs_mutex_unlock(&revalidator->mutex);
+        }
+        dpif_flow_dump_done(&dump);
+
+        /* Let all the revalidators finish and garbage collect. */
+        seq_change(udpif->dump_seq);
+        for (i = 0; i < udpif->n_revalidators; i++) {
+            struct revalidator *revalidator = &udpif->revalidators[i];
+            ovs_mutex_lock(&revalidator->mutex);
+            xpthread_cond_signal(&revalidator->wake_cond);
+            ovs_mutex_unlock(&revalidator->mutex);
+        }
 
-/* Retrieves the next drop key which ofproto-dpif needs to process.  The caller
- * is responsible for destroying it with drop_key_destroy(). */
-struct drop_key *
-drop_key_next(struct udpif *udpif)
-{
-    struct list *next = guarded_list_pop_front(&udpif->drop_keys);
-    return next ? CONTAINER_OF(next, struct drop_key, list_node) : NULL;
-}
+        for (i = 0; i < udpif->n_revalidators; i++) {
+            struct revalidator *revalidator = &udpif->revalidators[i];
 
-/* Destroys and deallocates 'drop_key'. */
-void
-drop_key_destroy(struct drop_key *drop_key)
-{
-    if (drop_key) {
-        free(drop_key->key);
-        free(drop_key);
-    }
-}
+            ovs_mutex_lock(&revalidator->mutex);
+            while (revalidator->dump_seq != seq_read(udpif->dump_seq)
+                   && !latch_is_set(&udpif->exit_latch)) {
+                ovs_mutex_cond_wait(&revalidator->wake_cond,
+                                    &revalidator->mutex);
+            }
+            ovs_mutex_unlock(&revalidator->mutex);
+        }
 
-/* Clears all drop keys waiting to be processed by drop_key_next(). */
-void
-udpif_drop_key_clear(struct udpif *udpif)
-{
-    struct drop_key *drop_key, *next;
-    struct list list;
+        duration = time_msec() - start_time;
+        udpif->dump_duration = duration;
+        if (duration > 2000) {
+            flow_limit /= duration / 1000;
+        } else if (duration > 1300) {
+            flow_limit = flow_limit * 3 / 4;
+        } else if (duration < 1000 && n_flows > 2000
+                   && flow_limit < n_flows * 1000 / duration) {
+            flow_limit += 1000;
+        }
+        flow_limit = MIN(ofproto_flow_limit, MAX(flow_limit, 1000));
+        atomic_store(&udpif->flow_limit, flow_limit);
 
-    guarded_list_pop_all(&udpif->drop_keys, &list);
-    LIST_FOR_EACH_SAFE (drop_key, next, list_node, &list) {
-        list_remove(&drop_key->list_node);
-        drop_key_destroy(drop_key);
-    }
-}
-\f
-/* The dispatcher thread is responsible for receiving upcalls from the kernel,
- * assigning them to a upcall_handler thread. */
-static void *
-udpif_dispatcher(void *arg)
-{
-    struct udpif *udpif = arg;
+        if (duration > 2000) {
+            VLOG_WARN("Spent an unreasonably long %lldms dumping flows",
+                      duration);
+        }
 
-    set_subprogram_name("dispatcher");
-    while (!latch_is_set(&udpif->exit_latch)) {
-        recv_upcalls(udpif);
-        dpif_recv_wait(udpif->dpif);
+        poll_timer_wait_until(start_time + MIN(max_idle, 500));
+        seq_wait(udpif->reval_seq, udpif->last_reval_seq);
         latch_wait(&udpif->exit_latch);
         poll_block();
     }
@@ -447,18 +650,57 @@ udpif_upcall_handler(void *arg)
         }
         ovs_mutex_unlock(&handler->mutex);
 
-        handle_upcalls(handler->udpif, &misses);
+        handle_upcalls(handler, &misses);
 
         coverage_clear();
     }
 }
-\f
-static void
-miss_destroy(struct flow_miss *miss)
+
+static void *
+udpif_revalidator(void *arg)
 {
-    xlate_out_uninit(&miss->xout);
-}
+    struct revalidator *revalidator = arg;
 
+    revalidator->name = xasprintf("revalidator_%u", ovsthread_id_self());
+    set_subprogram_name("%s", revalidator->name);
+    for (;;) {
+        struct list udumps = LIST_INITIALIZER(&udumps);
+        struct udpif *udpif = revalidator->udpif;
+        size_t i;
+
+        ovs_mutex_lock(&revalidator->mutex);
+        if (latch_is_set(&udpif->exit_latch)) {
+            ovs_mutex_unlock(&revalidator->mutex);
+            return NULL;
+        }
+
+        if (!revalidator->n_udumps) {
+            if (revalidator->dump_seq != seq_read(udpif->dump_seq)) {
+                revalidator->dump_seq = seq_read(udpif->dump_seq);
+                revalidator_sweep(revalidator);
+            } else {
+                ovs_mutex_cond_wait(&revalidator->wake_cond,
+                                    &revalidator->mutex);
+            }
+        }
+
+        for (i = 0; i < REVALIDATE_MAX_BATCH && revalidator->n_udumps; i++) {
+            list_push_back(&udumps, list_pop_front(&revalidator->udumps));
+            revalidator->n_udumps--;
+        }
+
+        /* Wake up the flow dumper. */
+        xpthread_cond_signal(&revalidator->wake_cond);
+        ovs_mutex_unlock(&revalidator->mutex);
+
+        if (!list_is_empty(&udumps)) {
+            revalidate_udumps(revalidator, &udumps);
+        }
+    }
+
+    return NULL;
+}
+\f
 static enum upcall_type
 classify_upcall(const struct upcall *upcall)
 {
@@ -601,6 +843,27 @@ recv_upcalls(struct udpif *udpif)
     }
 }
 
+/* Calculates slow path actions for 'xout'.  'buf' must statically be
+ * initialized with at least 128 bytes of space. */
+static void
+compose_slow_path(struct udpif *udpif, struct xlate_out *xout,
+                  odp_port_t odp_in_port, struct ofpbuf *buf)
+{
+    union user_action_cookie cookie;
+    odp_port_t port;
+    uint32_t pid;
+
+    cookie.type = USER_ACTION_COOKIE_SLOW_PATH;
+    cookie.slow_path.unused = 0;
+    cookie.slow_path.reason = xout->slow;
+
+    port = xout->slow & (SLOW_CFM | SLOW_BFD | SLOW_LACP | SLOW_STP)
+        ? ODPP_NONE
+        : odp_in_port;
+    pid = dpif_port_get_pid(udpif->dpif, port);
+    odp_put_userspace_action(pid, &cookie, sizeof cookie.slow_path, buf);
+}
+
 static struct flow_miss *
 flow_miss_find(struct hmap *todo, const struct ofproto_dpif *ofproto,
                const struct flow *flow, uint32_t hash)
@@ -617,19 +880,26 @@ flow_miss_find(struct hmap *todo, const struct ofproto_dpif *ofproto,
 }
 
 static void
-handle_upcalls(struct udpif *udpif, struct list *upcalls)
+handle_upcalls(struct handler *handler, struct list *upcalls)
 {
-    struct dpif_op *opsp[FLOW_MISS_MAX_BATCH];
-    struct dpif_op ops[FLOW_MISS_MAX_BATCH];
+    struct hmap misses = HMAP_INITIALIZER(&misses);
+    struct udpif *udpif = handler->udpif;
+
+    struct flow_miss miss_buf[FLOW_MISS_MAX_BATCH];
+    struct dpif_op *opsp[FLOW_MISS_MAX_BATCH * 2];
+    struct dpif_op ops[FLOW_MISS_MAX_BATCH * 2];
+    struct flow_miss *miss, *next_miss;
     struct upcall *upcall, *next;
-    struct flow_miss_batch *fmb;
     size_t n_misses, n_ops, i;
-    struct flow_miss *miss;
+    unsigned int flow_limit;
+    bool fail_open, may_put;
     enum upcall_type type;
-    bool fail_open;
 
-    /* Extract the flow from each upcall.  Construct in fmb->misses a hash
-     * table that maps each unique flow to a 'struct flow_miss'.
+    atomic_read(&udpif->flow_limit, &flow_limit);
+    may_put = udpif_get_n_flows(udpif) < flow_limit;
+
+    /* Extract the flow from each upcall.  Construct in 'misses' a hash table
+     * that maps each unique flow to a 'struct flow_miss'.
      *
      * Most commonly there is a single packet per flow_miss, but there are
      * several reasons why there might be more than one, e.g.:
@@ -647,15 +917,11 @@ handle_upcalls(struct udpif *udpif, struct list *upcalls)
      *     other end of the connection, which gives OVS a chance to set up a
      *     datapath flow.)
      */
-    fmb = xmalloc(sizeof *fmb);
-    fmb->reval_seq = seq_read(udpif->reval_seq);
-    hmap_init(&fmb->misses);
-    list_init(&fmb->upcalls);
     n_misses = 0;
     LIST_FOR_EACH_SAFE (upcall, next, list_node, upcalls) {
         struct dpif_upcall *dupcall = &upcall->dpif_upcall;
+        struct flow_miss *miss = &miss_buf[n_misses];
         struct ofpbuf *packet = &dupcall->packet;
-        struct flow_miss *miss = &fmb->miss_buf[n_misses];
         struct flow_miss *existing_miss;
         struct ofproto_dpif *ofproto;
         struct dpif_sflow *sflow;
@@ -669,8 +935,6 @@ handle_upcalls(struct udpif *udpif, struct list *upcalls)
                               &ofproto, &ipfix, &sflow, NULL, &odp_in_port);
         if (error) {
             if (error == ENODEV) {
-                struct drop_key *drop_key;
-
                 /* Received packet on datapath port for which we couldn't
                  * associate an ofproto.  This can happen if a port is removed
                  * while traffic is being received.  Print a rate-limited
@@ -679,19 +943,9 @@ handle_upcalls(struct udpif *udpif, struct list *upcalls)
                  * in the kernel. */
                 VLOG_INFO_RL(&rl, "received packet on unassociated datapath "
                              "port %"PRIu32, odp_in_port);
-
-                drop_key = xmalloc(sizeof *drop_key);
-                drop_key->key = xmemdup(dupcall->key, dupcall->key_len);
-                drop_key->key_len = dupcall->key_len;
-
-                if (guarded_list_push_back(&udpif->drop_keys,
-                                           &drop_key->list_node,
-                                           MAX_QUEUE_LENGTH)) {
-                    seq_change(udpif->wait_seq);
-                } else {
-                    COVERAGE_INC(drop_queue_overflow);
-                    drop_key_destroy(drop_key);
-                }
+                dpif_flow_put(udpif->dpif, DPIF_FP_CREATE | DPIF_FP_MODIFY,
+                              dupcall->key, dupcall->key_len, NULL, 0, NULL, 0,
+                              NULL);
             }
             list_remove(&upcall->list_node);
             upcall_destroy(upcall);
@@ -706,10 +960,10 @@ handle_upcalls(struct udpif *udpif, struct list *upcalls)
                          &flow.tunnel, &flow.in_port, &miss->flow);
 
             hash = flow_hash(&miss->flow, 0);
-            existing_miss = flow_miss_find(&fmb->misses, ofproto, &miss->flow,
+            existing_miss = flow_miss_find(&misses, ofproto, &miss->flow,
                                            hash);
             if (!existing_miss) {
-                hmap_insert(&fmb->misses, &miss->hmap_node, hash);
+                hmap_insert(&misses, &miss->hmap_node, hash);
                 miss->ofproto = ofproto;
                 miss->key = dupcall->key;
                 miss->key_len = dupcall->key_len;
@@ -718,6 +972,7 @@ handle_upcalls(struct udpif *udpif, struct list *upcalls)
                 miss->stats.n_bytes = 0;
                 miss->stats.used = time_msec();
                 miss->stats.tcp_flags = 0;
+                miss->odp_in_port = odp_in_port;
 
                 n_misses++;
             } else {
@@ -786,13 +1041,21 @@ handle_upcalls(struct udpif *udpif, struct list *upcalls)
      * We can't do this in the previous loop because we need the TCP flags for
      * all the packets in each miss. */
     fail_open = false;
-    HMAP_FOR_EACH (miss, hmap_node, &fmb->misses) {
+    HMAP_FOR_EACH (miss, hmap_node, &misses) {
         struct xlate_in xin;
 
         xlate_in_init(&xin, miss->ofproto, &miss->flow, NULL,
                       miss->stats.tcp_flags, NULL);
         xin.may_learn = true;
-        xin.resubmit_stats = &miss->stats;
+
+        if (miss->upcall_type == DPIF_UC_MISS) {
+            xin.resubmit_stats = &miss->stats;
+        } else {
+            /* For non-miss upcalls, there's a flow in the datapath which this
+             * packet was accounted to.  Presumably the revalidators will deal
+             * with pushing its stats eventually. */
+        }
+
         xlate_actions(&xin, &miss->xout);
         fail_open = fail_open || miss->xout.fail_open;
     }
@@ -813,6 +1076,9 @@ handle_upcalls(struct udpif *udpif, struct list *upcalls)
     LIST_FOR_EACH (upcall, list_node, upcalls) {
         struct flow_miss *miss = upcall->flow_miss;
         struct ofpbuf *packet = &upcall->dpif_upcall.packet;
+        struct ofpbuf mask;
+        struct dpif_op *op;
+        bool megaflow;
 
         if (miss->xout.slow) {
             struct xlate_in xin;
@@ -821,9 +1087,38 @@ handle_upcalls(struct udpif *udpif, struct list *upcalls)
             xlate_actions_for_side_effects(&xin);
         }
 
-        if (miss->xout.odp_actions.size) {
-            struct dpif_op *op;
+        atomic_read(&enable_megaflows, &megaflow);
+        ofpbuf_use_stack(&mask, &miss->mask_buf, sizeof miss->mask_buf);
+        if (megaflow) {
+            odp_flow_key_from_mask(&mask, &miss->xout.wc.masks, &miss->flow,
+                                   UINT32_MAX);
+        }
 
+        if (may_put) {
+            op = &ops[n_ops++];
+            op->type = DPIF_OP_FLOW_PUT;
+            op->u.flow_put.flags = DPIF_FP_CREATE | DPIF_FP_MODIFY;
+            op->u.flow_put.key = miss->key;
+            op->u.flow_put.key_len = miss->key_len;
+            op->u.flow_put.mask = mask.data;
+            op->u.flow_put.mask_len = mask.size;
+            op->u.flow_put.stats = NULL;
+
+            if (!miss->xout.slow) {
+                op->u.flow_put.actions = miss->xout.odp_actions.data;
+                op->u.flow_put.actions_len = miss->xout.odp_actions.size;
+            } else {
+                struct ofpbuf buf;
+
+                ofpbuf_use_stack(&buf, miss->slow_path_buf,
+                                 sizeof miss->slow_path_buf);
+                compose_slow_path(udpif, &miss->xout, miss->odp_in_port, &buf);
+                op->u.flow_put.actions = buf.data;
+                op->u.flow_put.actions_len = buf.size;
+            }
+        }
+
+        if (miss->xout.odp_actions.size) {
             if (miss->flow.in_port.ofp_port
                 != vsp_realdev_to_vlandev(miss->ofproto,
                                           miss->flow.in_port.ofp_port,
@@ -882,17 +1177,287 @@ handle_upcalls(struct udpif *udpif, struct list *upcalls)
     }
     dpif_operate(udpif->dpif, opsp, n_ops);
 
-    list_move(&fmb->upcalls, upcalls);
+    HMAP_FOR_EACH_SAFE (miss, next_miss, hmap_node, &misses) {
+        hmap_remove(&misses, &miss->hmap_node);
+        xlate_out_uninit(&miss->xout);
+    }
+    hmap_destroy(&misses);
+
+    LIST_FOR_EACH_SAFE (upcall, next, list_node, upcalls) {
+        list_remove(&upcall->list_node);
+        upcall_destroy(upcall);
+    }
+}
+
+static struct udpif_key *
+ukey_lookup(struct revalidator *revalidator, struct udpif_flow_dump *udump)
+{
+    struct udpif_key *ukey;
+
+    HMAP_FOR_EACH_WITH_HASH (ukey, hmap_node, udump->key_hash,
+                             &revalidator->ukeys) {
+        if (ukey->key_len == udump->key_len
+            && !memcmp(ukey->key, udump->key, udump->key_len)) {
+            return ukey;
+        }
+    }
+    return NULL;
+}
+
+static void
+ukey_delete(struct revalidator *revalidator, struct udpif_key *ukey)
+{
+    hmap_remove(&revalidator->ukeys, &ukey->hmap_node);
+    free(ukey);
+}
+
+static bool
+revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump,
+                struct udpif_key *ukey)
+{
+    struct ofpbuf xout_actions, *actions;
+    uint64_t slow_path_buf[128 / 8];
+    struct xlate_out xout, *xoutp;
+    struct flow flow, udump_mask;
+    struct ofproto_dpif *ofproto;
+    struct dpif_flow_stats push;
+    uint32_t *udump32, *xout32;
+    odp_port_t odp_in_port;
+    struct xlate_in xin;
+    int error;
+    size_t i;
+    bool ok;
+
+    ok = false;
+    xoutp = NULL;
+    actions = NULL;
+
+    /* If we don't need to revalidate, we can simply push the stats contained
+     * in the udump, otherwise we'll have to get the actions so we can check
+     * them. */
+    if (udump->need_revalidate) {
+        if (dpif_flow_get(udpif->dpif, ukey->key, ukey->key_len, &actions,
+                          &udump->stats)) {
+            goto exit;
+        }
+    }
+
+    push.used = udump->stats.used;
+    push.tcp_flags = udump->stats.tcp_flags;
+    push.n_packets = udump->stats.n_packets > ukey->stats.n_packets
+        ? udump->stats.n_packets - ukey->stats.n_packets
+        : 0;
+    push.n_bytes = udump->stats.n_bytes > ukey->stats.n_bytes
+        ? udump->stats.n_bytes - ukey->stats.n_bytes
+        : 0;
+    ukey->stats = udump->stats;
+
+    if (!push.n_packets && !udump->need_revalidate) {
+        ok = true;
+        goto exit;
+    }
+
+    error = xlate_receive(udpif->backer, NULL, ukey->key, ukey->key_len, &flow,
+                          NULL, &ofproto, NULL, NULL, NULL, &odp_in_port);
+    if (error) {
+        goto exit;
+    }
+
+    xlate_in_init(&xin, ofproto, &flow, NULL, push.tcp_flags, NULL);
+    xin.resubmit_stats = push.n_packets ? &push : NULL;
+    xin.may_learn = push.n_packets > 0;
+    xin.skip_wildcards = !udump->need_revalidate;
+    xlate_actions(&xin, &xout);
+    xoutp = &xout;
 
-    if (fmb->reval_seq != seq_read(udpif->reval_seq)) {
-        COVERAGE_INC(fmb_queue_revalidated);
-        flow_miss_batch_destroy(fmb);
-    } else if (!guarded_list_push_back(&udpif->fmbs, &fmb->list_node,
-                                       MAX_QUEUE_LENGTH)) {
-        COVERAGE_INC(fmb_queue_overflow);
-        flow_miss_batch_destroy(fmb);
+    if (!udump->need_revalidate) {
+        ok = true;
+        goto exit;
+    }
+
+    if (!xout.slow) {
+        ofpbuf_use_const(&xout_actions, xout.odp_actions.data,
+                         xout.odp_actions.size);
     } else {
-        seq_change(udpif->wait_seq);
+        ofpbuf_use_stack(&xout_actions, slow_path_buf, sizeof slow_path_buf);
+        compose_slow_path(udpif, &xout, odp_in_port, &xout_actions);
+    }
+
+    if (!ofpbuf_equal(&xout_actions, actions)) {
+        goto exit;
+    }
+
+    if (odp_flow_key_to_mask(udump->mask, udump->mask_len, &udump_mask, &flow)
+        == ODP_FIT_ERROR) {
+        goto exit;
+    }
+
+    /* Since the kernel is free to ignore wildcarded bits in the mask, we can't
+     * directly check that the masks are the same.  Instead we check that the
+     * mask in the kernel is more specific i.e. less wildcarded, than what
+     * we've calculated here.  This guarantees we don't catch any packets we
+     * shouldn't with the megaflow. */
+    udump32 = (uint32_t *) &udump_mask;
+    xout32 = (uint32_t *) &xout.wc.masks;
+    for (i = 0; i < FLOW_U32S; i++) {
+        if ((udump32[i] | xout32[i]) != udump32[i]) {
+            goto exit;
+        }
+    }
+    ok = true;
+
+exit:
+    ofpbuf_delete(actions);
+    xlate_out_uninit(xoutp);
+    return ok;
+}
+
+static void
+revalidate_udumps(struct revalidator *revalidator, struct list *udumps)
+{
+    struct udpif *udpif = revalidator->udpif;
+
+    struct {
+        struct dpif_flow_stats ukey_stats;    /* Stats stored in the ukey. */
+        struct dpif_flow_stats stats;         /* Stats for 'op'. */
+        struct dpif_op op;                    /* Flow del operation. */
+    } ops[REVALIDATE_MAX_BATCH];
+
+    struct dpif_op *opsp[REVALIDATE_MAX_BATCH];
+    struct udpif_flow_dump *udump, *next_udump;
+    size_t n_ops, i, n_flows;
+    unsigned int flow_limit;
+    long long int max_idle;
+    bool must_del;
+
+    atomic_read(&udpif->max_idle, &max_idle);
+    atomic_read(&udpif->flow_limit, &flow_limit);
+
+    n_flows = udpif_get_n_flows(udpif);
+
+    must_del = false;
+    if (n_flows > flow_limit) {
+        must_del = n_flows > 2 * flow_limit;
+        max_idle = 100;
+    }
+
+    n_ops = 0;
+    LIST_FOR_EACH_SAFE (udump, next_udump, list_node, udumps) {
+        long long int used, now;
+        struct udpif_key *ukey;
+
+        now = time_msec();
+        ukey = ukey_lookup(revalidator, udump);
+
+        used = udump->stats.used;
+        if (!used && ukey) {
+            used = ukey->created;
+        }
+
+        if (must_del || (used && used < now - max_idle)) {
+            struct dpif_flow_stats *ukey_stats = &ops[n_ops].ukey_stats;
+            struct dpif_op *op = &ops[n_ops].op;
+
+            op->type = DPIF_OP_FLOW_DEL;
+            op->u.flow_del.key = udump->key;
+            op->u.flow_del.key_len = udump->key_len;
+            op->u.flow_del.stats = &ops[n_ops].stats;
+            n_ops++;
+
+            if (ukey) {
+                *ukey_stats = ukey->stats;
+                ukey_delete(revalidator, ukey);
+            } else {
+                memset(ukey_stats, 0, sizeof *ukey_stats);
+            }
+
+            continue;
+        }
+
+        if (!ukey) {
+            ukey = xmalloc(sizeof *ukey);
+
+            ukey->key = (struct nlattr *) &ukey->key_buf;
+            memcpy(ukey->key, udump->key, udump->key_len);
+            ukey->key_len = udump->key_len;
+
+            ukey->created = used ? used : now;
+            memset(&ukey->stats, 0, sizeof ukey->stats);
+
+            ukey->mark = false;
+
+            hmap_insert(&revalidator->ukeys, &ukey->hmap_node,
+                        udump->key_hash);
+        }
+        ukey->mark = true;
+
+        if (!revalidate_ukey(udpif, udump, ukey)) {
+            dpif_flow_del(udpif->dpif, udump->key, udump->key_len, NULL);
+            ukey_delete(revalidator, ukey);
+        }
+
+        list_remove(&udump->list_node);
+        free(udump);
+    }
+
+    for (i = 0; i < n_ops; i++) {
+        opsp[i] = &ops[i].op;
+    }
+    dpif_operate(udpif->dpif, opsp, n_ops);
+
+    for (i = 0; i < n_ops; i++) {
+        struct dpif_flow_stats push, *stats, *ukey_stats;
+
+        ukey_stats  = &ops[i].ukey_stats;
+        stats = ops[i].op.u.flow_del.stats;
+        push.used = MAX(stats->used, ukey_stats->used);
+        push.tcp_flags = stats->tcp_flags | ukey_stats->tcp_flags;
+        push.n_packets = stats->n_packets - ukey_stats->n_packets;
+        push.n_bytes = stats->n_bytes - ukey_stats->n_bytes;
+
+        if (push.n_packets || netflow_exists()) {
+            struct ofproto_dpif *ofproto;
+            struct netflow *netflow;
+            struct flow flow;
+
+            if (!xlate_receive(udpif->backer, NULL, ops[i].op.u.flow_del.key,
+                               ops[i].op.u.flow_del.key_len, &flow, NULL,
+                               &ofproto, NULL, NULL, &netflow, NULL)) {
+                struct xlate_in xin;
+
+                xlate_in_init(&xin, ofproto, &flow, NULL, push.tcp_flags,
+                              NULL);
+                xin.resubmit_stats = push.n_packets ? &push : NULL;
+                xin.may_learn = push.n_packets > 0;
+                xin.skip_wildcards = true;
+                xlate_actions_for_side_effects(&xin);
+
+                if (netflow) {
+                    netflow_expire(netflow, &flow);
+                    netflow_flow_clear(netflow, &flow);
+                    netflow_unref(netflow);
+                }
+            }
+        }
+    }
+
+    LIST_FOR_EACH_SAFE (udump, next_udump, list_node, udumps) {
+        list_remove(&udump->list_node);
+        free(udump);
+    }
+}
+
+static void
+revalidator_sweep(struct revalidator *revalidator)
+{
+    struct udpif_key *ukey, *next;
+
+    HMAP_FOR_EACH_SAFE (ukey, next, hmap_node, &revalidator->ukeys) {
+        if (ukey->mark) {
+            ukey->mark = false;
+        } else {
+            ukey_delete(revalidator, ukey);
+        }
     }
 }
 \f
@@ -904,9 +1469,21 @@ upcall_unixctl_show(struct unixctl_conn *conn, int argc OVS_UNUSED,
     struct udpif *udpif;
 
     LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
+        unsigned int flow_limit;
+        long long int max_idle;
         size_t i;
 
+        atomic_read(&udpif->flow_limit, &flow_limit);
+        atomic_read(&udpif->max_idle, &max_idle);
+
         ds_put_format(&ds, "%s:\n", dpif_name(udpif->dpif));
+        ds_put_format(&ds, "\tflows         : (current %"PRIu64")"
+            " (avg %u) (max %u) (limit %u)\n", udpif_get_n_flows(udpif),
+            udpif->avg_n_flows, udpif->max_n_flows, flow_limit);
+        ds_put_format(&ds, "\tmax idle      : %lldms\n", max_idle);
+        ds_put_format(&ds, "\tdump duration : %lldms\n", udpif->dump_duration);
+
+        ds_put_char(&ds, '\n');
         for (i = 0; i < udpif->n_handlers; i++) {
             struct handler *handler = &udpif->handlers[i];
 
@@ -915,8 +1492,51 @@ upcall_unixctl_show(struct unixctl_conn *conn, int argc OVS_UNUSED,
                           handler->name, handler->n_upcalls);
             ovs_mutex_unlock(&handler->mutex);
         }
+
+        ds_put_char(&ds, '\n');
+        for (i = 0; i < n_revalidators; i++) {
+            struct revalidator *revalidator = &udpif->revalidators[i];
+
+            /* XXX: The result of hmap_count(&revalidator->ukeys) may not be
+             * accurate because it's not protected by the revalidator mutex. */
+            ovs_mutex_lock(&revalidator->mutex);
+            ds_put_format(&ds, "\t%s: (dump queue %"PRIuSIZE") (keys %"PRIuSIZE
+                          ")\n", revalidator->name, revalidator->n_udumps,
+                          hmap_count(&revalidator->ukeys));
+            ovs_mutex_unlock(&revalidator->mutex);
+        }
     }
 
     unixctl_command_reply(conn, ds_cstr(&ds));
     ds_destroy(&ds);
 }
+
+/* Disable using the megaflows.
+ *
+ * This command is only needed for advanced debugging, so it's not
+ * documented in the man page. */
+static void
+upcall_unixctl_disable_megaflows(struct unixctl_conn *conn,
+                                 int argc OVS_UNUSED,
+                                 const char *argv[] OVS_UNUSED,
+                                 void *aux OVS_UNUSED)
+{
+    atomic_store(&enable_megaflows, false);
+    udpif_flush();
+    unixctl_command_reply(conn, "megaflows disabled");
+}
+
+/* Re-enable using megaflows.
+ *
+ * This command is only needed for advanced debugging, so it's not
+ * documented in the man page. */
+static void
+upcall_unixctl_enable_megaflows(struct unixctl_conn *conn,
+                                int argc OVS_UNUSED,
+                                const char *argv[] OVS_UNUSED,
+                                void *aux OVS_UNUSED)
+{
+    atomic_store(&enable_megaflows, true);
+    udpif_flush();
+    unixctl_command_reply(conn, "megaflows enabled");
+}