#include "connmgr.h"
#include "coverage.h"
-#include "dynamic-string.h"
#include "dpif.h"
+#include "dynamic-string.h"
#include "fail-open.h"
#include "guarded-list.h"
#include "latch.h"
-#include "seq.h"
#include "list.h"
#include "netlink.h"
#include "ofpbuf.h"
#include "ofproto-dpif-sflow.h"
#include "packets.h"
#include "poll-loop.h"
+#include "seq.h"
+#include "unixctl.h"
#include "vlog.h"
#define MAX_QUEUE_LENGTH 512
struct handler {
struct udpif *udpif; /* Parent udpif. */
pthread_t thread; /* Thread ID. */
+ char *name; /* Thread name. */
struct ovs_mutex mutex; /* Mutex guarding the following. */
struct list upcalls OVS_GUARDED;
size_t n_upcalls OVS_GUARDED;
- size_t n_new_upcalls; /* Only changed by the dispatcher. */
bool need_signal; /* Only changed by the dispatcher. */
pthread_cond_t wake_cond; /* Wakes 'thread' while holding
* "handler" threads (see struct handler). Other upcalls are queued to the
* main ofproto_dpif. */
struct udpif {
+ struct list list_node; /* In all_udpifs list. */
+
struct dpif *dpif; /* Datapath handle. */
struct dpif_backer *backer; /* Opaque dpif_backer pointer. */
struct guarded_list drop_keys; /* "struct drop key"s. */
struct guarded_list fmbs; /* "struct flow_miss_batch"es. */
- /* Number of times udpif_revalidate() has been called. */
- atomic_uint reval_seq;
-
struct seq *wait_seq;
+ struct seq *reval_seq;
struct latch exit_latch; /* Tells child threads to exit. */
};
static void upcall_destroy(struct upcall *);
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
+static struct list all_udpifs = LIST_INITIALIZER(&all_udpifs);
static void recv_upcalls(struct udpif *);
static void handle_upcalls(struct udpif *, struct list *upcalls);
static void miss_destroy(struct flow_miss *);
static void *udpif_dispatcher(void *);
static void *udpif_upcall_handler(void *);
+static void upcall_unixctl_show(struct unixctl_conn *conn, int argc,
+ const char *argv[], void *aux);
struct udpif *
udpif_create(struct dpif_backer *backer, struct dpif *dpif)
{
+ static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
struct udpif *udpif = xzalloc(sizeof *udpif);
+ if (ovsthread_once_start(&once)) {
+ unixctl_command_register("upcall/show", "", 0, 0, upcall_unixctl_show,
+ NULL);
+ ovsthread_once_done(&once);
+ }
+
udpif->dpif = dpif;
udpif->backer = backer;
udpif->secret = random_uint32();
udpif->wait_seq = seq_create();
+ udpif->reval_seq = seq_create();
latch_init(&udpif->exit_latch);
guarded_list_init(&udpif->drop_keys);
guarded_list_init(&udpif->fmbs);
- atomic_init(&udpif->reval_seq, 0);
+ list_push_back(&all_udpifs, &udpif->list_node);
return udpif;
}
struct flow_miss_batch *fmb;
struct drop_key *drop_key;
- udpif_recv_set(udpif, 0, false);
+ udpif_set_threads(udpif, 0);
+ list_remove(&udpif->list_node);
while ((drop_key = drop_key_next(udpif))) {
drop_key_destroy(drop_key);
guarded_list_destroy(&udpif->fmbs);
latch_destroy(&udpif->exit_latch);
seq_destroy(udpif->wait_seq);
+ seq_destroy(udpif->reval_seq);
free(udpif);
}
-/* Tells 'udpif' to begin or stop handling flow misses depending on the value
- * of 'enable'. 'n_handlers' is the number of upcall_handler threads to
- * create. Passing 'n_handlers' as zero is equivalent to passing 'enable' as
- * false. */
+/* Tells 'udpif' how many threads it should use to handle upcalls. Disables
+ * all threads if 'n_handlers' is zero. 'udpif''s datapath handle must have
+ * packet reception enabled before starting threads. */
void
-udpif_recv_set(struct udpif *udpif, size_t n_handlers, bool enable)
+udpif_set_threads(struct udpif *udpif, size_t n_handlers)
{
- n_handlers = enable ? n_handlers : 0;
- n_handlers = MIN(n_handlers, 64);
-
/* Stop the old threads (if any). */
if (udpif->handlers && udpif->n_handlers != n_handlers) {
size_t i;
ovs_mutex_destroy(&handler->mutex);
xpthread_cond_destroy(&handler->wake_cond);
+ free(handler->name);
}
latch_poll(&udpif->exit_latch);
udpif_revalidate(struct udpif *udpif)
{
struct flow_miss_batch *fmb, *next_fmb;
- unsigned int junk;
struct list fmbs;
/* Since we remove each miss on revalidation, their statistics won't be
* accounted to the appropriate 'facet's in the upper layer. In most
* cases, this is alright because we've already pushed the stats to the
- * relevant rules. However, NetFlow requires absolute packet counts on
- * 'facet's which could now be incorrect. */
- atomic_add(&udpif->reval_seq, 1, &junk);
+ * relevant rules. */
+ seq_change(udpif->reval_seq);
guarded_list_pop_all(&udpif->fmbs, &fmbs);
LIST_FOR_EACH_SAFE (fmb, next_fmb, list_node, &fmbs) {
udpif_drop_key_clear(udpif);
}
+/* Accounts 'udpif''s current memory usage into 'usage' (a name->count map),
+ * e.g. for a "memory/show" style report.  Counts one dispatcher and one
+ * flow-dumper thread per udpif, plus the handler threads and the upcalls
+ * currently queued on each of them. */
+void
+udpif_get_memory_usage(struct udpif *udpif, struct simap *usage)
+{
+    size_t i;
+
+    simap_increase(usage, "dispatchers", 1);
+    simap_increase(usage, "flow_dumpers", 1);
+
+    simap_increase(usage, "handlers", udpif->n_handlers);
+    for (i = 0; i < udpif->n_handlers; i++) {
+        struct handler *handler = &udpif->handlers[i];
+
+        /* 'n_upcalls' is guarded by 'mutex' (see struct handler). */
+        ovs_mutex_lock(&handler->mutex);
+        simap_increase(usage, "handler upcalls",  handler->n_upcalls);
+        ovs_mutex_unlock(&handler->mutex);
+    }
+}
+
/* Destroys and deallocates 'upcall'. */
static void
upcall_destroy(struct upcall *upcall)
{
if (upcall) {
+ ofpbuf_uninit(&upcall->dpif_upcall.packet);
ofpbuf_uninit(&upcall->upcall_buf);
free(upcall);
}
for (i = 0; i < 50; i++) {
struct flow_miss_batch *next;
- unsigned int reval_seq;
struct list *next_node;
next_node = guarded_list_pop_front(&udpif->fmbs);
}
next = CONTAINER_OF(next_node, struct flow_miss_batch, list_node);
- atomic_read(&udpif->reval_seq, &reval_seq);
- if (next->reval_seq == reval_seq) {
+ if (next->reval_seq == seq_read(udpif->reval_seq)) {
return next;
}
{
struct handler *handler = arg;
- set_subprogram_name("upcall_%u", ovsthread_id_self());
+ handler->name = xasprintf("handler_%u", ovsthread_id_self());
+ set_subprogram_name("%s", handler->name);
+
for (;;) {
struct list misses = LIST_INITIALIZER(&misses);
size_t i;
error = dpif_recv(udpif->dpif, &upcall->dpif_upcall,
&upcall->upcall_buf);
if (error) {
- upcall_destroy(upcall);
+ /* upcall_destroy() can only be called on successfully received
+ * upcalls. */
+ ofpbuf_uninit(&upcall->upcall_buf);
+ free(upcall);
break;
}
struct flow_miss_batch *fmb;
size_t n_misses, n_ops, i;
struct flow_miss *miss;
- unsigned int reval_seq;
enum upcall_type type;
bool fail_open;
* datapath flow.)
*/
fmb = xmalloc(sizeof *fmb);
- atomic_read(&udpif->reval_seq, &fmb->reval_seq);
+ fmb->reval_seq = seq_read(udpif->reval_seq);
hmap_init(&fmb->misses);
list_init(&fmb->upcalls);
n_misses = 0;
LIST_FOR_EACH_SAFE (upcall, next, list_node, upcalls) {
struct dpif_upcall *dupcall = &upcall->dpif_upcall;
- struct ofpbuf *packet = dupcall->packet;
+ struct ofpbuf *packet = &dupcall->packet;
struct flow_miss *miss = &fmb->miss_buf[n_misses];
struct flow_miss *existing_miss;
struct ofproto_dpif *ofproto;
error = xlate_receive(udpif->backer, packet, dupcall->key,
dupcall->key_len, &flow, &miss->key_fitness,
- &ofproto, &odp_in_port);
+ &ofproto, &ipfix, &sflow, NULL, &odp_in_port);
if (error) {
if (error == ENODEV) {
struct drop_key *drop_key;
switch (type) {
case SFLOW_UPCALL:
- sflow = xlate_get_sflow(ofproto);
if (sflow) {
union user_action_cookie cookie;
memset(&cookie, 0, sizeof cookie);
memcpy(&cookie, nl_attr_get(dupcall->userdata),
sizeof cookie.sflow);
- dpif_sflow_received(sflow, dupcall->packet, &flow, odp_in_port,
+ dpif_sflow_received(sflow, packet, &flow, odp_in_port,
&cookie);
- dpif_sflow_unref(sflow);
}
break;
case IPFIX_UPCALL:
- ipfix = xlate_get_ipfix(ofproto);
if (ipfix) {
- dpif_ipfix_bridge_sample(ipfix, dupcall->packet, &flow);
- dpif_ipfix_unref(ipfix);
+ dpif_ipfix_bridge_sample(ipfix, packet, &flow);
}
break;
case FLOW_SAMPLE_UPCALL:
- ipfix = xlate_get_ipfix(ofproto);
if (ipfix) {
union user_action_cookie cookie;
/* The flow reflects exactly the contents of the packet.
* Sample the packet using it. */
- dpif_ipfix_flow_sample(ipfix, dupcall->packet, &flow,
+ dpif_ipfix_flow_sample(ipfix, packet, &flow,
cookie.flow_sample.collector_set_id,
cookie.flow_sample.probability,
cookie.flow_sample.obs_domain_id,
cookie.flow_sample.obs_point_id);
- dpif_ipfix_unref(ipfix);
}
break;
case BAD_UPCALL:
break;
case MISS_UPCALL:
- NOT_REACHED();
+ OVS_NOT_REACHED();
}
+ dpif_ipfix_unref(ipfix);
+ dpif_sflow_unref(sflow);
+
list_remove(&upcall->list_node);
upcall_destroy(upcall);
}
n_ops = 0;
LIST_FOR_EACH (upcall, list_node, upcalls) {
struct flow_miss *miss = upcall->flow_miss;
- struct ofpbuf *packet = upcall->dpif_upcall.packet;
+ struct ofpbuf *packet = &upcall->dpif_upcall.packet;
if (miss->xout.slow) {
struct xlate_in xin;
}
}
- /* Execute batch. */
- for (i = 0; i < n_ops; i++) {
- opsp[i] = &ops[i];
- }
- dpif_operate(udpif->dpif, opsp, n_ops);
-
/* Special case for fail-open mode.
*
* If we are in fail-open mode, but we are connected to a controller too,
* then we should send the packet up to the controller in the hope that it
* will try to set up a flow and thereby allow us to exit fail-open.
*
- * See the top-level comment in fail-open.c for more information. */
+ * See the top-level comment in fail-open.c for more information.
+ *
+ * Copy packets before they are modified by execution. */
if (fail_open) {
LIST_FOR_EACH (upcall, list_node, upcalls) {
struct flow_miss *miss = upcall->flow_miss;
- struct ofpbuf *packet = upcall->dpif_upcall.packet;
+ struct ofpbuf *packet = &upcall->dpif_upcall.packet;
struct ofproto_packet_in *pin;
pin = xmalloc(sizeof *pin);
}
}
+ /* Execute batch. */
+ for (i = 0; i < n_ops; i++) {
+ opsp[i] = &ops[i];
+ }
+ dpif_operate(udpif->dpif, opsp, n_ops);
+
list_move(&fmb->upcalls, upcalls);
- atomic_read(&udpif->reval_seq, &reval_seq);
- if (reval_seq != fmb->reval_seq) {
+ if (fmb->reval_seq != seq_read(udpif->reval_seq)) {
COVERAGE_INC(fmb_queue_revalidated);
flow_miss_batch_destroy(fmb);
} else if (!guarded_list_push_back(&udpif->fmbs, &fmb->list_node,
seq_change(udpif->wait_seq);
}
}
+\f
+/* Implements the "upcall/show" unixctl command (registered in
+ * udpif_create()): for every datapath in 'all_udpifs', prints the datapath
+ * name followed by one line per handler thread giving the thread's name and
+ * the number of upcalls currently queued to it. */
+static void
+upcall_unixctl_show(struct unixctl_conn *conn, int argc OVS_UNUSED,
+                    const char *argv[] OVS_UNUSED, void *aux OVS_UNUSED)
+{
+    struct ds ds = DS_EMPTY_INITIALIZER;
+    struct udpif *udpif;
+
+    LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
+        size_t i;
+
+        ds_put_format(&ds, "%s:\n", dpif_name(udpif->dpif));
+        for (i = 0; i < udpif->n_handlers; i++) {
+            struct handler *handler = &udpif->handlers[i];
+
+            /* 'name' and 'n_upcalls' are guarded by 'mutex' (see struct
+             * handler). */
+            ovs_mutex_lock(&handler->mutex);
+            ds_put_format(&ds, "\t%s: (upcall queue %"PRIuSIZE")\n",
+                          handler->name, handler->n_upcalls);
+            ovs_mutex_unlock(&handler->mutex);
+        }
+    }
+
+    unixctl_command_reply(conn, ds_cstr(&ds));
+    ds_destroy(&ds);
+}