netflow: Make netflow_flow_update() parameter const.
[sliver-openvswitch.git] / ofproto / ofproto-dpif-upcall.c
index dd24f5c..0d5b251 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
+/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -125,8 +125,12 @@ struct udpif {
     unsigned int avg_n_flows;
 
     /* Following fields are accessed and modified by different threads. */
-    atomic_llong max_idle;             /* Maximum datapath flow idle time. */
     atomic_uint flow_limit;            /* Datapath flow hard limit. */
+
+    /* n_flows_mutex prevents multiple threads updating these concurrently. */
+    atomic_uint64_t n_flows;           /* Number of flows in the datapath. */
+    atomic_llong n_flows_timestamp;    /* Last time n_flows was updated. */
+    struct ovs_mutex n_flows_mutex;
 };
 
 enum upcall_type {
@@ -197,7 +201,6 @@ struct flow_miss {
     struct ofproto_dpif *ofproto;
 
     struct flow flow;
-    enum odp_key_fitness key_fitness;
     const struct nlattr *key;
     size_t key_len;
     enum dpif_upcall_type upcall_type;
@@ -208,6 +211,8 @@ struct flow_miss {
     struct odputil_keybuf mask_buf;
 
     struct xlate_out xout;
+
+    bool put;
 };
 
 static void upcall_destroy(struct upcall *);
@@ -221,15 +226,18 @@ static void *udpif_flow_dumper(void *);
 static void *udpif_dispatcher(void *);
 static void *udpif_upcall_handler(void *);
 static void *udpif_revalidator(void *);
-static uint64_t udpif_get_n_flows(const struct udpif *);
+static uint64_t udpif_get_n_flows(struct udpif *);
 static void revalidate_udumps(struct revalidator *, struct list *udumps);
 static void revalidator_sweep(struct revalidator *);
+static void revalidator_purge(struct revalidator *);
 static void upcall_unixctl_show(struct unixctl_conn *conn, int argc,
                                 const char *argv[], void *aux);
 static void upcall_unixctl_disable_megaflows(struct unixctl_conn *, int argc,
                                              const char *argv[], void *aux);
 static void upcall_unixctl_enable_megaflows(struct unixctl_conn *, int argc,
                                             const char *argv[], void *aux);
+static void upcall_unixctl_set_flow_limit(struct unixctl_conn *conn, int argc,
+                                            const char *argv[], void *aux);
 static void ukey_delete(struct revalidator *, struct udpif_key *);
 
 static atomic_bool enable_megaflows = ATOMIC_VAR_INIT(true);
@@ -247,18 +255,22 @@ udpif_create(struct dpif_backer *backer, struct dpif *dpif)
                                  upcall_unixctl_disable_megaflows, NULL);
         unixctl_command_register("upcall/enable-megaflows", "", 0, 0,
                                  upcall_unixctl_enable_megaflows, NULL);
+        unixctl_command_register("upcall/set-flow-limit", "", 1, 1,
+                                 upcall_unixctl_set_flow_limit, NULL);
         ovsthread_once_done(&once);
     }
 
     udpif->dpif = dpif;
     udpif->backer = backer;
-    atomic_init(&udpif->max_idle, 5000);
     atomic_init(&udpif->flow_limit, MIN(ofproto_flow_limit, 10000));
     udpif->secret = random_uint32();
     udpif->reval_seq = seq_create();
     udpif->dump_seq = seq_create();
     latch_init(&udpif->exit_latch);
     list_push_back(&all_udpifs, &udpif->list_node);
+    atomic_init(&udpif->n_flows, 0);
+    atomic_init(&udpif->n_flows_timestamp, LLONG_MIN);
+    ovs_mutex_init(&udpif->n_flows_mutex);
 
     return udpif;
 }
@@ -273,8 +285,10 @@ udpif_destroy(struct udpif *udpif)
     latch_destroy(&udpif->exit_latch);
     seq_destroy(udpif->reval_seq);
     seq_destroy(udpif->dump_seq);
