#include "dynamic-string.h"
#include "dpif.h"
#include "fail-open.h"
+#include "guarded-list.h"
#include "latch.h"
#include "seq.h"
#include "list.h"
COVERAGE_DEFINE(drop_queue_overflow);
COVERAGE_DEFINE(miss_queue_overflow);
COVERAGE_DEFINE(fmb_queue_overflow);
+COVERAGE_DEFINE(fmb_queue_revalidated);
/* A thread that processes each upcall handed to it by the dispatcher thread,
* forwards the upcall's packet, and then queues it to the main ofproto_dpif
struct list upcalls OVS_GUARDED;
size_t n_upcalls OVS_GUARDED;
+ size_t n_new_upcalls; /* Only changed by the dispatcher. */
+
pthread_cond_t wake_cond; /* Wakes 'thread' while holding
'mutex'. */
};
struct handler *handlers; /* Miss handlers. */
size_t n_handlers;
- /* Atomic queue of unprocessed drop keys. */
- struct ovs_mutex drop_key_mutex;
- struct list drop_keys OVS_GUARDED;
- size_t n_drop_keys OVS_GUARDED;
-
- /* Atomic queue of special upcalls for ofproto-dpif to process. */
- struct ovs_mutex upcall_mutex;
- struct list upcalls OVS_GUARDED;
- size_t n_upcalls OVS_GUARDED;
-
- /* Atomic queue of flow_miss_batches. */
- struct ovs_mutex fmb_mutex;
- struct list fmbs OVS_GUARDED;
- size_t n_fmbs OVS_GUARDED;
+ /* Queues to pass up to ofproto-dpif. */
+ struct guarded_list drop_keys; /* "struct drop key"s. */
+ struct guarded_list upcalls; /* "struct upcall"s. */
+ struct guarded_list fmbs; /* "struct flow_miss_batch"es. */
/* Number of times udpif_revalidate() has been called. */
atomic_uint reval_seq;
struct seq *wait_seq;
- uint64_t last_seq;
struct latch exit_latch; /* Tells child threads to exit. */
};
udpif->secret = random_uint32();
udpif->wait_seq = seq_create();
latch_init(&udpif->exit_latch);
- list_init(&udpif->drop_keys);
- list_init(&udpif->upcalls);
- list_init(&udpif->fmbs);
+ guarded_list_init(&udpif->drop_keys);
+ guarded_list_init(&udpif->upcalls);
+ guarded_list_init(&udpif->fmbs);
atomic_init(&udpif->reval_seq, 0);
- ovs_mutex_init(&udpif->drop_key_mutex);
- ovs_mutex_init(&udpif->upcall_mutex);
- ovs_mutex_init(&udpif->fmb_mutex);
return udpif;
}
flow_miss_batch_destroy(fmb);
}
- ovs_mutex_destroy(&udpif->drop_key_mutex);
- ovs_mutex_destroy(&udpif->upcall_mutex);
- ovs_mutex_destroy(&udpif->fmb_mutex);
+ guarded_list_destroy(&udpif->drop_keys);
+ guarded_list_destroy(&udpif->upcalls);
+ guarded_list_destroy(&udpif->fmbs);
latch_destroy(&udpif->exit_latch);
seq_destroy(udpif->wait_seq);
free(udpif);
}
}
-void
-udpif_run(struct udpif *udpif)
-{
- udpif->last_seq = seq_read(udpif->wait_seq);
-}
-
void
udpif_wait(struct udpif *udpif)
{
- ovs_mutex_lock(&udpif->drop_key_mutex);
- if (udpif->n_drop_keys) {
- poll_immediate_wake();
- }
- ovs_mutex_unlock(&udpif->drop_key_mutex);
-
- ovs_mutex_lock(&udpif->upcall_mutex);
- if (udpif->n_upcalls) {
- poll_immediate_wake();
- }
- ovs_mutex_unlock(&udpif->upcall_mutex);
-
- ovs_mutex_lock(&udpif->fmb_mutex);
- if (udpif->n_fmbs) {
+ uint64_t seq = seq_read(udpif->wait_seq);
+ if (!guarded_list_is_empty(&udpif->drop_keys) ||
+ !guarded_list_is_empty(&udpif->upcalls) ||
+ !guarded_list_is_empty(&udpif->fmbs)) {
poll_immediate_wake();
+ } else {
+ seq_wait(udpif->wait_seq, seq);
}
- ovs_mutex_unlock(&udpif->fmb_mutex);
-
- seq_wait(udpif->wait_seq, udpif->last_seq);
}
/* Notifies 'udpif' that something changed which may render previous
{
struct flow_miss_batch *fmb, *next_fmb;
unsigned int junk;
+ struct list fmbs;
/* Since we remove each miss on revalidation, their statistics won't be
* accounted to the appropriate 'facet's in the upper layer. In most
* cases, this is alright because we've already pushed the stats to the
* relevant rules. However, NetFlow requires absolute packet counts on
* 'facet's which could now be incorrect. */
- ovs_mutex_lock(&udpif->fmb_mutex);
atomic_add(&udpif->reval_seq, 1, &junk);
- LIST_FOR_EACH_SAFE (fmb, next_fmb, list_node, &udpif->fmbs) {
+
+ guarded_list_pop_all(&udpif->fmbs, &fmbs);
+ LIST_FOR_EACH_SAFE (fmb, next_fmb, list_node, &fmbs) {
list_remove(&fmb->list_node);
flow_miss_batch_destroy(fmb);
- udpif->n_fmbs--;
}
- ovs_mutex_unlock(&udpif->fmb_mutex);
+
udpif_drop_key_clear(udpif);
}
struct upcall *
upcall_next(struct udpif *udpif)
{
- struct upcall *next = NULL;
-
- ovs_mutex_lock(&udpif->upcall_mutex);
- if (udpif->n_upcalls) {
- udpif->n_upcalls--;
- next = CONTAINER_OF(list_pop_front(&udpif->upcalls), struct upcall,
- list_node);
- }
- ovs_mutex_unlock(&udpif->upcall_mutex);
- return next;
+ struct list *next = guarded_list_pop_front(&udpif->upcalls);
+ return next ? CONTAINER_OF(next, struct upcall, list_node) : NULL;
}
/* Destroys and deallocates 'upcall'. */
struct flow_miss_batch *
flow_miss_batch_next(struct udpif *udpif)
{
- struct flow_miss_batch *next = NULL;
+ int i;
+
+ for (i = 0; i < 50; i++) {
+ struct flow_miss_batch *next;
+ unsigned int reval_seq;
+ struct list *next_node;
+
+ next_node = guarded_list_pop_front(&udpif->fmbs);
+ if (!next_node) {
+ break;
+ }
+
+ next = CONTAINER_OF(next_node, struct flow_miss_batch, list_node);
+ atomic_read(&udpif->reval_seq, &reval_seq);
+ if (next->reval_seq == reval_seq) {
+ return next;
+ }
- ovs_mutex_lock(&udpif->fmb_mutex);
- if (udpif->n_fmbs) {
- udpif->n_fmbs--;
- next = CONTAINER_OF(list_pop_front(&udpif->fmbs),
- struct flow_miss_batch, list_node);
+ flow_miss_batch_destroy(next);
}
- ovs_mutex_unlock(&udpif->fmb_mutex);
- return next;
+
+ return NULL;
}
/* Destroys and deallocates 'fmb'. */
struct drop_key *
drop_key_next(struct udpif *udpif)
{
- struct drop_key *next = NULL;
-
- ovs_mutex_lock(&udpif->drop_key_mutex);
- if (udpif->n_drop_keys) {
- udpif->n_drop_keys--;
- next = CONTAINER_OF(list_pop_front(&udpif->drop_keys), struct drop_key,
- list_node);
- }
- ovs_mutex_unlock(&udpif->drop_key_mutex);
- return next;
+ struct list *next = guarded_list_pop_front(&udpif->drop_keys);
+ return next ? CONTAINER_OF(next, struct drop_key, list_node) : NULL;
}
/* Destorys and deallocates 'drop_key'. */
udpif_drop_key_clear(struct udpif *udpif)
{
struct drop_key *drop_key, *next;
+ struct list list;
- ovs_mutex_lock(&udpif->drop_key_mutex);
- LIST_FOR_EACH_SAFE (drop_key, next, list_node, &udpif->drop_keys) {
+ guarded_list_pop_all(&udpif->drop_keys, &list);
+ LIST_FOR_EACH_SAFE (drop_key, next, list_node, &list) {
list_remove(&drop_key->list_node);
drop_key_destroy(drop_key);
- udpif->n_drop_keys--;
}
- ovs_mutex_unlock(&udpif->drop_key_mutex);
}
\f
/* The dispatcher thread is responsible for receving upcalls from the kernel,
recv_upcalls(struct udpif *udpif)
{
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(60, 60);
+ size_t n_udpif_new_upcalls = 0;
+ struct handler *handler;
+ int n;
+
for (;;) {
struct upcall *upcall;
int error;
} else if (upcall->type == MISS_UPCALL) {
struct dpif_upcall *dupcall = &upcall->dpif_upcall;
uint32_t hash = udpif->secret;
- struct handler *handler;
struct nlattr *nla;
size_t n_bytes, left;
ovs_mutex_lock(&handler->mutex);
if (handler->n_upcalls < MAX_QUEUE_LENGTH) {
list_push_back(&handler->upcalls, &upcall->list_node);
- handler->n_upcalls++;
- xpthread_cond_signal(&handler->wake_cond);
+ handler->n_new_upcalls = ++handler->n_upcalls;
+
+ if (handler->n_new_upcalls >= FLOW_MISS_MAX_BATCH) {
+ xpthread_cond_signal(&handler->wake_cond);
+ }
ovs_mutex_unlock(&handler->mutex);
if (!VLOG_DROP_DBG(&rl)) {
struct ds ds = DS_EMPTY_INITIALIZER;
upcall_destroy(upcall);
}
} else {
- ovs_mutex_lock(&udpif->upcall_mutex);
- if (udpif->n_upcalls < MAX_QUEUE_LENGTH) {
- udpif->n_upcalls++;
- list_push_back(&udpif->upcalls, &upcall->list_node);
- ovs_mutex_unlock(&udpif->upcall_mutex);
- seq_change(udpif->wait_seq);
+ size_t len;
+
+ len = guarded_list_push_back(&udpif->upcalls, &upcall->list_node,
+ MAX_QUEUE_LENGTH);
+ if (len > 0) {
+ n_udpif_new_upcalls = len;
+ if (n_udpif_new_upcalls >= FLOW_MISS_MAX_BATCH) {
+ seq_change(udpif->wait_seq);
+ }
} else {
- ovs_mutex_unlock(&udpif->upcall_mutex);
COVERAGE_INC(upcall_queue_overflow);
upcall_destroy(upcall);
}
}
}
+ for (n = 0; n < udpif->n_handlers; ++n) {
+ handler = &udpif->handlers[n];
+ if (handler->n_new_upcalls) {
+ handler->n_new_upcalls = 0;
+ ovs_mutex_lock(&handler->mutex);
+ xpthread_cond_signal(&handler->wake_cond);
+ ovs_mutex_unlock(&handler->mutex);
+ }
+ }
+ if (n_udpif_new_upcalls) {
+ seq_change(udpif->wait_seq);
+ }
}
static struct flow_miss *
flow_wildcards_init_catchall(&wc);
rule_dpif_lookup(ofproto, &miss->flow, &wc, &rule);
- rule_credit_stats(rule, &miss->stats);
+ rule_dpif_credit_stats(rule, &miss->stats);
xlate_in_init(&xin, ofproto, &miss->flow, rule, miss->stats.tcp_flags,
NULL);
xin.may_learn = true;
xlate_actions(&xin, &miss->xout);
flow_wildcards_or(&miss->xout.wc, &miss->xout.wc, &wc);
- if (rule->up.cr.priority == FAIL_OPEN_PRIORITY) {
+ if (rule_dpif_fail_open(rule)) {
LIST_FOR_EACH (packet, list_node, &miss->packets) {
struct ofputil_packet_in *pin;
xlate_actions_for_side_effects(&xin);
}
}
- rule_release(rule);
+ rule_dpif_unref(rule);
if (miss->xout.odp_actions.size) {
LIST_FOR_EACH (packet, list_node, &miss->packets) {
{
struct dpif_op *opsp[FLOW_MISS_MAX_BATCH];
struct dpif_op ops[FLOW_MISS_MAX_BATCH];
- unsigned int old_reval_seq, new_reval_seq;
struct upcall *upcall, *next;
struct flow_miss_batch *fmb;
size_t n_upcalls, n_ops, i;
struct flow_miss *miss;
-
- atomic_read(&udpif->reval_seq, &old_reval_seq);
+ unsigned int reval_seq;
/* Construct the to-do list.
*
* the packets that have the same flow in the same "flow_miss" structure so
* that we can process them together. */
fmb = xmalloc(sizeof *fmb);
+ atomic_read(&udpif->reval_seq, &fmb->reval_seq);
hmap_init(&fmb->misses);
n_upcalls = 0;
LIST_FOR_EACH_SAFE (upcall, next, list_node, upcalls) {
drop_key->key = xmemdup(dupcall->key, dupcall->key_len);
drop_key->key_len = dupcall->key_len;
- ovs_mutex_lock(&udpif->drop_key_mutex);
- if (udpif->n_drop_keys < MAX_QUEUE_LENGTH) {
- udpif->n_drop_keys++;
- list_push_back(&udpif->drop_keys, &drop_key->list_node);
- ovs_mutex_unlock(&udpif->drop_key_mutex);
+ if (guarded_list_push_back(&udpif->drop_keys, &drop_key->list_node,
+ MAX_QUEUE_LENGTH)) {
seq_change(udpif->wait_seq);
} else {
- ovs_mutex_unlock(&udpif->drop_key_mutex);
COVERAGE_INC(drop_queue_overflow);
drop_key_destroy(drop_key);
}
}
dpif_operate(udpif->dpif, opsp, n_ops);
- ovs_mutex_lock(&udpif->fmb_mutex);
- atomic_read(&udpif->reval_seq, &new_reval_seq);
- if (old_reval_seq != new_reval_seq) {
- /* udpif_revalidate() was called as we were calculating the actions.
- * To be safe, we need to assume all the misses need revalidation. */
- ovs_mutex_unlock(&udpif->fmb_mutex);
+ atomic_read(&udpif->reval_seq, &reval_seq);
+ if (reval_seq != fmb->reval_seq) {
+ COVERAGE_INC(fmb_queue_revalidated);
flow_miss_batch_destroy(fmb);
- } else if (udpif->n_fmbs < MAX_QUEUE_LENGTH) {
- udpif->n_fmbs++;
- list_push_back(&udpif->fmbs, &fmb->list_node);
- ovs_mutex_unlock(&udpif->fmb_mutex);
- seq_change(udpif->wait_seq);
- } else {
+ } else if (!guarded_list_push_back(&udpif->fmbs, &fmb->list_node,
+ MAX_QUEUE_LENGTH)) {
COVERAGE_INC(fmb_queue_overflow);
- ovs_mutex_unlock(&udpif->fmb_mutex);
flow_miss_batch_destroy(fmb);
+ } else {
+ seq_change(udpif->wait_seq);
}
}