+ hmap_insert(&udpif->ukeys[idx].hmap, &ukey->hmap_node, hash);
+ ok = true;
+ }
+ ovs_mutex_unlock(&udpif->ukeys[idx].mutex);
+
+ return ok;
+}
+
+/* Destroys 'ukey': releases its translation cache, destroys its mutex and
+ * frees its memory.
+ *
+ * If 'revalidator' is nonnull, 'ukey' is first unlinked from
+ * 'revalidator->ukeys'.  Callers pass NULL when 'ukey' is not linked into a
+ * revalidator's map, e.g. right after an insertion attempt failed. */
+static void
+ukey_delete(struct revalidator *revalidator, struct udpif_key *ukey)
+    OVS_NO_THREAD_SAFETY_ANALYSIS
+{
+    if (revalidator != NULL) {
+        /* Unlink before tearing anything down. */
+        hmap_remove(revalidator->ukeys, &ukey->hmap_node);
+    }
+
+    xlate_cache_delete(ukey->xcache);
+    ovs_mutex_destroy(&ukey->mutex);
+    free(ukey);
+}
+
+/* Returns true if a flow that has seen 'packets' packets since it was last
+ * 'used' is busy enough to be worth revalidating, false if it should simply
+ * be deleted instead. */
+static bool
+should_revalidate(uint64_t packets, long long int used)
+{
+    long long int current, elapsed, per_packet;
+
+    /* Work out the mean interval between packets on this flow.  Flows whose
+     * interval is above the threshold are hit too rarely to justify a costly
+     * revalidation, so they are deleted instead.
+     *
+     * This matters when the dump_duration is high (~1s) and revalidation
+     * was triggered through udpif_revalidate().  Revalidating every flow in
+     * that situation makes the flow_limit fluctuate because of how it
+     * interacts with dump_duration and max_idle, which tends to get the
+     * low-throughput flows deleted anyway. */
+    packets = MAX(packets, 1);          /* Avoid dividing by zero. */
+    current = MAX(used, time_msec());   /* Never let the interval go negative. */
+    elapsed = current - used;
+    per_packet = elapsed / packets;
+
+    /* 200 is the cut-off for the mean interval (presumably milliseconds,
+     * going by time_msec()). */
+    return per_packet <= 200;
+}
+
+/* Checks whether the datapath flow tracked by 'ukey' is still valid and
+ * pushes the statistics accumulated since the last visit.
+ *
+ * 'mask' and 'actions' (with their lengths) are the mask and actions the
+ * datapath reported for the flow, and 'stats' are its current datapath
+ * statistics. The statistics pushed are the difference between 'stats' and
+ * the copy cached in 'ukey'.
+ *
+ * When 'udpif->need_revalidate' is false only the statistics are pushed.
+ * When it is true the flow is translated again and the result is compared
+ * with 'actions' and 'mask'.
+ *
+ * Returns true if the flow passed the checks, false otherwise. The caller in
+ * this file queues a datapath flow deletion when false is returned. */
+static bool
+revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
+ const struct nlattr *mask, size_t mask_len,
+ const struct nlattr *actions, size_t actions_len,
+ const struct dpif_flow_stats *stats)
+{
+ uint64_t slow_path_buf[128 / 8];
+ struct xlate_out xout, *xoutp;
+ struct netflow *netflow;
+ struct ofproto_dpif *ofproto;
+ struct dpif_flow_stats push;
+ struct ofpbuf xout_actions;
+ struct flow flow, dp_mask;
+ uint32_t *dp32, *xout32;
+ odp_port_t odp_in_port;
+ struct xlate_in xin;
+ long long int last_used;
+ int error;
+ size_t i;
+ bool may_learn, ok;
+
+ /* These three are consulted at 'exit', so they need defined values on
+ * every path that jumps there. */
+ ok = false;
+ xoutp = NULL;
+ netflow = NULL;
+
+ ovs_mutex_lock(&ukey->mutex);
+ last_used = ukey->stats.used;
+ /* 'push' is the delta against the stats cached in 'ukey'. The counters
+ * are clamped to 0 when the datapath reports less than the cached value. */
+ push.used = stats->used;
+ push.tcp_flags = stats->tcp_flags;
+ push.n_packets = stats->n_packets > ukey->stats.n_packets
+ ? stats->n_packets - ukey->stats.n_packets
+ : 0;
+ push.n_bytes = stats->n_bytes > ukey->stats.n_bytes
+ ? stats->n_bytes - ukey->stats.n_bytes
+ : 0;
+
+ if (!ukey->flow_exists) {
+ /* Don't bother revalidating if the flow was already deleted. */
+ goto exit;
+ }
+
+ /* During a full revalidation, give up on flows that should_revalidate()
+ * rejects instead of translating them again. */
+ if (udpif->need_revalidate && last_used
+ && !should_revalidate(push.n_packets, last_used)) {
+ ok = false;
+ goto exit;
+ }
+
+ /* We will push the stats, so update the ukey stats cache. */
+ ukey->stats = *stats;
+ /* No new packets and no revalidation requested: nothing left to do. */
+ if (!push.n_packets && !udpif->need_revalidate) {
+ ok = true;
+ goto exit;
+ }
+
+ may_learn = push.n_packets > 0;
+ /* Fast path: push the stats through the existing translation cache
+ * without translating again. */
+ if (ukey->xcache && !udpif->need_revalidate) {
+ xlate_push_stats(ukey->xcache, may_learn, &push);
+ ok = true;
+ goto exit;
+ }
+
+ /* A nonzero 'error' leaves 'ok' false, so the flow fails the check. */
+ error = xlate_receive(udpif->backer, NULL, ukey->key, ukey->key_len, &flow,
+ &ofproto, NULL, NULL, &netflow, &odp_in_port);
+ if (error) {
+ goto exit;
+ }
+
+ if (udpif->need_revalidate) {
+ xlate_cache_clear(ukey->xcache);
+ }
+ if (!ukey->xcache) {
+ ukey->xcache = xlate_cache_new();
+ }
+
+ xlate_in_init(&xin, ofproto, &flow, NULL, push.tcp_flags, NULL);
+ xin.resubmit_stats = push.n_packets ? &push : NULL;
+ xin.xcache = ukey->xcache;
+ xin.may_learn = may_learn;
+ xin.skip_wildcards = !udpif->need_revalidate;
+ xlate_actions(&xin, &xout);
+ /* From here on 'xout' is initialized and must be released at 'exit'. */
+ xoutp = &xout;
+
+ /* Without a revalidation request the translation above was only needed
+ * for pushing the stats; no comparison is performed. */
+ if (!udpif->need_revalidate) {
+ ok = true;
+ goto exit;
+ }
+
+ /* Build the actions the datapath is expected to hold: the translated
+ * actions, or the slow path actions when the flow must be slow-pathed. */
+ if (!xout.slow) {
+ ofpbuf_use_const(&xout_actions, ofpbuf_data(&xout.odp_actions),
+ ofpbuf_size(&xout.odp_actions));
+ } else {
+ ofpbuf_use_stack(&xout_actions, slow_path_buf, sizeof slow_path_buf);
+ compose_slow_path(udpif, &xout, &flow, odp_in_port, &xout_actions);
+ }
+
+ /* The datapath actions must match the expected ones byte for byte. */
+ if (actions_len != ofpbuf_size(&xout_actions)
+ || memcmp(ofpbuf_data(&xout_actions), actions, actions_len)) {
+ goto exit;
+ }
+
+ if (odp_flow_key_to_mask(mask, mask_len, &dp_mask, &flow)
+ == ODP_FIT_ERROR) {
+ goto exit;
+ }
+
+ /* Since the kernel is free to ignore wildcarded bits in the mask, we can't
+ * directly check that the masks are the same. Instead we check that the
+ * mask in the kernel is more specific i.e. less wildcarded, than what
+ * we've calculated here. This guarantees we don't catch any packets we
+ * shouldn't with the megaflow. */
+ dp32 = (uint32_t *) &dp_mask;
+ xout32 = (uint32_t *) &xout.wc.masks;
+ for (i = 0; i < FLOW_U32S; i++) {
+ /* Fail if the calculated mask has a bit set that the datapath mask
+ * lacks. */
+ if ((dp32[i] | xout32[i]) != dp32[i]) {
+ goto exit;
+ }
+ }
+ ok = true;
+
+exit:
+ ovs_mutex_unlock(&ukey->mutex);
+ /* 'netflow' is only nonnull if xlate_receive() filled it in. */
+ if (netflow) {
+ if (!ok) {
+ netflow_expire(netflow, &flow);
+ netflow_flow_clear(netflow, &flow);
+ }
+ netflow_unref(netflow);
+ }
+ /* 'xoutp' is still NULL if xlate_actions() was never reached. */
+ xlate_out_uninit(xoutp);
+ return ok;
+}
+
+/* One queued datapath flow deletion, together with the state needed to
+ * account for the statistics that the deletion reports. */
+struct dump_op {
+ struct udpif_key *ukey; /* Key being deleted; users check it for NULL. */
+ struct dpif_flow_stats stats; /* Stats for 'op'. */
+ struct dpif_op op; /* Flow del operation. */
+};
+
+/* Initializes 'op' as a DPIF_OP_FLOW_DEL operation for the datapath flow
+ * identified by the 'key_len' bytes at 'key'.
+ *
+ * 'ukey' is the udpif_key associated with the flow; it is stored as-is and
+ * may be NULL.  'key' is not copied, so it must stay valid until the
+ * operation has been executed.
+ *
+ * The delete operation reports the flow's final statistics into
+ * 'op->stats'. */
+static void
+dump_op_init(struct dump_op *op, const struct nlattr *key, size_t key_len,
+             struct udpif_key *ukey)
+{
+    /* 'op->stats' is read after the operation has been executed.  SOURCE
+     * does not show that a failed delete fills it in, so start from zeroed
+     * statistics rather than whatever was left in the caller's array. */
+    memset(&op->stats, 0, sizeof op->stats);
+
+    op->ukey = ukey;
+    op->op.type = DPIF_OP_FLOW_DEL;
+    op->op.u.flow_del.key = key;
+    op->op.u.flow_del.key_len = key_len;
+    op->op.u.flow_del.stats = &op->stats;
+}
+
+/* Executes the 'n_ops' flow deletions in 'ops' on 'udpif''s datapath, then
+ * pushes the statistics reported by each deletion.
+ *
+ * For an op that has a ukey, the statistics pushed are the difference
+ * between what the deletion reported and what is cached in the ukey.  For an
+ * op without a ukey, the reported statistics are pushed unchanged.
+ *
+ * 'n_ops' must not exceed REVALIDATE_MAX_BATCH.  The ukeys themselves are
+ * left alone. */
+static void
+push_dump_ops__(struct udpif *udpif, struct dump_op *ops, size_t n_ops)
+{
+    struct dpif_op *opsp[REVALIDATE_MAX_BATCH];
+    size_t i;
+
+    ovs_assert(n_ops <= REVALIDATE_MAX_BATCH);
+    for (i = 0; i < n_ops; i++) {
+        opsp[i] = &ops[i].op;
+    }
+    dpif_operate(udpif->dpif, opsp, n_ops);
+
+    for (i = 0; i < n_ops; i++) {
+        struct dump_op *op = &ops[i];
+        struct dpif_flow_stats *push, *stats, push_buf;
+
+        stats = op->op.u.flow_del.stats;
+        if (op->ukey) {
+            push = &push_buf;
+            ovs_mutex_lock(&op->ukey->mutex);
+            push->used = MAX(stats->used, op->ukey->stats.used);
+            push->tcp_flags = stats->tcp_flags | op->ukey->stats.tcp_flags;
+            /* The counters are unsigned: clamp the deltas to 0 instead of
+             * letting them wrap around when the deletion reports less than
+             * what is cached, as revalidate_ukey() does. */
+            push->n_packets = stats->n_packets > op->ukey->stats.n_packets
+                ? stats->n_packets - op->ukey->stats.n_packets
+                : 0;
+            push->n_bytes = stats->n_bytes > op->ukey->stats.n_bytes
+                ? stats->n_bytes - op->ukey->stats.n_bytes
+                : 0;
+            ovs_mutex_unlock(&op->ukey->mutex);
+        } else {
+            push = stats;
+        }
+
+        if (push->n_packets || netflow_exists()) {
+            struct ofproto_dpif *ofproto;
+            struct netflow *netflow;
+            struct flow flow;
+            bool may_learn;
+
+            may_learn = push->n_packets > 0;
+            if (op->ukey) {
+                /* Prefer the ukey's translation cache when it has one; no
+                 * new translation is needed in that case. */
+                ovs_mutex_lock(&op->ukey->mutex);
+                if (op->ukey->xcache) {
+                    xlate_push_stats(op->ukey->xcache, may_learn, push);
+                    ovs_mutex_unlock(&op->ukey->mutex);
+                    continue;
+                }
+                ovs_mutex_unlock(&op->ukey->mutex);
+            }
+
+            /* Otherwise translate the flow again purely for its side
+             * effects.  Nothing is pushed if xlate_receive() fails. */
+            if (!xlate_receive(udpif->backer, NULL, op->op.u.flow_del.key,
+                               op->op.u.flow_del.key_len, &flow, &ofproto,
+                               NULL, NULL, &netflow, NULL)) {
+                struct xlate_in xin;
+
+                xlate_in_init(&xin, ofproto, &flow, NULL, push->tcp_flags,
+                              NULL);
+                xin.resubmit_stats = push->n_packets ? push : NULL;
+                xin.may_learn = may_learn;
+                xin.skip_wildcards = true;
+                xlate_actions_for_side_effects(&xin);
+
+                if (netflow) {
+                    netflow_expire(netflow, &flow);
+                    netflow_flow_clear(netflow, &flow);
+                    netflow_unref(netflow);
+                }
+            }
+        }
+    }
+}
+
+/* Executes the 'n_ops' flow deletions in 'ops' and pushes their statistics
+ * via push_dump_ops__(), then deletes the ukey of every op, unlinking it from
+ * 'revalidator''s map.
+ *
+ * Every op must have a nonnull ukey, because ukey_delete() dereferences it
+ * unconditionally. */
+static void
+push_dump_ops(struct revalidator *revalidator,
+              struct dump_op *ops, size_t n_ops)
+{
+    size_t i;   /* Same type as 'n_ops': no signed/unsigned comparison. */
+
+    push_dump_ops__(revalidator->udpif, ops, n_ops);
+    for (i = 0; i < n_ops; i++) {
+        ukey_delete(revalidator, ops[i].ukey);
+    }
+}
+
+/* Pulls flows from 'revalidator->udpif''s datapath flow dump until it is
+ * exhausted.  Each flow is either kept (and marked as seen in its ukey) or
+ * queued for deletion from the datapath.
+ *
+ * A flow is queued for deletion when it has been idle for longer than the
+ * applicable maximum, when the datapath holds more than twice the flow limit,
+ * or when revalidate_ukey() rejects it.
+ *
+ * Deletions are batched.  The queued operations point at keys owned by the
+ * dump, so a batch is always executed before the dump may overwrite them. */
+static void
+revalidate(struct revalidator *revalidator)
+{
+    struct udpif *udpif = revalidator->udpif;
+
+    struct dump_op ops[REVALIDATE_MAX_BATCH];
+    const struct nlattr *key, *mask, *actions;
+    size_t key_len, mask_len, actions_len;
+    const struct dpif_flow_stats *stats;
+    long long int now;
+    unsigned int flow_limit;
+    size_t n_ops;
+    void *state;
+
+    n_ops = 0;
+    now = time_msec();
+    atomic_read(&udpif->flow_limit, &flow_limit);
+
+    dpif_flow_dump_state_init(udpif->dpif, &state);
+    while (dpif_flow_dump_next(&udpif->dump, state, &key, &key_len, &mask,
+                               &mask_len, &actions, &actions_len, &stats)) {
+        struct udpif_key *ukey;
+        bool mark, may_destroy;
+        long long int used, max_idle;
+        uint32_t hash;
+        size_t n_flows;
+
+        hash = hash_bytes(key, key_len, udpif->secret);
+        ukey = ukey_lookup(udpif, key, key_len, hash);
+
+        used = stats->used;
+        if (!used && ukey) {
+            ovs_mutex_lock(&ukey->mutex);
+
+            if (ukey->mark || !ukey->flow_exists) {
+                /* The flow has already been dumped.  This can occasionally
+                 * occur if the datapath is changed in the middle of a flow
+                 * dump.  Rather than perform the same work twice, skip the
+                 * flow this time.
+                 *
+                 * Jump to 'next' rather than 'continue' so that operations
+                 * queued earlier are still flushed before their keys can be
+                 * clobbered. */
+                ovs_mutex_unlock(&ukey->mutex);
+                COVERAGE_INC(upcall_duplicate_flow);
+                goto next;
+            }
+
+            /* The datapath has no "used" time for the flow yet, so measure
+             * idleness from when the ukey was created. */
+            used = ukey->created;
+            ovs_mutex_unlock(&ukey->mutex);
+        }
+
+        n_flows = udpif_get_n_flows(udpif);
+        max_idle = ofproto_max_idle;
+        if (n_flows > flow_limit) {
+            /* Over the limit: expire idle flows much more aggressively. */
+            max_idle = 100;
+        }
+
+        if ((used && used < now - max_idle) || n_flows > flow_limit * 2) {
+            mark = false;
+        } else {
+            if (!ukey) {
+                ukey = ukey_create(key, key_len, used);
+                if (!udpif_insert_ukey(udpif, ukey, hash)) {
+                    /* The same ukey has already been created.  This means
+                     * that another revalidator is processing this flow
+                     * concurrently, so don't bother processing it.
+                     *
+                     * As above, 'next' keeps the batch flush from being
+                     * skipped. */
+                    ukey_delete(NULL, ukey);
+                    goto next;
+                }
+            }
+
+            mark = revalidate_ukey(udpif, ukey, mask, mask_len, actions,
+                                   actions_len, stats);
+        }
+
+        if (ukey) {
+            ovs_mutex_lock(&ukey->mutex);
+            ukey->mark = ukey->flow_exists = mark;
+            ovs_mutex_unlock(&ukey->mutex);
+        }
+
+        if (!mark) {
+            dump_op_init(&ops[n_ops++], key, key_len, ukey);
+        }
+
+    next:
+        may_destroy = dpif_flow_dump_next_may_destroy_keys(&udpif->dump,
+                                                           state);
+
+        /* Only update 'now' immediately before 'buffer' will be updated.
+         * This gives us the current time relative to the time the datapath
+         * will write into 'stats'. */
+        if (may_destroy) {
+            now = time_msec();
+        }
+
+        /* Only do a dpif_operate when we've hit our maximum batch, or when
+         * our memory is about to be clobbered by the next call to
+         * dpif_flow_dump_next(). */
+        if (n_ops == REVALIDATE_MAX_BATCH || (n_ops && may_destroy)) {
+            push_dump_ops__(udpif, ops, n_ops);
+            n_ops = 0;
+        }
+    }
+
+    if (n_ops) {
+        push_dump_ops__(udpif, ops, n_ops);
+    }
+
+    dpif_flow_dump_state_uninit(udpif->dpif, state);
+}
+
+/* Garbage-collects the ukeys in 'revalidator->ukeys'.
+ *
+ * A ukey whose 'mark' is set survives with the mark cleared, unless 'purge'
+ * is true.  Any other ukey is deleted; if its flow still exists, the flow is
+ * deleted from the datapath (in batches) before the ukey is freed. */
+static void
+revalidator_sweep__(struct revalidator *revalidator, bool purge)
+    OVS_NO_THREAD_SAFETY_ANALYSIS
+{
+    struct dump_op ops[REVALIDATE_MAX_BATCH];
+    struct udpif_key *ukey, *next_ukey;
+    size_t n_ops = 0;
+
+    /* No locking here: while garbage collection runs, the ukeys map belongs
+     * entirely to this revalidator. */
+    HMAP_FOR_EACH_SAFE (ukey, next_ukey, hmap_node, revalidator->ukeys) {
+        if (ukey->mark && !purge) {
+            /* Seen during the latest dump: keep it, reset for the next. */
+            ukey->mark = false;
+            continue;
+        }
+
+        if (!ukey->flow_exists) {
+            /* Only the ukey is left to clean up. */
+            ukey_delete(revalidator, ukey);
+            continue;
+        }
+
+        /* The flow was seen in the datapath before but did not show up in
+         * the most recent dump (or everything is being purged).  Delete it
+         * so the ukey can be cleaned up and the statistics stay
+         * consistent. */
+        dump_op_init(&ops[n_ops], ukey->key, ukey->key_len, ukey);
+        n_ops++;
+        if (n_ops == REVALIDATE_MAX_BATCH) {
+            push_dump_ops(revalidator, ops, n_ops);
+            n_ops = 0;
+        }
+    }
+
+    /* Flush whatever is left in the final, partial batch. */
+    if (n_ops) {
+        push_dump_ops(revalidator, ops, n_ops);
+    }
+}
+
+/* Garbage-collects 'revalidator''s ukeys by calling revalidator_sweep__()
+ * with 'purge' false, so ukeys whose 'mark' is set are kept (with the mark
+ * cleared) and all others are deleted. */
+static void
+revalidator_sweep(struct revalidator *revalidator)
+{
+ revalidator_sweep__(revalidator, false);
+}
+
+/* Deletes all of 'revalidator''s ukeys by calling revalidator_sweep__() with
+ * 'purge' true, so that a set 'mark' no longer protects a ukey. */
+static void
+revalidator_purge(struct revalidator *revalidator)
+{
+ revalidator_sweep__(revalidator, true);
+}
+\f
+/* unixctl handler that replies on 'conn' with a summary of every udpif in
+ * 'all_udpifs': its datapath name, flow counts and flow limit, the dump
+ * duration, and the number of ukeys held for each revalidator. */
+static void
+upcall_unixctl_show(struct unixctl_conn *conn, int argc OVS_UNUSED,
+                    const char *argv[] OVS_UNUSED, void *aux OVS_UNUSED)
+{
+    struct ds reply = DS_EMPTY_INITIALIZER;
+    struct udpif *udpif;
+
+    LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
+        unsigned int limit;
+        size_t idx;
+
+        atomic_read(&udpif->flow_limit, &limit);
+
+        /* Per-datapath header and flow statistics. */
+        ds_put_format(&reply, "%s:\n", dpif_name(udpif->dpif));
+        ds_put_format(&reply, "\tflows : (current %"PRIu64")"
+            " (avg %u) (max %u) (limit %u)\n", udpif_get_n_flows(udpif),
+            udpif->avg_n_flows, udpif->max_n_flows, limit);
+        ds_put_format(&reply, "\tdump duration : %lldms\n",
+                      udpif->dump_duration);
+
+        /* One line per revalidator; each map is read under its mutex. */
+        ds_put_char(&reply, '\n');
+        for (idx = 0; idx < n_revalidators; idx++) {
+            struct revalidator *rev = &udpif->revalidators[idx];
+
+            ovs_mutex_lock(&udpif->ukeys[idx].mutex);
+            ds_put_format(&reply, "\t%s: (keys %"PRIuSIZE")\n", rev->name,
+                          hmap_count(&udpif->ukeys[idx].hmap));
+            ovs_mutex_unlock(&udpif->ukeys[idx].mutex);
+        }
+    }
+
+    unixctl_command_reply(conn, ds_cstr(&reply));
+    ds_destroy(&reply);
+}
+
+/* Disables the use of megaflows.
+ *
+ * Clears 'enable_megaflows', flushes all datapaths (presumably so that flows
+ * installed under the previous setting are discarded -- confirm against
+ * udpif_flush_all_datapaths()) and acknowledges the command on 'conn'.
+ *
+ * This command is only needed for advanced debugging, so it's not
+ * documented in the man page. */
+static void
+upcall_unixctl_disable_megaflows(struct unixctl_conn *conn,
+ int argc OVS_UNUSED,
+ const char *argv[] OVS_UNUSED,
+ void *aux OVS_UNUSED)
+{
+ atomic_store(&enable_megaflows, false);
+ udpif_flush_all_datapaths();
+ unixctl_command_reply(conn, "megaflows disabled");
+}
+
+/* Re-enables the use of megaflows.
+ *
+ * Sets 'enable_megaflows', flushes all datapaths (presumably so that flows
+ * installed under the previous setting are discarded -- confirm against
+ * udpif_flush_all_datapaths()) and acknowledges the command on 'conn'.
+ *
+ * This command is only needed for advanced debugging, so it's not
+ * documented in the man page. */
+static void
+upcall_unixctl_enable_megaflows(struct unixctl_conn *conn,
+ int argc OVS_UNUSED,
+ const char *argv[] OVS_UNUSED,
+ void *aux OVS_UNUSED)
+{
+ atomic_store(&enable_megaflows, true);
+ udpif_flush_all_datapaths();
+ unixctl_command_reply(conn, "megaflows enabled");
+}
+
+/* Set the flow limit.
+ *
+ * This command is only needed for advanced debugging, so it's not
+ * documented in the man page. */
+static void
+upcall_unixctl_set_flow_limit(struct unixctl_conn *conn,
+ int argc OVS_UNUSED,
+ const char *argv[] OVS_UNUSED,
+ void *aux OVS_UNUSED)
+{
+ struct ds ds = DS_EMPTY_INITIALIZER;
+ struct udpif *udpif;
+ unsigned int flow_limit = atoi(argv[1]);
+
+ LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
+ atomic_store(&udpif->flow_limit, flow_limit);