-    atomic_destroy(&udpif->max_idle);
     atomic_destroy(&udpif->flow_limit);
+    atomic_destroy(&udpif->n_flows);
+    atomic_destroy(&udpif->n_flows_timestamp);
+    ovs_mutex_destroy(&udpif->n_flows_mutex);
     free(udpif);
 }
 
@@ -318,7 +332,6 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
         for (i = 0; i < udpif->n_revalidators; i++) {
             struct revalidator *revalidator = &udpif->revalidators[i];
             struct udpif_flow_dump *udump, *next_udump;
-            struct udpif_key *ukey, *next_ukey;
 
             LIST_FOR_EACH_SAFE (udump, next_udump, list_node,
                                 &revalidator->udumps) {
@@ -326,10 +339,9 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
                 free(udump);
             }
 
-            HMAP_FOR_EACH_SAFE (ukey, next_ukey, hmap_node,
-                                &revalidator->ukeys) {
-                ukey_delete(revalidator, ukey);
-            }
+            /* Delete ukeys, and delete all flows from the datapath to prevent
+             * double-counting stats. */
+            revalidator_purge(revalidator);
             hmap_destroy(&revalidator->ukeys);
             ovs_mutex_destroy(&revalidator->mutex);
 
@@ -398,6 +410,22 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
     }
 }
 
+/* Waits for all ongoing upcall translations to complete.  This ensures that
+ * there are no transient references to any removed ofprotos (or other
+ * objects).  In particular, this should be called after an ofproto is removed
+ * (e.g. via xlate_remove_ofproto()) but before it is destroyed. */
+void
+udpif_synchronize(struct udpif *udpif)
+{
+    /* This is stronger than necessary.  It would be sufficient to ensure
+     * (somehow) that each handler and revalidator thread had passed through
+     * its main loop once. */
+    size_t n_handlers = udpif->n_handlers;
+    size_t n_revalidators = udpif->n_revalidators;
+    udpif_set_threads(udpif, 0, 0);
+    udpif_set_threads(udpif, n_handlers, n_revalidators);
+}
+
 /* Notifies 'udpif' that something changed which may render previous
  * xlate_actions() results invalid. */
 void
@@ -468,12 +496,25 @@ upcall_destroy(struct upcall *upcall)
 }
 
 static uint64_t
