-/* Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
+/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#include <stdbool.h>
#include <inttypes.h>
+#include "connmgr.h"
#include "coverage.h"
-#include "dynamic-string.h"
#include "dpif.h"
+#include "dynamic-string.h"
#include "fail-open.h"
+#include "guarded-list.h"
#include "latch.h"
-#include "seq.h"
#include "list.h"
#include "netlink.h"
#include "ofpbuf.h"
-#include "ofproto-dpif.h"
+#include "ofproto-dpif-ipfix.h"
+#include "ofproto-dpif-sflow.h"
+#include "ofproto-dpif-xlate.h"
#include "packets.h"
#include "poll-loop.h"
+#include "seq.h"
+#include "unixctl.h"
#include "vlog.h"
#define MAX_QUEUE_LENGTH 512
+#define FLOW_MISS_MAX_BATCH 50
+#define REVALIDATE_MAX_BATCH 50
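+/* Maximum number of milliseconds a datapath flow may remain idle before a
+ * revalidator deletes it. */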
+#define MAX_IDLE 1500
VLOG_DEFINE_THIS_MODULE(ofproto_dpif_upcall);
COVERAGE_DEFINE(upcall_queue_overflow);
-COVERAGE_DEFINE(drop_queue_overflow);
-COVERAGE_DEFINE(miss_queue_overflow);
-COVERAGE_DEFINE(fmb_queue_overflow);
/* A thread that processes each upcall handed to it by the dispatcher thread,
- * forwards the upcall's packet, and then queues it to the main ofproto_dpif
- * to possibly set up a kernel flow as a cache. */
+ * forwards the upcall's packet, and possibly sets up a kernel flow as a
+ * cache. */
struct handler {
struct udpif *udpif; /* Parent udpif. */
pthread_t thread; /* Thread ID. */
+ char *name; /* Thread name. */
struct ovs_mutex mutex; /* Mutex guarding the following. */
- /* Atomic queue of unprocessed miss upcalls. */
+ /* Atomic queue of unprocessed upcalls. */
struct list upcalls OVS_GUARDED;
size_t n_upcalls OVS_GUARDED;
+ bool need_signal; /* Only changed by the dispatcher. */
+
pthread_cond_t wake_cond; /* Wakes 'thread' while holding
'mutex'. */
};
+/* A thread that processes each kernel flow handed to it by the flow_dumper
+ * thread, updates OpenFlow statistics, and updates or removes the kernel flow
+ * as necessary. */
+struct revalidator {
+ struct udpif *udpif; /* Parent udpif. */
+ char *name; /* Thread name. */
+
+ pthread_t thread; /* Thread ID. */
+ struct hmap ukeys; /* Datapath flow keys. */
+
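+    /* 'udpif->dump_seq' value of the last flow dump this thread has fully
+     * processed; used to decide when to sweep stale ukeys. */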
+ uint64_t dump_seq;
+
+ struct ovs_mutex mutex; /* Mutex guarding the following. */
+ pthread_cond_t wake_cond;
+ struct list udumps OVS_GUARDED; /* Unprocessed udumps. */
+ size_t n_udumps OVS_GUARDED; /* Number of unprocessed udumps. */
+};
+
/* An upcall handler for ofproto_dpif.
*
- * udpif is implemented as a "dispatcher" thread that reads upcalls from the
- * kernel. It processes each upcall just enough to figure out its next
- * destination. For a "miss" upcall (MISS_UPCALL), this is one of several
- * "handler" threads (see struct handler). Other upcalls are queued to the
- * main ofproto_dpif. */
+ * udpif has two logically separate pieces:
+ *
+ * - A "dispatcher" thread that reads upcalls from the kernel and dispatches
+ * them to one of several "handler" threads (see struct handler).
+ *
+ * - A "flow_dumper" thread that reads the kernel flow table and dispatches
+ * flows to one of several "revalidator" threads (see struct
+ * revalidator). */
struct udpif {
+ struct list list_node; /* In all_udpifs list. */
+
struct dpif *dpif; /* Datapath handle. */
struct dpif_backer *backer; /* Opaque dpif_backer pointer. */
uint32_t secret; /* Random seed for upcall hash. */
pthread_t dispatcher; /* Dispatcher thread ID. */
+ pthread_t flow_dumper; /* Flow dumper thread ID. */
- struct handler *handlers; /* Miss handlers. */
+ struct handler *handlers; /* Upcall handlers. */
size_t n_handlers;
- /* Atomic queue of unprocessed drop keys. */
- struct ovs_mutex drop_key_mutex;
- struct list drop_keys OVS_GUARDED;
- size_t n_drop_keys OVS_GUARDED;
+ struct revalidator *revalidators; /* Flow revalidators. */
+ size_t n_revalidators;
- /* Atomic queue of special upcalls for ofproto-dpif to process. */
- struct ovs_mutex upcall_mutex;
- struct list upcalls OVS_GUARDED;
- size_t n_upcalls OVS_GUARDED;
+ uint64_t last_reval_seq; /* 'reval_seq' at last revalidation. */
+ struct seq *reval_seq; /* Incremented to force revalidation. */
+
+ struct seq *dump_seq; /* Increments each dump iteration. */
+
+ struct latch exit_latch; /* Tells child threads to exit. */
- /* Atomic queue of flow_miss_batches. */
- struct ovs_mutex fmb_mutex;
- struct list fmbs OVS_GUARDED;
- size_t n_fmbs OVS_GUARDED;
+ long long int dump_duration; /* Duration of the last flow dump. */
- /* Number of times udpif_revalidate() has been called. */
- atomic_uint reval_seq;
+ /* Datapath flow statistics. */
+ unsigned int max_n_flows;
+ unsigned int avg_n_flows;
- struct seq *wait_seq;
- uint64_t last_seq;
+    /* The following fields are accessed and modified by different threads. */
+ atomic_uint flow_limit; /* Datapath flow hard limit. */
- struct latch exit_latch; /* Tells child threads to exit. */
+    /* n_flows_mutex prevents multiple threads from updating these fields
+     * concurrently. */
+ atomic_uint64_t n_flows; /* Number of flows in the datapath. */
+ atomic_llong n_flows_timestamp; /* Last time n_flows was updated. */
+ struct ovs_mutex n_flows_mutex;
};
+enum upcall_type {
+ BAD_UPCALL, /* Some kind of bug somewhere. */
+ MISS_UPCALL, /* A flow miss. */
+ SFLOW_UPCALL, /* sFlow sample. */
+ FLOW_SAMPLE_UPCALL, /* Per-flow sampling. */
+ IPFIX_UPCALL /* Per-bridge sampling. */
+};
+
+struct upcall {
+ struct list list_node; /* For queuing upcalls. */
+ struct flow_miss *flow_miss; /* This upcall's flow_miss. */
+
+ /* Raw upcall plus data for keeping track of the memory backing it. */
+ struct dpif_upcall dpif_upcall; /* As returned by dpif_recv() */
+ struct ofpbuf upcall_buf; /* Owns some data in 'dpif_upcall'. */
+ uint64_t upcall_stub[512 / 8]; /* Buffer to reduce need for malloc(). */
+};
+
+/* 'udpif_key's are responsible for tracking the little bit of state udpif
+ * needs to do flow expiration which can't be pulled directly from the
+ * datapath.  Each is owned, created, maintained, and destroyed by a single
+ * revalidator, making them easy to handle efficiently with multiple
+ * threads. */
+struct udpif_key {
+ struct hmap_node hmap_node; /* In parent revalidator 'ukeys' map. */
+
+ struct nlattr *key; /* Datapath flow key. */
+ size_t key_len; /* Length of 'key'. */
+
+ struct dpif_flow_stats stats; /* Stats at most recent flow dump. */
+ long long int created; /* Estimation of creation time. */
+
+ bool mark; /* Used by mark and sweep GC algorithm. */
+
+ struct odputil_keybuf key_buf; /* Memory for 'key'. */
+};
+
+/* 'udpif_flow_dump's hold the state associated with one iteration of a flow
+ * dump operation.  Each is created by the flow_dumper thread and handed to
+ * the appropriate revalidator thread to be processed. */
+struct udpif_flow_dump {
+ struct list list_node;
+
+ struct nlattr *key; /* Datapath flow key. */
+ size_t key_len; /* Length of 'key'. */
+ uint32_t key_hash; /* Hash of 'key'. */
+
+ struct odputil_keybuf mask_buf;
+ struct nlattr *mask; /* Datapath mask for 'key'. */
+ size_t mask_len; /* Length of 'mask'. */
+
+ struct dpif_flow_stats stats; /* Stats pulled from the datapath. */
+
+ bool need_revalidate; /* Key needs revalidation? */
+
+ struct odputil_keybuf key_buf;
+};
+
+/* Flow miss batching.
+ *
+ * Some dpifs implement operations faster when you hand them off in a batch.
+ * To allow batching, "struct flow_miss" queues the dpif-related work needed
+ * for a given flow. Each "struct flow_miss" corresponds to sending one or
+ * more packets, plus possibly installing the flow in the dpif. */
+struct flow_miss {
+ struct hmap_node hmap_node;
+ struct ofproto_dpif *ofproto;
+
+ struct flow flow;
+ const struct nlattr *key;
+ size_t key_len;
+ enum dpif_upcall_type upcall_type;
+ struct dpif_flow_stats stats;
+ odp_port_t odp_in_port;
+
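+    /* Stack space for slow-path actions; compose_slow_path() requires at
+     * least 128 bytes. */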
+ uint64_t slow_path_buf[128 / 8];
+ struct odputil_keybuf mask_buf;
+
+ struct xlate_out xout;
+
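+    /* True once a datapath flow put has been generated for this miss. */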
+ bool put;
+};
+
+static void upcall_destroy(struct upcall *);
+
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
+static struct list all_udpifs = LIST_INITIALIZER(&all_udpifs);
static void recv_upcalls(struct udpif *);
-static void handle_miss_upcalls(struct udpif *, struct list *upcalls);
-static void miss_destroy(struct flow_miss *);
+static void handle_upcalls(struct handler *handler, struct list *upcalls);
+static void *udpif_flow_dumper(void *);
static void *udpif_dispatcher(void *);
-static void *udpif_miss_handler(void *);
+static void *udpif_upcall_handler(void *);
+static void *udpif_revalidator(void *);
+static uint64_t udpif_get_n_flows(struct udpif *);
+static void revalidate_udumps(struct revalidator *, struct list *udumps);
+static void revalidator_sweep(struct revalidator *);
+static void upcall_unixctl_show(struct unixctl_conn *conn, int argc,
+ const char *argv[], void *aux);
+static void upcall_unixctl_disable_megaflows(struct unixctl_conn *, int argc,
+ const char *argv[], void *aux);
+static void upcall_unixctl_enable_megaflows(struct unixctl_conn *, int argc,
+ const char *argv[], void *aux);
+static void upcall_unixctl_set_flow_limit(struct unixctl_conn *conn, int argc,
+ const char *argv[], void *aux);
+static void ukey_delete(struct revalidator *, struct udpif_key *);
+
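+/* Whether to install wildcarded (mega) flows in the datapath; toggled at
+ * runtime with the upcall/{enable,disable}-megaflows unixctl commands. */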
+static atomic_bool enable_megaflows = ATOMIC_VAR_INIT(true);
struct udpif *
udpif_create(struct dpif_backer *backer, struct dpif *dpif)
{
+ static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
struct udpif *udpif = xzalloc(sizeof *udpif);
+ if (ovsthread_once_start(&once)) {
+ unixctl_command_register("upcall/show", "", 0, 0, upcall_unixctl_show,
+ NULL);
+ unixctl_command_register("upcall/disable-megaflows", "", 0, 0,
+ upcall_unixctl_disable_megaflows, NULL);
+ unixctl_command_register("upcall/enable-megaflows", "", 0, 0,
+ upcall_unixctl_enable_megaflows, NULL);
+ unixctl_command_register("upcall/set-flow-limit", "", 1, 1,
+ upcall_unixctl_set_flow_limit, NULL);
+ ovsthread_once_done(&once);
+ }
+
udpif->dpif = dpif;
udpif->backer = backer;
+ atomic_init(&udpif->flow_limit, MIN(ofproto_flow_limit, 10000));
udpif->secret = random_uint32();
- udpif->wait_seq = seq_create();
+ udpif->reval_seq = seq_create();
+ udpif->dump_seq = seq_create();
latch_init(&udpif->exit_latch);
- list_init(&udpif->drop_keys);
- list_init(&udpif->upcalls);
- list_init(&udpif->fmbs);
- atomic_init(&udpif->reval_seq, 0);
- ovs_mutex_init(&udpif->drop_key_mutex, PTHREAD_MUTEX_NORMAL);
- ovs_mutex_init(&udpif->upcall_mutex, PTHREAD_MUTEX_NORMAL);
- ovs_mutex_init(&udpif->fmb_mutex, PTHREAD_MUTEX_NORMAL);
+ list_push_back(&all_udpifs, &udpif->list_node);
+ atomic_init(&udpif->n_flows, 0);
+ atomic_init(&udpif->n_flows_timestamp, LLONG_MIN);
+ ovs_mutex_init(&udpif->n_flows_mutex);
return udpif;
}
void
udpif_destroy(struct udpif *udpif)
{
- struct flow_miss_batch *fmb;
- struct drop_key *drop_key;
- struct upcall *upcall;
-
- udpif_recv_set(udpif, 0, false);
-
- while ((drop_key = drop_key_next(udpif))) {
- drop_key_destroy(drop_key);
- }
-
- while ((upcall = upcall_next(udpif))) {
- upcall_destroy(upcall);
- }
-
- while ((fmb = flow_miss_batch_next(udpif))) {
- flow_miss_batch_destroy(fmb);
- }
+ udpif_set_threads(udpif, 0, 0);
+ udpif_flush();
- ovs_mutex_destroy(&udpif->drop_key_mutex);
- ovs_mutex_destroy(&udpif->upcall_mutex);
- ovs_mutex_destroy(&udpif->fmb_mutex);
+ list_remove(&udpif->list_node);
latch_destroy(&udpif->exit_latch);
- seq_destroy(udpif->wait_seq);
+ seq_destroy(udpif->reval_seq);
+ seq_destroy(udpif->dump_seq);
+ atomic_destroy(&udpif->flow_limit);
+ atomic_destroy(&udpif->n_flows);
+ atomic_destroy(&udpif->n_flows_timestamp);
+ ovs_mutex_destroy(&udpif->n_flows_mutex);
free(udpif);
}
-/* Tells 'udpif' to begin or stop handling flow misses depending on the value
- * of 'enable'. 'n_handlers' is the number of miss_handler threads to create.
- * Passing 'n_handlers' as zero is equivalent to passing 'enable' as false. */
+/* Tells 'udpif' how many threads it should use to handle upcalls.  Disables
+ * all threads if 'n_handlers' and 'n_revalidators' are both zero.  'udpif''s
+ * datapath handle must have packet reception enabled before starting
+ * threads. */
void
-udpif_recv_set(struct udpif *udpif, size_t n_handlers, bool enable)
+udpif_set_threads(struct udpif *udpif, size_t n_handlers,
+ size_t n_revalidators)
{
- n_handlers = enable ? n_handlers : 0;
- n_handlers = MIN(n_handlers, 64);
-
/* Stop the old threads (if any). */
- if (udpif->handlers && udpif->n_handlers != n_handlers) {
+ if (udpif->handlers &&
+ (udpif->n_handlers != n_handlers
+ || udpif->n_revalidators != n_revalidators)) {
size_t i;
latch_set(&udpif->exit_latch);
- /* Wake the handlers so they can exit. */
for (i = 0; i < udpif->n_handlers; i++) {
struct handler *handler = &udpif->handlers[i];
ovs_mutex_lock(&handler->mutex);
xpthread_cond_signal(&handler->wake_cond);
ovs_mutex_unlock(&handler->mutex);
+ xpthread_join(handler->thread, NULL);
+ }
+
+ for (i = 0; i < udpif->n_revalidators; i++) {
+ struct revalidator *revalidator = &udpif->revalidators[i];
+
+ ovs_mutex_lock(&revalidator->mutex);
+ xpthread_cond_signal(&revalidator->wake_cond);
+ ovs_mutex_unlock(&revalidator->mutex);
+ xpthread_join(revalidator->thread, NULL);
}
+ xpthread_join(udpif->flow_dumper, NULL);
xpthread_join(udpif->dispatcher, NULL);
+
+ for (i = 0; i < udpif->n_revalidators; i++) {
+ struct revalidator *revalidator = &udpif->revalidators[i];
+ struct udpif_flow_dump *udump, *next_udump;
+ struct udpif_key *ukey, *next_ukey;
+
+ LIST_FOR_EACH_SAFE (udump, next_udump, list_node,
+ &revalidator->udumps) {
+ list_remove(&udump->list_node);
+ free(udump);
+ }
+
+ HMAP_FOR_EACH_SAFE (ukey, next_ukey, hmap_node,
+ &revalidator->ukeys) {
+ ukey_delete(revalidator, ukey);
+ }
+ hmap_destroy(&revalidator->ukeys);
+ ovs_mutex_destroy(&revalidator->mutex);
+
+ free(revalidator->name);
+ }
+
for (i = 0; i < udpif->n_handlers; i++) {
struct handler *handler = &udpif->handlers[i];
struct upcall *miss, *next;
- xpthread_join(handler->thread, NULL);
-
- ovs_mutex_lock(&handler->mutex);
LIST_FOR_EACH_SAFE (miss, next, list_node, &handler->upcalls) {
list_remove(&miss->list_node);
upcall_destroy(miss);
}
- ovs_mutex_unlock(&handler->mutex);
ovs_mutex_destroy(&handler->mutex);
xpthread_cond_destroy(&handler->wake_cond);
+ free(handler->name);
}
latch_poll(&udpif->exit_latch);
+ free(udpif->revalidators);
+ udpif->revalidators = NULL;
+ udpif->n_revalidators = 0;
+
free(udpif->handlers);
udpif->handlers = NULL;
udpif->n_handlers = 0;
size_t i;
udpif->n_handlers = n_handlers;
+ udpif->n_revalidators = n_revalidators;
+
udpif->handlers = xzalloc(udpif->n_handlers * sizeof *udpif->handlers);
for (i = 0; i < udpif->n_handlers; i++) {
struct handler *handler = &udpif->handlers[i];
handler->udpif = udpif;
list_init(&handler->upcalls);
+ handler->need_signal = false;
xpthread_cond_init(&handler->wake_cond, NULL);
- ovs_mutex_init(&handler->mutex, PTHREAD_MUTEX_NORMAL);
- xpthread_create(&handler->thread, NULL, udpif_miss_handler, handler);
+ ovs_mutex_init(&handler->mutex);
+ xpthread_create(&handler->thread, NULL, udpif_upcall_handler,
+ handler);
+ }
+
+ udpif->revalidators = xzalloc(udpif->n_revalidators
+ * sizeof *udpif->revalidators);
+ for (i = 0; i < udpif->n_revalidators; i++) {
+ struct revalidator *revalidator = &udpif->revalidators[i];
+
+ revalidator->udpif = udpif;
+ list_init(&revalidator->udumps);
+ hmap_init(&revalidator->ukeys);
+ ovs_mutex_init(&revalidator->mutex);
+ xpthread_cond_init(&revalidator->wake_cond, NULL);
+ xpthread_create(&revalidator->thread, NULL, udpif_revalidator,
+ revalidator);
}
xpthread_create(&udpif->dispatcher, NULL, udpif_dispatcher, udpif);
+ xpthread_create(&udpif->flow_dumper, NULL, udpif_flow_dumper, udpif);
}
}
+/* Notifies 'udpif' that something changed which may render previous
+ * xlate_actions() results invalid. */
void
-udpif_run(struct udpif *udpif)
+udpif_revalidate(struct udpif *udpif)
{
- udpif->last_seq = seq_read(udpif->wait_seq);
+ seq_change(udpif->reval_seq);
+}
+
+/* Returns a seq which increments every time 'udpif' pulls stats from the
+ * datapath.  Callers can use this to get a sense of when it might be a good
+ * time to do periodic work which relies on relatively up-to-date
+ * statistics. */
+struct seq *
+udpif_dump_seq(struct udpif *udpif)
+{
+ return udpif->dump_seq;
}
void
-udpif_wait(struct udpif *udpif)
+udpif_get_memory_usage(struct udpif *udpif, struct simap *usage)
{
- ovs_mutex_lock(&udpif->drop_key_mutex);
- if (udpif->n_drop_keys) {
- poll_immediate_wake();
- }
- ovs_mutex_unlock(&udpif->drop_key_mutex);
+ size_t i;
- ovs_mutex_lock(&udpif->upcall_mutex);
- if (udpif->n_upcalls) {
- poll_immediate_wake();
- }
- ovs_mutex_unlock(&udpif->upcall_mutex);
+ simap_increase(usage, "dispatchers", 1);
+ simap_increase(usage, "flow_dumpers", 1);
- ovs_mutex_lock(&udpif->fmb_mutex);
- if (udpif->n_fmbs) {
- poll_immediate_wake();
+ simap_increase(usage, "handlers", udpif->n_handlers);
+ for (i = 0; i < udpif->n_handlers; i++) {
+ struct handler *handler = &udpif->handlers[i];
+ ovs_mutex_lock(&handler->mutex);
+ simap_increase(usage, "handler upcalls", handler->n_upcalls);
+ ovs_mutex_unlock(&handler->mutex);
}
- ovs_mutex_unlock(&udpif->fmb_mutex);
- seq_wait(udpif->wait_seq, udpif->last_seq);
+ simap_increase(usage, "revalidators", udpif->n_revalidators);
+ for (i = 0; i < udpif->n_revalidators; i++) {
+ struct revalidator *revalidator = &udpif->revalidators[i];
+ ovs_mutex_lock(&revalidator->mutex);
+ simap_increase(usage, "revalidator dumps", revalidator->n_udumps);
+
+        /* XXX: This isn't technically thread-safe because the revalidator's
+         * 'ukeys' map isn't protected by a mutex, since it's per-thread. */
+ simap_increase(usage, "revalidator keys",
+ hmap_count(&revalidator->ukeys));
+ ovs_mutex_unlock(&revalidator->mutex);
+ }
}
-/* Notifies 'udpif' that something changed which may render previous
- * xlate_actions() results invalid. */
+/* Removes all flows from all datapaths. */
void
-udpif_revalidate(struct udpif *udpif)
+udpif_flush(void)
{
- struct flow_miss_batch *fmb, *next_fmb;
- unsigned int junk;
-
- /* Since we remove each miss on revalidation, their statistics won't be
- * accounted to the appropriate 'facet's in the upper layer. In most
- * cases, this is alright because we've already pushed the stats to the
- * relevant rules. However, NetFlow requires absolute packet counts on
- * 'facet's which could now be incorrect. */
- ovs_mutex_lock(&udpif->fmb_mutex);
- atomic_add(&udpif->reval_seq, 1, &junk);
- LIST_FOR_EACH_SAFE (fmb, next_fmb, list_node, &udpif->fmbs) {
- list_remove(&fmb->list_node);
- flow_miss_batch_destroy(fmb);
- udpif->n_fmbs--;
- }
- ovs_mutex_unlock(&udpif->fmb_mutex);
- udpif_drop_key_clear(udpif);
-}
+ struct udpif *udpif;
-/* Retreives the next upcall which ofproto-dpif is responsible for handling.
- * The caller is responsible for destroying the returned upcall with
- * upcall_destroy(). */
-struct upcall *
-upcall_next(struct udpif *udpif)
-{
- struct upcall *next = NULL;
-
- ovs_mutex_lock(&udpif->upcall_mutex);
- if (udpif->n_upcalls) {
- udpif->n_upcalls--;
- next = CONTAINER_OF(list_pop_front(&udpif->upcalls), struct upcall,
- list_node);
+ LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
+ dpif_flow_flush(udpif->dpif);
}
- ovs_mutex_unlock(&udpif->upcall_mutex);
- return next;
}
-
+\f
/* Destroys and deallocates 'upcall'. */
-void
+static void
upcall_destroy(struct upcall *upcall)
{
if (upcall) {
+ ofpbuf_uninit(&upcall->dpif_upcall.packet);
ofpbuf_uninit(&upcall->upcall_buf);
free(upcall);
}
}
-/* Retreives the next batch of processed flow misses for 'udpif' to install.
- * The caller is responsible for destroying it with flow_miss_batch_destroy().
- */
-struct flow_miss_batch *
-flow_miss_batch_next(struct udpif *udpif)
+static uint64_t
+udpif_get_n_flows(struct udpif *udpif)
{
- struct flow_miss_batch *next = NULL;
-
- ovs_mutex_lock(&udpif->fmb_mutex);
- if (udpif->n_fmbs) {
- udpif->n_fmbs--;
- next = CONTAINER_OF(list_pop_front(&udpif->fmbs),
- struct flow_miss_batch, list_node);
+ long long int time, now;
+ uint64_t flow_count;
+
+ now = time_msec();
+ atomic_read(&udpif->n_flows_timestamp, &time);
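+    /* Refresh the cached flow count at most once every 100 ms, and only if
+     * no other thread is already refreshing it. */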
+ if (time < now - 100 && !ovs_mutex_trylock(&udpif->n_flows_mutex)) {
+ struct dpif_dp_stats stats;
+
+ atomic_store(&udpif->n_flows_timestamp, now);
+ dpif_get_dp_stats(udpif->dpif, &stats);
+ flow_count = stats.n_flows;
+ atomic_store(&udpif->n_flows, flow_count);
+ ovs_mutex_unlock(&udpif->n_flows_mutex);
+ } else {
+ atomic_read(&udpif->n_flows, &flow_count);
}
- ovs_mutex_unlock(&udpif->fmb_mutex);
- return next;
+ return flow_count;
}
-/* Destroys and deallocates 'fmb'. */
-void
-flow_miss_batch_destroy(struct flow_miss_batch *fmb)
+/* The dispatcher thread is responsible for receiving upcalls from the kernel
+ * and assigning them to an upcall_handler thread. */
+static void *
+udpif_dispatcher(void *arg)
{
- struct flow_miss *miss, *next;
-
- if (!fmb) {
- return;
- }
+ struct udpif *udpif = arg;
- HMAP_FOR_EACH_SAFE (miss, next, hmap_node, &fmb->misses) {
- hmap_remove(&fmb->misses, &miss->hmap_node);
- miss_destroy(miss);
+ set_subprogram_name("dispatcher");
+ while (!latch_is_set(&udpif->exit_latch)) {
+ recv_upcalls(udpif);
+ dpif_recv_wait(udpif->dpif);
+ latch_wait(&udpif->exit_latch);
+ poll_block();
}
- hmap_destroy(&fmb->misses);
- free(fmb);
+ return NULL;
}
-/* Retreives the next drop key which ofproto-dpif needs to process. The caller
- * is responsible for destroying it with drop_key_destroy(). */
-struct drop_key *
-drop_key_next(struct udpif *udpif)
+static void *
+udpif_flow_dumper(void *arg)
{
- struct drop_key *next = NULL;
+ struct udpif *udpif = arg;
- ovs_mutex_lock(&udpif->drop_key_mutex);
- if (udpif->n_drop_keys) {
- udpif->n_drop_keys--;
- next = CONTAINER_OF(list_pop_front(&udpif->drop_keys), struct drop_key,
- list_node);
- }
- ovs_mutex_unlock(&udpif->drop_key_mutex);
- return next;
-}
+ set_subprogram_name("flow_dumper");
+ while (!latch_is_set(&udpif->exit_latch)) {
+ const struct dpif_flow_stats *stats;
+ long long int start_time, duration;
+ const struct nlattr *key, *mask;
+ struct dpif_flow_dump dump;
+ size_t key_len, mask_len;
+ unsigned int flow_limit;
+ bool need_revalidate;
+ uint64_t reval_seq;
+ size_t n_flows, i;
+
+ reval_seq = seq_read(udpif->reval_seq);
+ need_revalidate = udpif->last_reval_seq != reval_seq;
+ udpif->last_reval_seq = reval_seq;
+
+ n_flows = udpif_get_n_flows(udpif);
+ udpif->max_n_flows = MAX(n_flows, udpif->max_n_flows);
+ udpif->avg_n_flows = (udpif->avg_n_flows + n_flows) / 2;
+
+ start_time = time_msec();
+ dpif_flow_dump_start(&dump, udpif->dpif);
+ while (dpif_flow_dump_next(&dump, &key, &key_len, &mask, &mask_len,
+ NULL, NULL, &stats)
+ && !latch_is_set(&udpif->exit_latch)) {
+ struct udpif_flow_dump *udump = xmalloc(sizeof *udump);
+ struct revalidator *revalidator;
+
+ udump->key_hash = hash_bytes(key, key_len, udpif->secret);
+ memcpy(&udump->key_buf, key, key_len);
+ udump->key = (struct nlattr *) &udump->key_buf;
+ udump->key_len = key_len;
+
+ memcpy(&udump->mask_buf, mask, mask_len);
+ udump->mask = (struct nlattr *) &udump->mask_buf;
+ udump->mask_len = mask_len;
+
+ udump->stats = *stats;
+ udump->need_revalidate = need_revalidate;
+
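+            /* Hashing on the flow key pins each datapath flow to a single
+             * revalidator, so a given ukey is only ever touched by one
+             * thread. */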
+ revalidator = &udpif->revalidators[udump->key_hash
+ % udpif->n_revalidators];
+
+ ovs_mutex_lock(&revalidator->mutex);
+ while (revalidator->n_udumps >= REVALIDATE_MAX_BATCH * 3
+ && !latch_is_set(&udpif->exit_latch)) {
+ ovs_mutex_cond_wait(&revalidator->wake_cond,
+ &revalidator->mutex);
+ }
+ list_push_back(&revalidator->udumps, &udump->list_node);
+ revalidator->n_udumps++;
+ xpthread_cond_signal(&revalidator->wake_cond);
+ ovs_mutex_unlock(&revalidator->mutex);
+ }
+ dpif_flow_dump_done(&dump);
+
+ /* Let all the revalidators finish and garbage collect. */
+ seq_change(udpif->dump_seq);
+ for (i = 0; i < udpif->n_revalidators; i++) {
+ struct revalidator *revalidator = &udpif->revalidators[i];
+ ovs_mutex_lock(&revalidator->mutex);
+ xpthread_cond_signal(&revalidator->wake_cond);
+ ovs_mutex_unlock(&revalidator->mutex);
+ }
-/* Destorys and deallocates 'drop_key'. */
-void
-drop_key_destroy(struct drop_key *drop_key)
-{
- if (drop_key) {
- free(drop_key->key);
- free(drop_key);
- }
-}
+ for (i = 0; i < udpif->n_revalidators; i++) {
+ struct revalidator *revalidator = &udpif->revalidators[i];
-/* Clears all drop keys waiting to be processed by drop_key_next(). */
-void
-udpif_drop_key_clear(struct udpif *udpif)
-{
- struct drop_key *drop_key, *next;
+ ovs_mutex_lock(&revalidator->mutex);
+ while (revalidator->dump_seq != seq_read(udpif->dump_seq)
+ && !latch_is_set(&udpif->exit_latch)) {
+ ovs_mutex_cond_wait(&revalidator->wake_cond,
+ &revalidator->mutex);
+ }
+ ovs_mutex_unlock(&revalidator->mutex);
+ }
- ovs_mutex_lock(&udpif->drop_key_mutex);
- LIST_FOR_EACH_SAFE (drop_key, next, list_node, &udpif->drop_keys) {
- list_remove(&drop_key->list_node);
- drop_key_destroy(drop_key);
- udpif->n_drop_keys--;
- }
- ovs_mutex_unlock(&udpif->drop_key_mutex);
-}
-\f
-/* The dispatcher thread is responsible for receving upcalls from the kernel,
- * assigning the miss upcalls to a miss_handler thread, and assigning the more
- * complex ones to ofproto-dpif directly. */
-static void *
-udpif_dispatcher(void *arg)
-{
- struct udpif *udpif = arg;
+ duration = MAX(time_msec() - start_time, 1);
+ udpif->dump_duration = duration;
+ atomic_read(&udpif->flow_limit, &flow_limit);
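+        /* Adapt the flow limit to the dump duration: scale it down sharply
+         * for multi-second dumps, trim it above 1.3s, and grow it when dumps
+         * are fast and the datapath is busy, always keeping it between 1000
+         * and ofproto_flow_limit. */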
+ if (duration > 2000) {
+ flow_limit /= duration / 1000;
+ } else if (duration > 1300) {
+ flow_limit = flow_limit * 3 / 4;
+ } else if (duration < 1000 && n_flows > 2000
+ && flow_limit < n_flows * 1000 / duration) {
+ flow_limit += 1000;
+ }
+ flow_limit = MIN(ofproto_flow_limit, MAX(flow_limit, 1000));
+ atomic_store(&udpif->flow_limit, flow_limit);
- set_subprogram_name("dispatcher");
- while (!latch_is_set(&udpif->exit_latch)) {
- recv_upcalls(udpif);
- dpif_recv_wait(udpif->dpif);
+ if (duration > 2000) {
+ VLOG_INFO("Spent an unreasonably long %lldms dumping flows",
+ duration);
+ }
+
+ poll_timer_wait_until(start_time + MIN(MAX_IDLE, 500));
+ seq_wait(udpif->reval_seq, udpif->last_reval_seq);
latch_wait(&udpif->exit_latch);
poll_block();
}
return NULL;
}
-/* The miss handler thread is responsible for processing miss upcalls retreived
- * by the dispatcher thread. Once finished it passes the processed miss
- * upcalls to ofproto-dpif where they're installed in the datapath. */
+/* The upcall handler thread is responsible for processing upcalls retrieved
+ * by the dispatcher thread.  It forwards each upcall's packets and, when
+ * appropriate, installs a flow in the datapath itself. */
static void *
-udpif_miss_handler(void *arg)
+udpif_upcall_handler(void *arg)
{
- struct list misses = LIST_INITIALIZER(&misses);
struct handler *handler = arg;
- set_subprogram_name("miss_handler");
- for (;;) {
+ handler->name = xasprintf("handler_%u", ovsthread_id_self());
+ set_subprogram_name("%s", handler->name);
+
+ while (!latch_is_set(&handler->udpif->exit_latch)) {
+ struct list misses = LIST_INITIALIZER(&misses);
size_t i;
ovs_mutex_lock(&handler->mutex);
-
- if (latch_is_set(&handler->udpif->exit_latch)) {
- ovs_mutex_unlock(&handler->mutex);
- return NULL;
- }
-
if (!handler->n_upcalls) {
ovs_mutex_cond_wait(&handler->wake_cond, &handler->mutex);
}
}
ovs_mutex_unlock(&handler->mutex);
- handle_miss_upcalls(handler->udpif, &misses);
+ handle_upcalls(handler, &misses);
+
+ coverage_clear();
}
+
+ return NULL;
}
-\f
-static void
-miss_destroy(struct flow_miss *miss)
+
+static void *
+udpif_revalidator(void *arg)
{
- struct upcall *upcall, *next;
+ struct revalidator *revalidator = arg;
- LIST_FOR_EACH_SAFE (upcall, next, list_node, &miss->upcalls) {
- list_remove(&upcall->list_node);
- upcall_destroy(upcall);
+ revalidator->name = xasprintf("revalidator_%u", ovsthread_id_self());
+ set_subprogram_name("%s", revalidator->name);
+ for (;;) {
+ struct list udumps = LIST_INITIALIZER(&udumps);
+ struct udpif *udpif = revalidator->udpif;
+ size_t i;
+
+ ovs_mutex_lock(&revalidator->mutex);
+ if (latch_is_set(&udpif->exit_latch)) {
+ ovs_mutex_unlock(&revalidator->mutex);
+ return NULL;
+ }
+
+ if (!revalidator->n_udumps) {
+ if (revalidator->dump_seq != seq_read(udpif->dump_seq)) {
+ revalidator->dump_seq = seq_read(udpif->dump_seq);
+ revalidator_sweep(revalidator);
+ } else {
+ ovs_mutex_cond_wait(&revalidator->wake_cond,
+ &revalidator->mutex);
+ }
+ }
+
+ for (i = 0; i < REVALIDATE_MAX_BATCH && revalidator->n_udumps; i++) {
+ list_push_back(&udumps, list_pop_front(&revalidator->udumps));
+ revalidator->n_udumps--;
+ }
+
+ /* Wake up the flow dumper. */
+ xpthread_cond_signal(&revalidator->wake_cond);
+ ovs_mutex_unlock(&revalidator->mutex);
+
+ if (!list_is_empty(&udumps)) {
+ revalidate_udumps(revalidator, &udumps);
+ }
}
- xlate_out_uninit(&miss->xout);
-}
+ return NULL;
+}
+\f
static enum upcall_type
classify_upcall(const struct upcall *upcall)
{
userdata_len = nl_attr_get_size(dpif_upcall->userdata);
if (userdata_len < sizeof cookie.type
|| userdata_len > sizeof cookie) {
- VLOG_WARN_RL(&rl, "action upcall cookie has unexpected size %zu",
+ VLOG_WARN_RL(&rl, "action upcall cookie has unexpected size %"PRIuSIZE,
userdata_len);
return BAD_UPCALL;
}
return IPFIX_UPCALL;
} else {
VLOG_WARN_RL(&rl, "invalid user cookie of type %"PRIu16
- " and size %zu", cookie.type, userdata_len);
+ " and size %"PRIuSIZE, cookie.type, userdata_len);
return BAD_UPCALL;
}
}
static void
recv_upcalls(struct udpif *udpif)
{
- static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(60, 60);
+    size_t n;
+
for (;;) {
+ uint32_t hash = udpif->secret;
+ struct handler *handler;
struct upcall *upcall;
+ size_t n_bytes, left;
+ struct nlattr *nla;
int error;
upcall = xmalloc(sizeof *upcall);
error = dpif_recv(udpif->dpif, &upcall->dpif_upcall,
&upcall->upcall_buf);
if (error) {
- upcall_destroy(upcall);
+ /* upcall_destroy() can only be called on successfully received
+ * upcalls. */
+ ofpbuf_uninit(&upcall->upcall_buf);
+ free(upcall);
break;
}
- upcall->type = classify_upcall(upcall);
- if (upcall->type == BAD_UPCALL) {
- upcall_destroy(upcall);
- } else if (upcall->type == MISS_UPCALL) {
- struct dpif_upcall *dupcall = &upcall->dpif_upcall;
- uint32_t hash = udpif->secret;
- struct handler *handler;
- struct nlattr *nla;
- size_t n_bytes, left;
-
- n_bytes = 0;
- NL_ATTR_FOR_EACH (nla, left, dupcall->key, dupcall->key_len) {
- enum ovs_key_attr type = nl_attr_type(nla);
- if (type == OVS_KEY_ATTR_IN_PORT
- || type == OVS_KEY_ATTR_TCP
- || type == OVS_KEY_ATTR_UDP) {
- if (nl_attr_get_size(nla) == 4) {
- ovs_be32 attr = nl_attr_get_be32(nla);
- hash = mhash_add(hash, (OVS_FORCE uint32_t) attr);
- n_bytes += 4;
- } else {
- VLOG_WARN("Netlink attribute with incorrect size.");
- }
+ n_bytes = 0;
+ NL_ATTR_FOR_EACH (nla, left, upcall->dpif_upcall.key,
+ upcall->dpif_upcall.key_len) {
+ enum ovs_key_attr type = nl_attr_type(nla);
+ if (type == OVS_KEY_ATTR_IN_PORT
+ || type == OVS_KEY_ATTR_TCP
+ || type == OVS_KEY_ATTR_UDP) {
+ if (nl_attr_get_size(nla) == 4) {
+ hash = mhash_add(hash, nl_attr_get_u32(nla));
+ n_bytes += 4;
+ } else {
+ VLOG_WARN_RL(&rl,
+ "Netlink attribute with incorrect size.");
}
}
- hash = mhash_finish(hash, n_bytes);
-
- handler = &udpif->handlers[hash % udpif->n_handlers];
-
- ovs_mutex_lock(&handler->mutex);
- if (handler->n_upcalls < MAX_QUEUE_LENGTH) {
- list_push_back(&handler->upcalls, &upcall->list_node);
- handler->n_upcalls++;
- xpthread_cond_signal(&handler->wake_cond);
- ovs_mutex_unlock(&handler->mutex);
- if (!VLOG_DROP_DBG(&rl)) {
- struct ds ds = DS_EMPTY_INITIALIZER;
-
- odp_flow_key_format(upcall->dpif_upcall.key,
- upcall->dpif_upcall.key_len,
- &ds);
- VLOG_DBG("dispatcher: miss enqueue (%s)", ds_cstr(&ds));
- ds_destroy(&ds);
- }
- } else {
- ovs_mutex_unlock(&handler->mutex);
- COVERAGE_INC(miss_queue_overflow);
- upcall_destroy(upcall);
- }
- } else {
- ovs_mutex_lock(&udpif->upcall_mutex);
- if (udpif->n_upcalls < MAX_QUEUE_LENGTH) {
- udpif->n_upcalls++;
- list_push_back(&udpif->upcalls, &upcall->list_node);
- ovs_mutex_unlock(&udpif->upcall_mutex);
- seq_change(udpif->wait_seq);
- } else {
- ovs_mutex_unlock(&udpif->upcall_mutex);
- COVERAGE_INC(upcall_queue_overflow);
- upcall_destroy(upcall);
+ }
+ hash = mhash_finish(hash, n_bytes);
+
+ handler = &udpif->handlers[hash % udpif->n_handlers];
+
+ ovs_mutex_lock(&handler->mutex);
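+        /* Queue the upcall but defer waking the handler until it has a full
+         * batch; any handler left unsignaled is woken at the end of this
+         * pass (see below). */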
+ if (handler->n_upcalls < MAX_QUEUE_LENGTH) {
+ list_push_back(&handler->upcalls, &upcall->list_node);
+ if (handler->n_upcalls == 0) {
+ handler->need_signal = true;
}
+ handler->n_upcalls++;
+ if (handler->need_signal &&
+ handler->n_upcalls >= FLOW_MISS_MAX_BATCH) {
+ handler->need_signal = false;
+ xpthread_cond_signal(&handler->wake_cond);
+ }
+ ovs_mutex_unlock(&handler->mutex);
+ if (!VLOG_DROP_DBG(&rl)) {
+ struct ds ds = DS_EMPTY_INITIALIZER;
+
+ odp_flow_key_format(upcall->dpif_upcall.key,
+ upcall->dpif_upcall.key_len,
+ &ds);
+ VLOG_DBG("dispatcher: enqueue (%s)", ds_cstr(&ds));
+ ds_destroy(&ds);
+ }
+ } else {
+ ovs_mutex_unlock(&handler->mutex);
+ COVERAGE_INC(upcall_queue_overflow);
+ upcall_destroy(upcall);
}
}
+
+ for (n = 0; n < udpif->n_handlers; ++n) {
+ struct handler *handler = &udpif->handlers[n];
+
+ if (handler->need_signal) {
+ handler->need_signal = false;
+ ovs_mutex_lock(&handler->mutex);
+ xpthread_cond_signal(&handler->wake_cond);
+ ovs_mutex_unlock(&handler->mutex);
+ }
+ }
+}
+
+/* Calculates slow path actions for 'xout'.  'buf' must be statically
+ * initialized with at least 128 bytes of space. */
+static void
+compose_slow_path(struct udpif *udpif, struct xlate_out *xout,
+ odp_port_t odp_in_port, struct ofpbuf *buf)
+{
+ union user_action_cookie cookie;
+ odp_port_t port;
+ uint32_t pid;
+
+ cookie.type = USER_ACTION_COOKIE_SLOW_PATH;
+ cookie.slow_path.unused = 0;
+ cookie.slow_path.reason = xout->slow;
+
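+    /* Slow-path reasons that ofproto handles internally (CFM, BFD, LACP,
+     * STP) don't need a per-port upcall PID, so ODPP_NONE is used for
+     * them. */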
+ port = xout->slow & (SLOW_CFM | SLOW_BFD | SLOW_LACP | SLOW_STP)
+ ? ODPP_NONE
+ : odp_in_port;
+ pid = dpif_port_get_pid(udpif->dpif, port);
+ odp_put_userspace_action(pid, &cookie, sizeof cookie.slow_path, buf);
}
static struct flow_miss *
return NULL;
}
-/* Executes flow miss 'miss'. May add any required datapath operations
- * to 'ops', incrementing '*n_ops' for each new op. */
static void
-execute_flow_miss(struct flow_miss *miss, struct dpif_op *ops, size_t *n_ops)
+handle_upcalls(struct handler *handler, struct list *upcalls)
{
- struct ofproto_dpif *ofproto = miss->ofproto;
- struct flow_wildcards wc;
- struct rule_dpif *rule;
- struct ofpbuf *packet;
- struct xlate_in xin;
+ struct hmap misses = HMAP_INITIALIZER(&misses);
+ struct udpif *udpif = handler->udpif;
- memset(&miss->stats, 0, sizeof miss->stats);
- miss->stats.used = time_msec();
- LIST_FOR_EACH (packet, list_node, &miss->packets) {
- miss->stats.tcp_flags |= packet_get_tcp_flags(packet, &miss->flow);
- miss->stats.n_bytes += packet->size;
- miss->stats.n_packets++;
- }
-
- flow_wildcards_init_catchall(&wc);
- rule_dpif_lookup(ofproto, &miss->flow, &wc, &rule);
- rule_credit_stats(rule, &miss->stats);
- xlate_in_init(&xin, ofproto, &miss->flow, rule, miss->stats.tcp_flags,
- NULL);
- xin.may_learn = true;
- xin.resubmit_stats = &miss->stats;
- xlate_actions(&xin, &miss->xout);
- flow_wildcards_or(&miss->xout.wc, &miss->xout.wc, &wc);
-
- if (rule->up.cr.priority == FAIL_OPEN_PRIORITY) {
- LIST_FOR_EACH (packet, list_node, &miss->packets) {
- struct ofputil_packet_in *pin;
-
- /* Extra-special case for fail-open mode.
- *
- * We are in fail-open mode and the packet matched the fail-open
- * rule, but we are connected to a controller too. We should send
- * the packet up to the controller in the hope that it will try to
- * set up a flow and thereby allow us to exit fail-open.
- *
- * See the top-level comment in fail-open.c for more information. */
- pin = xmalloc(sizeof(*pin));
- pin->packet = xmemdup(packet->data, packet->size);
- pin->packet_len = packet->size;
- pin->reason = OFPR_NO_MATCH;
- pin->controller_id = 0;
- pin->table_id = 0;
- pin->cookie = 0;
- pin->send_len = 0; /* Not used for flow table misses. */
- flow_get_metadata(&miss->flow, &pin->fmd);
- ofproto_dpif_send_packet_in(ofproto, pin);
+ struct flow_miss miss_buf[FLOW_MISS_MAX_BATCH];
+ struct dpif_op *opsp[FLOW_MISS_MAX_BATCH * 2];
+ struct dpif_op ops[FLOW_MISS_MAX_BATCH * 2];
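+    /* Sized at twice the batch because each upcall may generate up to two
+     * datapath operations: a flow put for its miss and a packet execute. */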
+ struct flow_miss *miss, *next_miss;
+ struct upcall *upcall, *next;
+ size_t n_misses, n_ops, i;
+ unsigned int flow_limit;
+ bool fail_open, may_put;
+ enum upcall_type type;
+
+ atomic_read(&udpif->flow_limit, &flow_limit);
+ may_put = udpif_get_n_flows(udpif) < flow_limit;
+
+ /* Extract the flow from each upcall. Construct in 'misses' a hash table
+ * that maps each unique flow to a 'struct flow_miss'.
+ *
+ * Most commonly there is a single packet per flow_miss, but there are
+ * several reasons why there might be more than one, e.g.:
+ *
+ * - The dpif packet interface does not support TSO (or UFO, etc.), so a
+ * large packet sent to userspace is split into a sequence of smaller
+ * ones.
+ *
+ * - A stream of quickly arriving packets in an established "slow-pathed"
+ * flow.
+ *
+ * - Rarely, a stream of quickly arriving packets in a flow not yet
+ * established. (This is rare because most protocols do not send
+ * multiple back-to-back packets before receiving a reply from the
+ * other end of the connection, which gives OVS a chance to set up a
+ * datapath flow.)
+ */
+ n_misses = 0;
+ LIST_FOR_EACH_SAFE (upcall, next, list_node, upcalls) {
+ struct dpif_upcall *dupcall = &upcall->dpif_upcall;
+ struct flow_miss *miss = &miss_buf[n_misses];
+ struct ofpbuf *packet = &dupcall->packet;
+ struct flow_miss *existing_miss;
+ struct ofproto_dpif *ofproto;
+ struct dpif_sflow *sflow;
+ struct dpif_ipfix *ipfix;
+ odp_port_t odp_in_port;
+ struct flow flow;
+ int error;
+
+ error = xlate_receive(udpif->backer, packet, dupcall->key,
+ dupcall->key_len, &flow,
+ &ofproto, &ipfix, &sflow, NULL, &odp_in_port);
+ if (error) {
+ if (error == ENODEV) {
+ /* Received packet on datapath port for which we couldn't
+ * associate an ofproto. This can happen if a port is removed
+ * while traffic is being received. Print a rate-limited
+ * message in case it happens frequently. Install a drop flow
+ * so that future packets of the flow are inexpensively dropped
+ * in the kernel. */
+ VLOG_INFO_RL(&rl, "received packet on unassociated datapath "
+ "port %"PRIu32, odp_in_port);
+ dpif_flow_put(udpif->dpif, DPIF_FP_CREATE | DPIF_FP_MODIFY,
+ dupcall->key, dupcall->key_len, NULL, 0, NULL, 0,
+ NULL);
+ }
+ list_remove(&upcall->list_node);
+ upcall_destroy(upcall);
+ continue;
}
+
+ type = classify_upcall(upcall);
+ if (type == MISS_UPCALL) {
+ uint32_t hash;
+
+ flow_extract(packet, flow.skb_priority, flow.pkt_mark,
+ &flow.tunnel, &flow.in_port, &miss->flow);
+
+ hash = flow_hash(&miss->flow, 0);
+ existing_miss = flow_miss_find(&misses, ofproto, &miss->flow,
+ hash);
+ if (!existing_miss) {
+ hmap_insert(&misses, &miss->hmap_node, hash);
+ miss->ofproto = ofproto;
+ miss->key = dupcall->key;
+ miss->key_len = dupcall->key_len;
+ miss->upcall_type = dupcall->type;
+ miss->stats.n_packets = 0;
+ miss->stats.n_bytes = 0;
+ miss->stats.used = time_msec();
+ miss->stats.tcp_flags = 0;
+ miss->odp_in_port = odp_in_port;
+ miss->put = false;
+
+ n_misses++;
+ } else {
+ miss = existing_miss;
+ }
+ miss->stats.tcp_flags |= packet_get_tcp_flags(packet, &miss->flow);
+ miss->stats.n_bytes += packet->size;
+ miss->stats.n_packets++;
+
+ upcall->flow_miss = miss;
+ continue;
+ }
+
+ switch (type) {
+ case SFLOW_UPCALL:
+ if (sflow) {
+ union user_action_cookie cookie;
+
+ memset(&cookie, 0, sizeof cookie);
+ memcpy(&cookie, nl_attr_get(dupcall->userdata),
+ sizeof cookie.sflow);
+ dpif_sflow_received(sflow, packet, &flow, odp_in_port,
+ &cookie);
+ }
+ break;
+ case IPFIX_UPCALL:
+ if (ipfix) {
+ dpif_ipfix_bridge_sample(ipfix, packet, &flow);
+ }
+ break;
+ case FLOW_SAMPLE_UPCALL:
+ if (ipfix) {
+ union user_action_cookie cookie;
+
+ memset(&cookie, 0, sizeof cookie);
+ memcpy(&cookie, nl_attr_get(dupcall->userdata),
+ sizeof cookie.flow_sample);
+
+ /* The flow reflects exactly the contents of the packet.
+ * Sample the packet using it. */
+ dpif_ipfix_flow_sample(ipfix, packet, &flow,
+ cookie.flow_sample.collector_set_id,
+ cookie.flow_sample.probability,
+ cookie.flow_sample.obs_domain_id,
+ cookie.flow_sample.obs_point_id);
+ }
+ break;
+ case BAD_UPCALL:
+ break;
+ case MISS_UPCALL:
+ OVS_NOT_REACHED();
+ }
+
+ dpif_ipfix_unref(ipfix);
+ dpif_sflow_unref(sflow);
+
+ list_remove(&upcall->list_node);
+ upcall_destroy(upcall);
+ }
+
+ /* Initialize each 'struct flow_miss's ->xout.
+ *
+ * We do this per-flow_miss rather than per-packet because, most commonly,
+ * all the packets in a flow can use the same translation.
+ *
+ * We can't do this in the previous loop because we need the TCP flags for
+ * all the packets in each miss. */
+ fail_open = false;
+ HMAP_FOR_EACH (miss, hmap_node, &misses) {
+ struct xlate_in xin;
+
+ xlate_in_init(&xin, miss->ofproto, &miss->flow, NULL,
+ miss->stats.tcp_flags, NULL);
+ xin.may_learn = true;
+
+ if (miss->upcall_type == DPIF_UC_MISS) {
+ xin.resubmit_stats = &miss->stats;
+ } else {
+ /* For non-miss upcalls, there's a flow in the datapath which this
+ * packet was accounted to. Presumably the revalidators will deal
+ * with pushing its stats eventually. */
+ }
+
+ xlate_actions(&xin, &miss->xout);
+ fail_open = fail_open || miss->xout.fail_open;
}
- if (miss->xout.slow) {
- LIST_FOR_EACH (packet, list_node, &miss->packets) {
+ /* Now handle the packets individually in order of arrival. In the common
+ * case each packet of a miss can share the same actions, but slow-pathed
+ * packets need to be translated individually:
+ *
+ * - For SLOW_CFM, SLOW_LACP, SLOW_STP, and SLOW_BFD, translation is what
+ * processes received packets for these protocols.
+ *
+ * - For SLOW_CONTROLLER, translation sends the packet to the OpenFlow
+ * controller.
+ *
+ * The loop fills 'ops' with an array of operations to execute in the
+ * datapath. */
+ n_ops = 0;
+ LIST_FOR_EACH (upcall, list_node, upcalls) {
+ struct flow_miss *miss = upcall->flow_miss;
+ struct ofpbuf *packet = &upcall->dpif_upcall.packet;
+ struct dpif_op *op;
+ ovs_be16 flow_vlan_tci;
+
+ /* Save a copy of flow.vlan_tci in case it is changed to
+ * generate proper mega flow masks for VLAN splinter flows. */
+ flow_vlan_tci = miss->flow.vlan_tci;
+
+ if (miss->xout.slow) {
struct xlate_in xin;
- xlate_in_init(&xin, miss->ofproto, &miss->flow, rule, 0, packet);
+ xlate_in_init(&xin, miss->ofproto, &miss->flow, NULL, 0, packet);
xlate_actions_for_side_effects(&xin);
}
- }
- rule_release(rule);
- if (miss->xout.odp_actions.size) {
- LIST_FOR_EACH (packet, list_node, &miss->packets) {
- struct dpif_op *op = &ops[*n_ops];
- struct dpif_execute *execute = &op->u.execute;
-
- if (miss->flow.in_port.ofp_port
- != vsp_realdev_to_vlandev(miss->ofproto,
- miss->flow.in_port.ofp_port,
- miss->flow.vlan_tci)) {
- /* This packet was received on a VLAN splinter port. We
- * added a VLAN to the packet to make the packet resemble
- * the flow, but the actions were composed assuming that
- * the packet contained no VLAN. So, we must remove the
- * VLAN header from the packet before trying to execute the
- * actions. */
+ if (miss->flow.in_port.ofp_port
+ != vsp_realdev_to_vlandev(miss->ofproto,
+ miss->flow.in_port.ofp_port,
+ miss->flow.vlan_tci)) {
+ /* This packet was received on a VLAN splinter port. We
+ * added a VLAN to the packet to make the packet resemble
+ * the flow, but the actions were composed assuming that
+ * the packet contained no VLAN. So, we must remove the
+ * VLAN header from the packet before trying to execute the
+ * actions. */
+ if (miss->xout.odp_actions.size) {
eth_pop_vlan(packet);
}
+ /* Remove the flow vlan tags inserted by vlan splinter logic
+ * to ensure megaflow masks generated match the data path flow. */
+ miss->flow.vlan_tci = 0;
+ }
+
+ /* Do not install a flow into the datapath if:
+ *
+ * - The datapath already has too many flows.
+ *
+ * - An earlier iteration of this loop already put the same flow.
+ *
+ * - We received this packet via some flow installed in the kernel
+ * already. */
+ if (may_put
+ && !miss->put
+ && upcall->dpif_upcall.type == DPIF_UC_MISS) {
+ struct ofpbuf mask;
+ bool megaflow;
+
+ miss->put = true;
+
+ atomic_read(&enable_megaflows, &megaflow);
+ ofpbuf_use_stack(&mask, &miss->mask_buf, sizeof miss->mask_buf);
+ if (megaflow) {
+ size_t max_mpls;
+
+ max_mpls = ofproto_dpif_get_max_mpls_depth(miss->ofproto);
+ odp_flow_key_from_mask(&mask, &miss->xout.wc.masks,
+ &miss->flow, UINT32_MAX, max_mpls);
+ }
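+            /* With megaflows disabled, 'mask' stays empty and the datapath
+             * installs an exact-match flow instead. */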
+
+ op = &ops[n_ops++];
+ op->type = DPIF_OP_FLOW_PUT;
+ op->u.flow_put.flags = DPIF_FP_CREATE | DPIF_FP_MODIFY;
+ op->u.flow_put.key = miss->key;
+ op->u.flow_put.key_len = miss->key_len;
+ op->u.flow_put.mask = mask.data;
+ op->u.flow_put.mask_len = mask.size;
+ op->u.flow_put.stats = NULL;
+
+ if (!miss->xout.slow) {
+ op->u.flow_put.actions = miss->xout.odp_actions.data;
+ op->u.flow_put.actions_len = miss->xout.odp_actions.size;
+ } else {
+ struct ofpbuf buf;
+
+ ofpbuf_use_stack(&buf, miss->slow_path_buf,
+ sizeof miss->slow_path_buf);
+ compose_slow_path(udpif, &miss->xout, miss->odp_in_port, &buf);
+ op->u.flow_put.actions = buf.data;
+ op->u.flow_put.actions_len = buf.size;
+ }
+ }
+
+        /* The 'miss' may be shared by multiple upcalls.  Restore the saved
+         * flow vlan_tci field before processing the next upcall. */
+ miss->flow.vlan_tci = flow_vlan_tci;
+
+        if (miss->xout.odp_actions.size) {
+ op = &ops[n_ops++];
op->type = DPIF_OP_EXECUTE;
- execute->key = miss->key;
- execute->key_len = miss->key_len;
- execute->packet = packet;
- execute->actions = miss->xout.odp_actions.data;
- execute->actions_len = miss->xout.odp_actions.size;
+ op->u.execute.packet = packet;
+ odp_key_to_pkt_metadata(miss->key, miss->key_len,
+ &op->u.execute.md);
+ op->u.execute.actions = miss->xout.odp_actions.data;
+ op->u.execute.actions_len = miss->xout.odp_actions.size;
+ op->u.execute.needs_help = (miss->xout.slow & SLOW_ACTION) != 0;
+ }
+ }
+
+ /* Special case for fail-open mode.
+ *
+ * If we are in fail-open mode, but we are connected to a controller too,
+ * then we should send the packet up to the controller in the hope that it
+ * will try to set up a flow and thereby allow us to exit fail-open.
+ *
+ * See the top-level comment in fail-open.c for more information.
+ *
+ * Copy packets before they are modified by execution. */
+ if (fail_open) {
+ LIST_FOR_EACH (upcall, list_node, upcalls) {
+ struct flow_miss *miss = upcall->flow_miss;
+ struct ofpbuf *packet = &upcall->dpif_upcall.packet;
+ struct ofproto_packet_in *pin;
+
+ pin = xmalloc(sizeof *pin);
+ pin->up.packet = xmemdup(packet->data, packet->size);
+ pin->up.packet_len = packet->size;
+ pin->up.reason = OFPR_NO_MATCH;
+ pin->up.table_id = 0;
+ pin->up.cookie = OVS_BE64_MAX;
+ flow_get_metadata(&miss->flow, &pin->up.fmd);
+ pin->send_len = 0; /* Not used for flow table misses. */
+ pin->generated_by_table_miss = false;
+ ofproto_dpif_send_packet_in(miss->ofproto, pin);
+ }
+ }
- (*n_ops)++;
+ /* Execute batch. */
+ for (i = 0; i < n_ops; i++) {
+ opsp[i] = &ops[i];
+ }
+ dpif_operate(udpif->dpif, opsp, n_ops);
+
+ HMAP_FOR_EACH_SAFE (miss, next_miss, hmap_node, &misses) {
+ hmap_remove(&misses, &miss->hmap_node);
+ xlate_out_uninit(&miss->xout);
+ }
+ hmap_destroy(&misses);
+
+ LIST_FOR_EACH_SAFE (upcall, next, list_node, upcalls) {
+ list_remove(&upcall->list_node);
+ upcall_destroy(upcall);
+ }
+}
+
+static struct udpif_key *
+ukey_lookup(struct revalidator *revalidator, struct udpif_flow_dump *udump)
+{
+ struct udpif_key *ukey;
+
+ HMAP_FOR_EACH_WITH_HASH (ukey, hmap_node, udump->key_hash,
+ &revalidator->ukeys) {
+ if (ukey->key_len == udump->key_len
+ && !memcmp(ukey->key, udump->key, udump->key_len)) {
+ return ukey;
}
}
+ return NULL;
}
static void
-handle_miss_upcalls(struct udpif *udpif, struct list *upcalls)
+ukey_delete(struct revalidator *revalidator, struct udpif_key *ukey)
{
- struct dpif_op *opsp[FLOW_MISS_MAX_BATCH];
- struct dpif_op ops[FLOW_MISS_MAX_BATCH];
- unsigned int old_reval_seq, new_reval_seq;
- struct upcall *upcall, *next;
- struct flow_miss_batch *fmb;
- size_t n_upcalls, n_ops, i;
- struct flow_miss *miss;
+ hmap_remove(&revalidator->ukeys, &ukey->hmap_node);
+ free(ukey);
+}
- atomic_read(&udpif->reval_seq, &old_reval_seq);
+static bool
+revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump,
+ struct udpif_key *ukey)
+{
+ struct ofpbuf xout_actions, *actions;
+ uint64_t slow_path_buf[128 / 8];
+ struct xlate_out xout, *xoutp;
+ struct flow flow, udump_mask;
+ struct ofproto_dpif *ofproto;
+ struct dpif_flow_stats push;
+ uint32_t *udump32, *xout32;
+ odp_port_t odp_in_port;
+ struct xlate_in xin;
+ int error;
+ size_t i;
+ bool ok;
+
+ ok = false;
+ xoutp = NULL;
+ actions = NULL;
+
+    /* If we don't need to revalidate, we can simply push the stats contained
+     * in the udump.  Otherwise, we'll have to get the actions so we can
+     * check them. */
+ if (udump->need_revalidate) {
+ if (dpif_flow_get(udpif->dpif, ukey->key, ukey->key_len, &actions,
+ &udump->stats)) {
+ goto exit;
+ }
+ }
- /* Construct the to-do list.
- *
- * This just amounts to extracting the flow from each packet and sticking
- * the packets that have the same flow in the same "flow_miss" structure so
- * that we can process them together. */
- fmb = xmalloc(sizeof *fmb);
- hmap_init(&fmb->misses);
- n_upcalls = 0;
- LIST_FOR_EACH_SAFE (upcall, next, list_node, upcalls) {
- struct dpif_upcall *dupcall = &upcall->dpif_upcall;
- struct flow_miss *miss = &fmb->miss_buf[n_upcalls];
- struct flow_miss *existing_miss;
- struct ofproto_dpif *ofproto;
- odp_port_t odp_in_port;
- struct flow flow;
- uint32_t hash;
- int error;
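+    /* Datapath flow stats are cumulative.  Push only the delta since the
+     * last dump, treating a counter that appears to have decreased as
+     * zero. */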
+ push.used = udump->stats.used;
+ push.tcp_flags = udump->stats.tcp_flags;
+ push.n_packets = udump->stats.n_packets > ukey->stats.n_packets
+ ? udump->stats.n_packets - ukey->stats.n_packets
+ : 0;
+ push.n_bytes = udump->stats.n_bytes > ukey->stats.n_bytes
+ ? udump->stats.n_bytes - ukey->stats.n_bytes
+ : 0;
+ ukey->stats = udump->stats;
+
+ if (!push.n_packets && !udump->need_revalidate) {
+ ok = true;
+ goto exit;
+ }
+
+ error = xlate_receive(udpif->backer, NULL, ukey->key, ukey->key_len, &flow,
+ &ofproto, NULL, NULL, NULL, &odp_in_port);
+ if (error) {
+ goto exit;
+ }
+
+ xlate_in_init(&xin, ofproto, &flow, NULL, push.tcp_flags, NULL);
+ xin.resubmit_stats = push.n_packets ? &push : NULL;
+ xin.may_learn = push.n_packets > 0;
+ xin.skip_wildcards = !udump->need_revalidate;
+ xlate_actions(&xin, &xout);
+ xoutp = &xout;
+
+ if (!udump->need_revalidate) {
+ ok = true;
+ goto exit;
+ }
- error = xlate_receive(udpif->backer, dupcall->packet, dupcall->key,
- dupcall->key_len, &flow, &miss->key_fitness,
- &ofproto, &odp_in_port);
-
- if (error == ENODEV) {
- struct drop_key *drop_key;
-
- /* Received packet on datapath port for which we couldn't
- * associate an ofproto. This can happen if a port is removed
- * while traffic is being received. Print a rate-limited message
- * in case it happens frequently. Install a drop flow so
- * that future packets of the flow are inexpensively dropped
- * in the kernel. */
- VLOG_INFO_RL(&rl, "received packet on unassociated datapath port "
- "%"PRIu32, odp_in_port);
-
- drop_key = xmalloc(sizeof *drop_key);
- drop_key->key = xmemdup(dupcall->key, dupcall->key_len);
- drop_key->key_len = dupcall->key_len;
-
- ovs_mutex_lock(&udpif->drop_key_mutex);
- if (udpif->n_drop_keys < MAX_QUEUE_LENGTH) {
- udpif->n_drop_keys++;
- list_push_back(&udpif->drop_keys, &drop_key->list_node);
- ovs_mutex_unlock(&udpif->drop_key_mutex);
- seq_change(udpif->wait_seq);
+ if (!xout.slow) {
+ ofpbuf_use_const(&xout_actions, xout.odp_actions.data,
+ xout.odp_actions.size);
+ } else {
+ ofpbuf_use_stack(&xout_actions, slow_path_buf, sizeof slow_path_buf);
+ compose_slow_path(udpif, &xout, odp_in_port, &xout_actions);
+ }
+
+ if (!ofpbuf_equal(&xout_actions, actions)) {
+ goto exit;
+ }
+
+ if (odp_flow_key_to_mask(udump->mask, udump->mask_len, &udump_mask, &flow)
+ == ODP_FIT_ERROR) {
+ goto exit;
+ }
+
+    /* Since the kernel is free to ignore wildcarded bits in the mask, we
+     * can't directly check that the masks are the same.  Instead we check
+     * that the mask in the kernel is more specific, i.e. less wildcarded,
+     * than what we've calculated here.  This guarantees we don't catch any
+     * packets we shouldn't with the megaflow. */
+ udump32 = (uint32_t *) &udump_mask;
+ xout32 = (uint32_t *) &xout.wc.masks;
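+    /* Fail if the mask we computed (xout32) matches on any bit that the
+     * datapath mask (udump32) leaves wildcarded. */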
+ for (i = 0; i < FLOW_U32S; i++) {
+ if ((udump32[i] | xout32[i]) != udump32[i]) {
+ goto exit;
+ }
+ }
+ ok = true;
+
+exit:
+ ofpbuf_delete(actions);
+ xlate_out_uninit(xoutp);
+ return ok;
+}
+
+static void
+revalidate_udumps(struct revalidator *revalidator, struct list *udumps)
+{
+ struct udpif *udpif = revalidator->udpif;
+
+ struct {
+ struct dpif_flow_stats ukey_stats; /* Stats stored in the ukey. */
+ struct dpif_flow_stats stats; /* Stats for 'op'. */
+ struct dpif_op op; /* Flow del operation. */
+ } ops[REVALIDATE_MAX_BATCH];
+
+ struct dpif_op *opsp[REVALIDATE_MAX_BATCH];
+ struct udpif_flow_dump *udump, *next_udump;
+ size_t n_ops, i, n_flows;
+ unsigned int flow_limit;
+ long long int max_idle;
+ bool must_del;
+
+ atomic_read(&udpif->flow_limit, &flow_limit);
+
+ n_flows = udpif_get_n_flows(udpif);
+
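+    /* If the datapath holds more than twice the flow limit, delete every
+     * dumped flow outright; if it is merely above the limit, shrink the idle
+     * timeout so flows age out faster. */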
+ must_del = false;
+ max_idle = MAX_IDLE;
+ if (n_flows > flow_limit) {
+ must_del = n_flows > 2 * flow_limit;
+ max_idle = 100;
+ }
+
+ n_ops = 0;
+ LIST_FOR_EACH_SAFE (udump, next_udump, list_node, udumps) {
+ long long int used, now;
+ struct udpif_key *ukey;
+
+ now = time_msec();
+ ukey = ukey_lookup(revalidator, udump);
+
+ used = udump->stats.used;
+ if (!used && ukey) {
+ used = ukey->created;
+ }
+
+ if (must_del || (used && used < now - max_idle)) {
+ struct dpif_flow_stats *ukey_stats = &ops[n_ops].ukey_stats;
+ struct dpif_op *op = &ops[n_ops].op;
+
+ op->type = DPIF_OP_FLOW_DEL;
+ op->u.flow_del.key = udump->key;
+ op->u.flow_del.key_len = udump->key_len;
+ op->u.flow_del.stats = &ops[n_ops].stats;
+ n_ops++;
+
+ if (ukey) {
+ *ukey_stats = ukey->stats;
+ ukey_delete(revalidator, ukey);
} else {
- ovs_mutex_unlock(&udpif->drop_key_mutex);
- COVERAGE_INC(drop_queue_overflow);
- drop_key_destroy(drop_key);
+ memset(ukey_stats, 0, sizeof *ukey_stats);
}
- continue;
- } else if (error) {
+
continue;
}
- flow_extract(dupcall->packet, flow.skb_priority, flow.pkt_mark,
- &flow.tunnel, &flow.in_port, &miss->flow);
+ if (!ukey) {
+ ukey = xmalloc(sizeof *ukey);
- /* Add other packets to a to-do list. */
- hash = flow_hash(&miss->flow, 0);
- existing_miss = flow_miss_find(&fmb->misses, ofproto, &miss->flow, hash);
- if (!existing_miss) {
- hmap_insert(&fmb->misses, &miss->hmap_node, hash);
- miss->ofproto = ofproto;
- miss->key = dupcall->key;
- miss->key_len = dupcall->key_len;
- miss->upcall_type = dupcall->type;
- list_init(&miss->packets);
- list_init(&miss->upcalls);
+ ukey->key = (struct nlattr *) &ukey->key_buf;
+ memcpy(ukey->key, udump->key, udump->key_len);
+ ukey->key_len = udump->key_len;
- n_upcalls++;
- } else {
- miss = existing_miss;
+ ukey->created = used ? used : now;
+ memset(&ukey->stats, 0, sizeof ukey->stats);
+
+ ukey->mark = false;
+
+ hmap_insert(&revalidator->ukeys, &ukey->hmap_node,
+ udump->key_hash);
}
- list_push_back(&miss->packets, &dupcall->packet->list_node);
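+        /* Mark the ukey as seen in this dump; revalidator_sweep() deletes
+         * any ukey left unmarked once the dump completes. */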
+ ukey->mark = true;
- list_remove(&upcall->list_node);
- list_push_back(&miss->upcalls, &upcall->list_node);
- }
+ if (!revalidate_ukey(udpif, udump, ukey)) {
+ dpif_flow_del(udpif->dpif, udump->key, udump->key_len, NULL);
+ ukey_delete(revalidator, ukey);
+ }
- LIST_FOR_EACH_SAFE (upcall, next, list_node, upcalls) {
- list_remove(&upcall->list_node);
- upcall_destroy(upcall);
+ list_remove(&udump->list_node);
+ free(udump);
}
- /* Process each element in the to-do list, constructing the set of
- * operations to batch. */
- n_ops = 0;
- HMAP_FOR_EACH (miss, hmap_node, &fmb->misses) {
- execute_flow_miss(miss, ops, &n_ops);
+ for (i = 0; i < n_ops; i++) {
+ opsp[i] = &ops[i].op;
}
- ovs_assert(n_ops <= ARRAY_SIZE(ops));
+ dpif_operate(udpif->dpif, opsp, n_ops);
- /* Execute batch. */
for (i = 0; i < n_ops; i++) {
- opsp[i] = &ops[i];
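+        /* flow_del returned each flow's final stats; subtract the portion
+         * already attributed (saved in 'ukey_stats') and push the rest. */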
+ struct dpif_flow_stats push, *stats, *ukey_stats;
+
+ ukey_stats = &ops[i].ukey_stats;
+ stats = ops[i].op.u.flow_del.stats;
+ push.used = MAX(stats->used, ukey_stats->used);
+ push.tcp_flags = stats->tcp_flags | ukey_stats->tcp_flags;
+ push.n_packets = stats->n_packets - ukey_stats->n_packets;
+ push.n_bytes = stats->n_bytes - ukey_stats->n_bytes;
+
+ if (push.n_packets || netflow_exists()) {
+ struct ofproto_dpif *ofproto;
+ struct netflow *netflow;
+ struct flow flow;
+
+ if (!xlate_receive(udpif->backer, NULL, ops[i].op.u.flow_del.key,
+ ops[i].op.u.flow_del.key_len, &flow,
+ &ofproto, NULL, NULL, &netflow, NULL)) {
+ struct xlate_in xin;
+
+ xlate_in_init(&xin, ofproto, &flow, NULL, push.tcp_flags,
+ NULL);
+ xin.resubmit_stats = push.n_packets ? &push : NULL;
+ xin.may_learn = push.n_packets > 0;
+ xin.skip_wildcards = true;
+ xlate_actions_for_side_effects(&xin);
+
+ if (netflow) {
+ netflow_expire(netflow, &flow);
+ netflow_flow_clear(netflow, &flow);
+ netflow_unref(netflow);
+ }
+ }
+ }
}
- dpif_operate(udpif->dpif, opsp, n_ops);
- ovs_mutex_lock(&udpif->fmb_mutex);
- atomic_read(&udpif->reval_seq, &new_reval_seq);
- if (old_reval_seq != new_reval_seq) {
- /* udpif_revalidate() was called as we were calculating the actions.
- * To be safe, we need to assume all the misses need revalidation. */
- ovs_mutex_unlock(&udpif->fmb_mutex);
- flow_miss_batch_destroy(fmb);
- } else if (udpif->n_fmbs < MAX_QUEUE_LENGTH) {
- udpif->n_fmbs++;
- list_push_back(&udpif->fmbs, &fmb->list_node);
- ovs_mutex_unlock(&udpif->fmb_mutex);
- seq_change(udpif->wait_seq);
- } else {
- COVERAGE_INC(fmb_queue_overflow);
- ovs_mutex_unlock(&udpif->fmb_mutex);
- flow_miss_batch_destroy(fmb);
+ LIST_FOR_EACH_SAFE (udump, next_udump, list_node, udumps) {
+ list_remove(&udump->list_node);
+ free(udump);
+ }
+}
+
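+/* Deletes ukeys for flows that were not seen in the most recent flow dump.
+ * Such flows no longer exist in the datapath, so their ukeys are stale
+ * (the sweep half of mark and sweep). */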
+static void
+revalidator_sweep(struct revalidator *revalidator)
+{
+ struct udpif_key *ukey, *next;
+
+ HMAP_FOR_EACH_SAFE (ukey, next, hmap_node, &revalidator->ukeys) {
+ if (ukey->mark) {
+ ukey->mark = false;
+ } else {
+ ukey_delete(revalidator, ukey);
+ }
+ }
+}
+\f
+static void
+upcall_unixctl_show(struct unixctl_conn *conn, int argc OVS_UNUSED,
+ const char *argv[] OVS_UNUSED, void *aux OVS_UNUSED)
+{
+ struct ds ds = DS_EMPTY_INITIALIZER;
+ struct udpif *udpif;
+
+ LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
+ unsigned int flow_limit;
+ size_t i;
+
+ atomic_read(&udpif->flow_limit, &flow_limit);
+
+ ds_put_format(&ds, "%s:\n", dpif_name(udpif->dpif));
+ ds_put_format(&ds, "\tflows : (current %"PRIu64")"
+ " (avg %u) (max %u) (limit %u)\n", udpif_get_n_flows(udpif),
+ udpif->avg_n_flows, udpif->max_n_flows, flow_limit);
+ ds_put_format(&ds, "\tdump duration : %lldms\n", udpif->dump_duration);
+
+ ds_put_char(&ds, '\n');
+ for (i = 0; i < udpif->n_handlers; i++) {
+ struct handler *handler = &udpif->handlers[i];
+
+ ovs_mutex_lock(&handler->mutex);
+ ds_put_format(&ds, "\t%s: (upcall queue %"PRIuSIZE")\n",
+ handler->name, handler->n_upcalls);
+ ovs_mutex_unlock(&handler->mutex);
+ }
+
+ ds_put_char(&ds, '\n');
+        for (i = 0; i < udpif->n_revalidators; i++) {
+ struct revalidator *revalidator = &udpif->revalidators[i];
+
+ /* XXX: The result of hmap_count(&revalidator->ukeys) may not be
+ * accurate because it's not protected by the revalidator mutex. */
+ ovs_mutex_lock(&revalidator->mutex);
+ ds_put_format(&ds, "\t%s: (dump queue %"PRIuSIZE") (keys %"PRIuSIZE
+ ")\n", revalidator->name, revalidator->n_udumps,
+ hmap_count(&revalidator->ukeys));
+ ovs_mutex_unlock(&revalidator->mutex);
+ }
+ }
+
+ unixctl_command_reply(conn, ds_cstr(&ds));
+ ds_destroy(&ds);
+}
+
+/* Disable using megaflows.
+ *
+ * This command is only needed for advanced debugging, so it's not
+ * documented in the man page. */
+static void
+upcall_unixctl_disable_megaflows(struct unixctl_conn *conn,
+ int argc OVS_UNUSED,
+ const char *argv[] OVS_UNUSED,
+ void *aux OVS_UNUSED)
+{
+ atomic_store(&enable_megaflows, false);
+ udpif_flush();
+ unixctl_command_reply(conn, "megaflows disabled");
+}
+
+/* Re-enable using megaflows.
+ *
+ * This command is only needed for advanced debugging, so it's not
+ * documented in the man page. */
+static void
+upcall_unixctl_enable_megaflows(struct unixctl_conn *conn,
+ int argc OVS_UNUSED,
+ const char *argv[] OVS_UNUSED,
+ void *aux OVS_UNUSED)
+{
+ atomic_store(&enable_megaflows, true);
+ udpif_flush();
+ unixctl_command_reply(conn, "megaflows enabled");
+}
+
+/* Set the flow limit.
+ *
+ * This command is only needed for advanced debugging, so it's not
+ * documented in the man page. */
+static void
+upcall_unixctl_set_flow_limit(struct unixctl_conn *conn,
+ int argc OVS_UNUSED,
+                              const char *argv[],
+ void *aux OVS_UNUSED)
+{
+ struct ds ds = DS_EMPTY_INITIALIZER;
+ struct udpif *udpif;
+ unsigned int flow_limit = atoi(argv[1]);
+
+ LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
+ atomic_store(&udpif->flow_limit, flow_limit);
}
+ ds_put_format(&ds, "set flow_limit to %u\n", flow_limit);
+ unixctl_command_reply(conn, ds_cstr(&ds));
+ ds_destroy(&ds);
}