-udpif_get_n_flows(const struct udpif *udpif)
+udpif_get_n_flows(struct udpif *udpif)
 {
-    struct dpif_dp_stats stats;
-
-    dpif_get_dp_stats(udpif->dpif, &stats);
-    return stats.n_flows;
+    long long int time, now;
+    uint64_t flow_count;
+
+    now = time_msec();
+    atomic_read(&udpif->n_flows_timestamp, &time);
+    if (time < now - 100 && !ovs_mutex_trylock(&udpif->n_flows_mutex)) {
+        struct dpif_dp_stats stats;
+
+        atomic_store(&udpif->n_flows_timestamp, now);
+        dpif_get_dp_stats(udpif->dpif, &stats);
+        flow_count = stats.n_flows;
+        atomic_store(&udpif->n_flows, flow_count);
+        ovs_mutex_unlock(&udpif->n_flows_mutex);
+    } else {
+        atomic_read(&udpif->n_flows, &flow_count);
+    }
+    return flow_count;
 }
 
 /* The dispatcher thread is responsible for receiving upcalls from the kernel,
@@ -507,10 +548,11 @@ udpif_flow_dumper(void *arg)
         struct dpif_flow_dump dump;
         size_t key_len, mask_len;
         unsigned int flow_limit;
-        long long int max_idle;
         bool need_revalidate;
         uint64_t reval_seq;
         size_t n_flows, i;
+        int error;
+        void *state = NULL;
 
         reval_seq = seq_read(udpif->reval_seq);
         need_revalidate = udpif->last_reval_seq != reval_seq;
@@ -520,22 +562,15 @@ udpif_flow_dumper(void *arg)
         udpif->max_n_flows = MAX(n_flows, udpif->max_n_flows);
         udpif->avg_n_flows = (udpif->avg_n_flows + n_flows) / 2;
 
-        atomic_read(&udpif->flow_limit, &flow_limit);
-        if (n_flows < flow_limit / 8) {
-            max_idle = 5000;
-        } else if (n_flows < flow_limit / 4) {
-            max_idle = 2000;
-        } else if (n_flows < flow_limit / 2) {
-            max_idle = 1000;
-        } else {
-            max_idle = 500;
-        }
-        atomic_store(&udpif->max_idle, max_idle);
-
         start_time = time_msec();
-        dpif_flow_dump_start(&dump, udpif->dpif);
-        while (dpif_flow_dump_next(&dump, &key, &key_len, &mask, &mask_len,
-                                   NULL, NULL, &stats)
+        error = dpif_flow_dump_start(&dump, udpif->dpif);
+        if (error) {
+            VLOG_INFO("Failed to start flow dump (%s)", ovs_strerror(error));
+            goto skip;
+        }
+        dpif_flow_dump_state_init(udpif->dpif, &state);
+        while (dpif_flow_dump_next(&dump, state, &key, &key_len,
+                                   &mask, &mask_len, NULL, NULL, &stats)
                && !latch_is_set(&udpif->exit_latch)) {
             struct udpif_flow_dump *udump = xmalloc(sizeof *udump);
             struct revalidator *revalidator;
@@ -566,6 +601,7 @@ udpif_flow_dumper(void *arg)
             xpthread_cond_signal(&revalidator->wake_cond);
             ovs_mutex_unlock(&revalidator->mutex);
         }
+        dpif_flow_dump_state_uninit(udpif->dpif, state);
         dpif_flow_dump_done(&dump);
 
         /* Let all the revalidators finish and garbage collect. */
@@ -589,8 +625,9 @@ udpif_flow_dumper(void *arg)
             ovs_mutex_unlock(&revalidator->mutex);
         }
 
-        duration = time_msec() - start_time;
+        duration = MAX(time_msec() - start_time, 1);
         udpif->dump_duration = duration;
+        atomic_read(&udpif->flow_limit, &flow_limit);
         if (duration > 2000) {
             flow_limit /= duration / 1000;
         } else if (duration > 1300) {
@@ -607,7 +644,8 @@ udpif_flow_dumper(void *arg)
                       duration);
         }
 
-        poll_timer_wait_until(start_time + MIN(max_idle, 500));
+skip:
+        poll_timer_wait_until(start_time + MIN(ofproto_max_idle, 500));
         seq_wait(udpif->reval_seq, udpif->last_reval_seq);
         latch_wait(&udpif->exit_latch);
         poll_block();
@@ -735,16 +773,16 @@ classify_upcall(const struct upcall *upcall)
     }
     memset(&cookie, 0, sizeof cookie);
     memcpy(&cookie, nl_attr_get(dpif_upcall->userdata), userdata_len);
-    if (userdata_len == sizeof cookie.sflow
+    if (userdata_len == MAX(8, sizeof cookie.sflow)
         && cookie.type == USER_ACTION_COOKIE_SFLOW) {
         return SFLOW_UPCALL;
-    } else if (userdata_len == sizeof cookie.slow_path
+    } else if (userdata_len == MAX(8, sizeof cookie.slow_path)
                && cookie.type == USER_ACTION_COOKIE_SLOW_PATH) {
         return MISS_UPCALL;
-    } else if (userdata_len == sizeof cookie.flow_sample
+    } else if (userdata_len == MAX(8, sizeof cookie.flow_sample)
                && cookie.type == USER_ACTION_COOKIE_FLOW_SAMPLE) {
         return FLOW_SAMPLE_UPCALL;
-    } else if (userdata_len == sizeof cookie.ipfix
+    } else if (userdata_len == MAX(8, sizeof cookie.ipfix)
                && cookie.type == USER_ACTION_COOKIE_IPFIX) {
         return IPFIX_UPCALL;
     } else {
@@ -929,7 +967,7 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
         int error;
 
         error = xlate_receive(udpif->backer, packet, dupcall->key,
-                              dupcall->key_len, &flow, &miss->key_fitness,
+                              dupcall->key_len, &flow,
                               &ofproto, &ipfix, &sflow, NULL, &odp_in_port);
         if (error) {
             if (error == ENODEV) {
@@ -953,9 +991,10 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
         type = classify_upcall(upcall);
         if (type == MISS_UPCALL) {
             uint32_t hash;
+            struct pkt_metadata md;
 
-            flow_extract(packet, flow.skb_priority, flow.pkt_mark,
-                         &flow.tunnel, &flow.in_port, &miss->flow);
+            pkt_metadata_from_flow(&md, &flow);
+            flow_extract(packet, &md, &miss->flow);
 
             hash = flow_hash(&miss->flow, 0);
             existing_miss = flow_miss_find(&misses, ofproto, &miss->flow,
@@ -971,6 +1010,7 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
                 miss->stats.used = time_msec();
                 miss->stats.tcp_flags = 0;
                 miss->odp_in_port = odp_in_port;
+                miss->put = false;
 
                 n_misses++;
             } else {
@@ -1107,15 +1147,30 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
             miss->flow.vlan_tci = 0;
         }
 
-        if (may_put) {
+        /* Do not install a flow into the datapath if:
+         *
+         *    - The datapath already has too many flows.
+         *
+         *    - An earlier iteration of this loop already put the same flow.
+         *
+         *    - We received this packet via some flow installed in the kernel
+         *      already. */
+        if (may_put
+            && !miss->put
+            && upcall->dpif_upcall.type == DPIF_UC_MISS) {
             struct ofpbuf mask;
             bool megaflow;
 
+            miss->put = true;
+
             atomic_read(&enable_megaflows, &megaflow);
             ofpbuf_use_stack(&mask, &miss->mask_buf, sizeof miss->mask_buf);
             if (megaflow) {
+                size_t max_mpls;
+
+                max_mpls = ofproto_dpif_get_max_mpls_depth(miss->ofproto);
                 odp_flow_key_from_mask(&mask, &miss->xout.wc.masks,
-                                       &miss->flow, UINT32_MAX);
+                                       &miss->flow, UINT32_MAX, max_mpls);
             }
 
             op = &ops[n_ops++];
@@ -1221,6 +1276,22 @@ ukey_lookup(struct revalidator *revalidator, struct udpif_flow_dump *udump)
     return NULL;
 }
 
+static struct udpif_key *
+ukey_create(const struct nlattr *key, size_t key_len, long long int used)
+{
+    struct udpif_key *ukey = xmalloc(sizeof *ukey);
+
+    ukey->key = (struct nlattr *) &ukey->key_buf;
+    memcpy(&ukey->key_buf, key, key_len);
+    ukey->key_len = key_len;
+
+    ukey->mark = false;
+    ukey->created = used ? used : time_msec();
+    memset(&ukey->stats, 0, sizeof ukey->stats);
+
+    return ukey;
+}
+
 static void
 ukey_delete(struct revalidator *revalidator, struct udpif_key *ukey)
 {
@@ -1275,7 +1346,7 @@ revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump,
     }
 
     error = xlate_receive(udpif->backer, NULL, ukey->key, ukey->key_len, &flow,
-                          NULL, &ofproto, NULL, NULL, NULL, &odp_in_port);
+                          &ofproto, NULL, NULL, NULL, &odp_in_port);
     if (error) {
         goto exit;
     }
@@ -1329,30 +1400,116 @@ exit:
     return ok;
 }
 
+struct dump_op {
+    struct udpif_key *ukey;
+    struct udpif_flow_dump *udump;
+    struct dpif_flow_stats stats; /* Stats for 'op'. */
+    struct dpif_op op;            /* Flow del operation. */
+};
+
 static void
-revalidate_udumps(struct revalidator *revalidator, struct list *udumps)
+dump_op_init(struct dump_op *op, const struct nlattr *key, size_t key_len,
+             struct udpif_key *ukey, struct udpif_flow_dump *udump)
+{
+    op->ukey = ukey;
+    op->udump = udump;
+    op->op.type = DPIF_OP_FLOW_DEL;
+    op->op.u.flow_del.key = key;
+    op->op.u.flow_del.key_len = key_len;
+    op->op.u.flow_del.stats = &op->stats;
+}
+
+static void
+push_dump_ops(struct revalidator *revalidator,
+              struct dump_op *ops, size_t n_ops)
 {
     struct udpif *udpif = revalidator->udpif;
+    struct dpif_op *opsp[REVALIDATE_MAX_BATCH];
+    size_t i;
 
-    struct {
-        struct dpif_flow_stats ukey_stats;    /* Stats stored in the ukey. */
-        struct dpif_flow_stats stats;         /* Stats for 'op'. */
-        struct dpif_op op;                    /* Flow del operation. */
-    } ops[REVALIDATE_MAX_BATCH];
+    ovs_assert(n_ops <= REVALIDATE_MAX_BATCH);
+    for (i = 0; i < n_ops; i++) {
+        opsp[i] = &ops[i].op;
+    }
+    dpif_operate(udpif->dpif, opsp, n_ops);
 
-    struct dpif_op *opsp[REVALIDATE_MAX_BATCH];
+    for (i = 0; i < n_ops; i++) {
+        struct dump_op *op = &ops[i];
+        struct dpif_flow_stats *push, *stats, push_buf;
+
+        stats = op->op.u.flow_del.stats;
+        if (op->ukey) {
+            push = &push_buf;
+            push->used = MAX(stats->used, op->ukey->stats.used);
+            push->tcp_flags = stats->tcp_flags | op->ukey->stats.tcp_flags;
+            push->n_packets = stats->n_packets - op->ukey->stats.n_packets;
+            push->n_bytes = stats->n_bytes - op->ukey->stats.n_bytes;
+        } else {
+            push = stats;
+        }
+
+        if (push->n_packets || netflow_exists()) {
+            struct ofproto_dpif *ofproto;
+            struct netflow *netflow;
+            struct flow flow;
+
+            if (!xlate_receive(udpif->backer, NULL, op->op.u.flow_del.key,
+                               op->op.u.flow_del.key_len, &flow, &ofproto,
+                               NULL, NULL, &netflow, NULL)) {
+                struct xlate_in xin;
+
+                xlate_in_init(&xin, ofproto, &flow, NULL, push->tcp_flags,
+                              NULL);
+                xin.resubmit_stats = push->n_packets ? push : NULL;
+                xin.may_learn = push->n_packets > 0;
+                xin.skip_wildcards = true;
+                xlate_actions_for_side_effects(&xin);
+
+                if (netflow) {
+                    netflow_expire(netflow, &flow);
+                    netflow_flow_clear(netflow, &flow);
+                    netflow_unref(netflow);
+                }
+            }
+        }
+    }
+
+    for (i = 0; i < n_ops; i++) {
+        struct udpif_key *ukey;
+
+        /* If there's a udump, this ukey came directly from a datapath flow
+         * dump.  Sometimes a datapath can send duplicates in flow dumps, in
+         * which case we wouldn't want to double-free a ukey, so avoid that by
+         * looking up the ukey again.
+         *
+         * If there's no udump then we know what we're doing. */
+        ukey = (ops[i].udump
+                ? ukey_lookup(revalidator, ops[i].udump)
+                : ops[i].ukey);
+        if (ukey) {
+            ukey_delete(revalidator, ukey);
+        }
+    }
+}
+
+static void
+revalidate_udumps(struct revalidator *revalidator, struct list *udumps)
+{
+    struct udpif *udpif = revalidator->udpif;
+
+    struct dump_op ops[REVALIDATE_MAX_BATCH];
     struct udpif_flow_dump *udump, *next_udump;
-    size_t n_ops, i, n_flows;
+    size_t n_ops, n_flows;
     unsigned int flow_limit;
     long long int max_idle;
     bool must_del;
 
-    atomic_read(&udpif->max_idle, &max_idle);
     atomic_read(&udpif->flow_limit, &flow_limit);
 
     n_flows = udpif_get_n_flows(udpif);
 
     must_del = false;
+    max_idle = ofproto_max_idle;
     if (n_flows > flow_limit) {
         must_del = n_flows > 2 * flow_limit;
         max_idle = 100;
@@ -1372,37 +1529,14 @@ revalidate_udumps(struct revalidator *revalidator, struct list *udumps)
         }
 
         if (must_del || (used && used < now - max_idle)) {
-            struct dpif_flow_stats *ukey_stats = &ops[n_ops].ukey_stats;
-            struct dpif_op *op = &ops[n_ops].op;
-
-            op->type = DPIF_OP_FLOW_DEL;
-            op->u.flow_del.key = udump->key;
-            op->u.flow_del.key_len = udump->key_len;
-            op->u.flow_del.stats = &ops[n_ops].stats;
-            n_ops++;
-
-            if (ukey) {
-                *ukey_stats = ukey->stats;
-                ukey_delete(revalidator, ukey);
-            } else {
-                memset(ukey_stats, 0, sizeof *ukey_stats);
-            }
+            struct dump_op *dop = &ops[n_ops++];
 
+            dump_op_init(dop, udump->key, udump->key_len, ukey, udump);
             continue;
         }
 
         if (!ukey) {
-            ukey = xmalloc(sizeof *ukey);
-
-            ukey->key = (struct nlattr *) &ukey->key_buf;
-            memcpy(ukey->key, udump->key, udump->key_len);
-            ukey->key_len = udump->key_len;
-
-            ukey->created = used ? used : now;
-            memset(&ukey->stats, 0, sizeof ukey->stats);
-
-            ukey->mark = false;
-
+            ukey = ukey_create(udump->key, udump->key_len, used);
             hmap_insert(&revalidator->ukeys, &ukey->hmap_node,
                         udump->key_hash);
         }
@@ -1417,46 +1551,7 @@ revalidate_udumps(struct revalidator *revalidator, struct list *udumps)
         free(udump);
     }
 
-    for (i = 0; i < n_ops; i++) {
-        opsp[i] = &ops[i].op;
-    }
-    dpif_operate(udpif->dpif, opsp, n_ops);
-
-    for (i = 0; i < n_ops; i++) {
-        struct dpif_flow_stats push, *stats, *ukey_stats;
-
-        ukey_stats  = &ops[i].ukey_stats;
-        stats = ops[i].op.u.flow_del.stats;
-        push.used = MAX(stats->used, ukey_stats->used);
-        push.tcp_flags = stats->tcp_flags | ukey_stats->tcp_flags;
-        push.n_packets = stats->n_packets - ukey_stats->n_packets;
-        push.n_bytes = stats->n_bytes - ukey_stats->n_bytes;
-
-        if (push.n_packets || netflow_exists()) {
-            struct ofproto_dpif *ofproto;
-            struct netflow *netflow;
-            struct flow flow;
-
-            if (!xlate_receive(udpif->backer, NULL, ops[i].op.u.flow_del.key,
-                               ops[i].op.u.flow_del.key_len, &flow, NULL,
-                               &ofproto, NULL, NULL, &netflow, NULL)) {
-                struct xlate_in xin;
-
-                xlate_in_init(&xin, ofproto, &flow, NULL, push.tcp_flags,
-                              NULL);
-                xin.resubmit_stats = push.n_packets ? &push : NULL;
-                xin.may_learn = push.n_packets > 0;
-                xin.skip_wildcards = true;
-                xlate_actions_for_side_effects(&xin);
-
-                if (netflow) {
-                    netflow_expire(netflow, &flow);
-                    netflow_flow_clear(netflow, &flow);
-                    netflow_unref(netflow);
-                }
-            }
-        }
-    }
+    push_dump_ops(revalidator, ops, n_ops);
 
     LIST_FOR_EACH_SAFE (udump, next_udump, list_node, udumps) {
         list_remove(&udump->list_node);
@@ -1465,17 +1560,46 @@ revalidate_udumps(struct revalidator *revalidator, struct list *udumps)
 }
 
 static void
-revalidator_sweep(struct revalidator *revalidator)
+revalidator_sweep__(struct revalidator *revalidator, bool purge)
 {
+    struct dump_op ops[REVALIDATE_MAX_BATCH];
     struct udpif_key *ukey, *next;
+    size_t n_ops;
+
+    n_ops = 0;
 
     HMAP_FOR_EACH_SAFE (ukey, next, hmap_node, &revalidator->ukeys) {
-        if (ukey->mark) {
+        if (!purge && ukey->mark) {
             ukey->mark = false;
         } else {
-            ukey_delete(revalidator, ukey);
+            struct dump_op *op = &ops[n_ops++];
+
+            /* If we have previously seen a flow in the datapath, but didn't
+             * see it during the most recent dump, delete it. This allows us
+             * to clean up the ukey and keep the statistics consistent. */
+            dump_op_init(op, ukey->key, ukey->key_len, ukey, NULL);
+            if (n_ops == REVALIDATE_MAX_BATCH) {
+                push_dump_ops(revalidator, ops, n_ops);
+                n_ops = 0;
+            }
         }
     }
+
+    if (n_ops) {
+        push_dump_ops(revalidator, ops, n_ops);
+    }
+}
+
+static void
+revalidator_sweep(struct revalidator *revalidator)
+{
+    revalidator_sweep__(revalidator, false);
+}
+
+static void
+revalidator_purge(struct revalidator *revalidator)
+{
+    revalidator_sweep__(revalidator, true);
 }
 \f
 static void
@@ -1487,17 +1611,14 @@ upcall_unixctl_show(struct unixctl_conn *conn, int argc OVS_UNUSED,
 
     LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
         unsigned int flow_limit;
-        long long int max_idle;
         size_t i;
 
         atomic_read(&udpif->flow_limit, &flow_limit);
-        atomic_read(&udpif->max_idle, &max_idle);
 
         ds_put_format(&ds, "%s:\n", dpif_name(udpif->dpif));
         ds_put_format(&ds, "\tflows         : (current %"PRIu64")"
             " (avg %u) (max %u) (limit %u)\n", udpif_get_n_flows(udpif),
             udpif->avg_n_flows, udpif->max_n_flows, flow_limit);
-        ds_put_format(&ds, "\tmax idle      : %lldms\n", max_idle);
         ds_put_format(&ds, "\tdump duration : %lldms\n", udpif->dump_duration);
 
         ds_put_char(&ds, '\n');
@@ -1557,3 +1678,25 @@ upcall_unixctl_enable_megaflows(struct unixctl_conn *conn,
     udpif_flush();
     unixctl_command_reply(conn, "megaflows enabled");
 }
+
+/* Set the flow limit.
+ *
+ * This command is only needed for advanced debugging, so it's not
+ * documented in the man page. */
+static void
+upcall_unixctl_set_flow_limit(struct unixctl_conn *conn,
+                              int argc OVS_UNUSED,
+                              const char *argv[] OVS_UNUSED,
+                              void *aux OVS_UNUSED)
+{
+    struct ds ds = DS_EMPTY_INITIALIZER;
+    struct udpif *udpif;
+    unsigned int flow_limit = atoi(argv[1]);
+
+    LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
+        atomic_store(&udpif->flow_limit, flow_limit);
+    }
+    ds_put_format(&ds, "set flow_limit to %u\n", flow_limit);
+    unixctl_command_reply(conn, ds_cstr(&ds));
+    ds_destroy(&ds);
+}