-/* Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
+/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
#define MAX_QUEUE_LENGTH 512
#define FLOW_MISS_MAX_BATCH 50
#define REVALIDATE_MAX_BATCH 50
+#define MAX_IDLE 1500 /* Default max datapath flow idle time, in ms. */
VLOG_DEFINE_THIS_MODULE(ofproto_dpif_upcall);
unsigned int avg_n_flows;
/* Following fields are accessed and modified by different threads. */
- atomic_llong max_idle; /* Maximum datapath flow idle time. */
atomic_uint flow_limit; /* Datapath flow hard limit. */
+
+ /* n_flows_mutex prevents multiple threads updating these concurrently. */
+ atomic_uint64_t n_flows; /* Number of flows in the datapath. */
+ atomic_llong n_flows_timestamp; /* Last time n_flows was updated. */
+ struct ovs_mutex n_flows_mutex;
};
enum upcall_type {
struct ofproto_dpif *ofproto;
struct flow flow;
- enum odp_key_fitness key_fitness;
const struct nlattr *key;
size_t key_len;
enum dpif_upcall_type upcall_type;
struct odputil_keybuf mask_buf;
struct xlate_out xout;
+
+ bool put;
};
static void upcall_destroy(struct upcall *);
static void *udpif_dispatcher(void *);
static void *udpif_upcall_handler(void *);
static void *udpif_revalidator(void *);
-static uint64_t udpif_get_n_flows(const struct udpif *);
+static uint64_t udpif_get_n_flows(struct udpif *);
static void revalidate_udumps(struct revalidator *, struct list *udumps);
static void revalidator_sweep(struct revalidator *);
static void upcall_unixctl_show(struct unixctl_conn *conn, int argc,
const char *argv[], void *aux);
static void upcall_unixctl_enable_megaflows(struct unixctl_conn *, int argc,
const char *argv[], void *aux);
+static void upcall_unixctl_set_flow_limit(struct unixctl_conn *conn, int argc,
+ const char *argv[], void *aux);
static void ukey_delete(struct revalidator *, struct udpif_key *);
static atomic_bool enable_megaflows = ATOMIC_VAR_INIT(true);
upcall_unixctl_disable_megaflows, NULL);
unixctl_command_register("upcall/enable-megaflows", "", 0, 0,
upcall_unixctl_enable_megaflows, NULL);
+ unixctl_command_register("upcall/set-flow-limit", "", 1, 1,
+ upcall_unixctl_set_flow_limit, NULL);
ovsthread_once_done(&once);
}
udpif->dpif = dpif;
udpif->backer = backer;
- atomic_init(&udpif->max_idle, 5000);
atomic_init(&udpif->flow_limit, MIN(ofproto_flow_limit, 10000));
udpif->secret = random_uint32();
udpif->reval_seq = seq_create();
udpif->dump_seq = seq_create();
latch_init(&udpif->exit_latch);
list_push_back(&all_udpifs, &udpif->list_node);
+ atomic_init(&udpif->n_flows, 0);
+ atomic_init(&udpif->n_flows_timestamp, LLONG_MIN);
+ ovs_mutex_init(&udpif->n_flows_mutex);
return udpif;
}
latch_destroy(&udpif->exit_latch);
seq_destroy(udpif->reval_seq);
seq_destroy(udpif->dump_seq);
+ atomic_destroy(&udpif->flow_limit);
+ atomic_destroy(&udpif->n_flows);
+ atomic_destroy(&udpif->n_flows_timestamp);
+ ovs_mutex_destroy(&udpif->n_flows_mutex);
free(udpif);
}
}
+/* Returns the number of flows in 'udpif''s datapath.
+ *
+ * The count is cached in 'udpif->n_flows'.  It is re-read from the datapath
+ * only when the cached value is more than 100 ms old and this thread manages
+ * to take 'udpif->n_flows_mutex' without blocking; otherwise the cached
+ * value is returned as is. */
static uint64_t
-udpif_get_n_flows(const struct udpif *udpif)
+udpif_get_n_flows(struct udpif *udpif)
{
- struct dpif_dp_stats stats;
-
- dpif_get_dp_stats(udpif->dpif, &stats);
- return stats.n_flows;
+    long long int now = time_msec();
+    long long int last_refresh;
+    bool refreshed = false;
+    uint64_t n_flows;
+
+    atomic_read(&udpif->n_flows_timestamp, &last_refresh);
+    if (last_refresh < now - 100) {
+        /* A zero return means the mutex was acquired.  If another thread
+         * holds it, that thread is already refreshing the count, so fall
+         * through to the cached value instead of waiting. */
+        if (!ovs_mutex_trylock(&udpif->n_flows_mutex)) {
+            struct dpif_dp_stats stats;
+
+            /* Publish the new timestamp before querying the datapath. */
+            atomic_store(&udpif->n_flows_timestamp, now);
+            dpif_get_dp_stats(udpif->dpif, &stats);
+            n_flows = stats.n_flows;
+            atomic_store(&udpif->n_flows, n_flows);
+            ovs_mutex_unlock(&udpif->n_flows_mutex);
+            refreshed = true;
+        }
+    }
+
+    if (!refreshed) {
+        atomic_read(&udpif->n_flows, &n_flows);
+    }
+    return n_flows;
}
/* The dispatcher thread is responsible for receiving upcalls from the kernel,
struct dpif_flow_dump dump;
size_t key_len, mask_len;
unsigned int flow_limit;
- long long int max_idle;
bool need_revalidate;
uint64_t reval_seq;
size_t n_flows, i;
udpif->max_n_flows = MAX(n_flows, udpif->max_n_flows);
udpif->avg_n_flows = (udpif->avg_n_flows + n_flows) / 2;
- atomic_read(&udpif->flow_limit, &flow_limit);
- if (n_flows < flow_limit / 8) {
- max_idle = 5000;
- } else if (n_flows < flow_limit / 4) {
- max_idle = 2000;
- } else if (n_flows < flow_limit / 2) {
- max_idle = 1000;
- } else {
- max_idle = 500;
- }
- atomic_store(&udpif->max_idle, max_idle);
-
start_time = time_msec();
dpif_flow_dump_start(&dump, udpif->dpif);
while (dpif_flow_dump_next(&dump, &key, &key_len, &mask, &mask_len,
ovs_mutex_unlock(&revalidator->mutex);
}
- duration = time_msec() - start_time;
+ duration = MAX(time_msec() - start_time, 1);
udpif->dump_duration = duration;
+ atomic_read(&udpif->flow_limit, &flow_limit);
if (duration > 2000) {
flow_limit /= duration / 1000;
} else if (duration > 1300) {
atomic_store(&udpif->flow_limit, flow_limit);
if (duration > 2000) {
- VLOG_WARN("Spent an unreasonably long %lldms dumping flows",
+ VLOG_INFO("Spent an unreasonably long %lldms dumping flows",
duration);
}
- poll_timer_wait_until(start_time + MIN(max_idle, 500));
+ poll_timer_wait_until(start_time + MIN(MAX_IDLE, 500));
seq_wait(udpif->reval_seq, udpif->last_reval_seq);
latch_wait(&udpif->exit_latch);
poll_block();
handler->name = xasprintf("handler_%u", ovsthread_id_self());
set_subprogram_name("%s", handler->name);
- for (;;) {
+ while (!latch_is_set(&handler->udpif->exit_latch)) {
struct list misses = LIST_INITIALIZER(&misses);
size_t i;
ovs_mutex_lock(&handler->mutex);
-
- if (latch_is_set(&handler->udpif->exit_latch)) {
- ovs_mutex_unlock(&handler->mutex);
- return NULL;
- }
-
if (!handler->n_upcalls) {
ovs_mutex_cond_wait(&handler->wake_cond, &handler->mutex);
}
coverage_clear();
}
+
+ return NULL;
}
static void *
int error;
error = xlate_receive(udpif->backer, packet, dupcall->key,
- dupcall->key_len, &flow, &miss->key_fitness,
+ dupcall->key_len, &flow,
&ofproto, &ipfix, &sflow, NULL, &odp_in_port);
if (error) {
if (error == ENODEV) {
miss->stats.used = time_msec();
miss->stats.tcp_flags = 0;
miss->odp_in_port = odp_in_port;
+ miss->put = false;
n_misses++;
} else {
LIST_FOR_EACH (upcall, list_node, upcalls) {
struct flow_miss *miss = upcall->flow_miss;
struct ofpbuf *packet = &upcall->dpif_upcall.packet;
- struct ofpbuf mask;
struct dpif_op *op;
- bool megaflow;
+ ovs_be16 flow_vlan_tci;
+
+ /* Save a copy of flow.vlan_tci in case it is changed to
+ * generate proper mega flow masks for VLAN splinter flows. */
+ flow_vlan_tci = miss->flow.vlan_tci;
if (miss->xout.slow) {
struct xlate_in xin;
xlate_actions_for_side_effects(&xin);
}
- atomic_read(&enable_megaflows, &megaflow);
- ofpbuf_use_stack(&mask, &miss->mask_buf, sizeof miss->mask_buf);
- if (megaflow) {
- odp_flow_key_from_mask(&mask, &miss->xout.wc.masks, &miss->flow,
- UINT32_MAX);
+ if (miss->flow.in_port.ofp_port
+ != vsp_realdev_to_vlandev(miss->ofproto,
+ miss->flow.in_port.ofp_port,
+ miss->flow.vlan_tci)) {
+ /* This packet was received on a VLAN splinter port. We
+ * added a VLAN to the packet to make the packet resemble
+ * the flow, but the actions were composed assuming that
+ * the packet contained no VLAN. So, we must remove the
+ * VLAN header from the packet before trying to execute the
+ * actions. */
+ if (miss->xout.odp_actions.size) {
+ eth_pop_vlan(packet);
+ }
+
+ /* Remove the flow vlan tags inserted by vlan splinter logic
+ * to ensure megaflow masks generated match the data path flow. */
+ miss->flow.vlan_tci = 0;
}
- if (may_put) {
+ /* Do not install a flow into the datapath if:
+ *
+ * - The datapath already has too many flows.
+ *
+ * - An earlier iteration of this loop already put the same flow.
+ *
+ * - We received this packet via some flow installed in the kernel
+ * already. */
+ if (may_put
+ && !miss->put
+ && upcall->dpif_upcall.type == DPIF_UC_MISS) {
+ struct ofpbuf mask;
+ bool megaflow;
+
+ miss->put = true;
+
+ atomic_read(&enable_megaflows, &megaflow);
+ ofpbuf_use_stack(&mask, &miss->mask_buf, sizeof miss->mask_buf);
+ if (megaflow) {
+ size_t max_mpls;
+
+ max_mpls = ofproto_dpif_get_max_mpls_depth(miss->ofproto);
+ odp_flow_key_from_mask(&mask, &miss->xout.wc.masks,
+ &miss->flow, UINT32_MAX, max_mpls);
+ }
+
op = &ops[n_ops++];
op->type = DPIF_OP_FLOW_PUT;
op->u.flow_put.flags = DPIF_FP_CREATE | DPIF_FP_MODIFY;
}
}
+ /*
+ * The 'miss' may be shared by multiple upcalls. Restore
+ * the saved flow vlan_tci field before processing the next
+ * upcall. */
+ miss->flow.vlan_tci = flow_vlan_tci;
+
if (miss->xout.odp_actions.size) {
- if (miss->flow.in_port.ofp_port
- != vsp_realdev_to_vlandev(miss->ofproto,
- miss->flow.in_port.ofp_port,
- miss->flow.vlan_tci)) {
- /* This packet was received on a VLAN splinter port. We
- * added a VLAN to the packet to make the packet resemble
- * the flow, but the actions were composed assuming that
- * the packet contained no VLAN. So, we must remove the
- * VLAN header from the packet before trying to execute the
- * actions. */
- eth_pop_vlan(packet);
- }
op = &ops[n_ops++];
op->type = DPIF_OP_EXECUTE;
- op->u.execute.key = miss->key;
- op->u.execute.key_len = miss->key_len;
op->u.execute.packet = packet;
+ odp_key_to_pkt_metadata(miss->key, miss->key_len,
+ &op->u.execute.md);
op->u.execute.actions = miss->xout.odp_actions.data;
op->u.execute.actions_len = miss->xout.odp_actions.size;
op->u.execute.needs_help = (miss->xout.slow & SLOW_ACTION) != 0;
}
error = xlate_receive(udpif->backer, NULL, ukey->key, ukey->key_len, &flow,
- NULL, &ofproto, NULL, NULL, NULL, &odp_in_port);
+ &ofproto, NULL, NULL, NULL, &odp_in_port);
if (error) {
goto exit;
}
long long int max_idle;
bool must_del;
- atomic_read(&udpif->max_idle, &max_idle);
atomic_read(&udpif->flow_limit, &flow_limit);
n_flows = udpif_get_n_flows(udpif);
must_del = false;
+ max_idle = MAX_IDLE;
if (n_flows > flow_limit) {
must_del = n_flows > 2 * flow_limit;
max_idle = 100;
struct flow flow;
if (!xlate_receive(udpif->backer, NULL, ops[i].op.u.flow_del.key,
- ops[i].op.u.flow_del.key_len, &flow, NULL,
+ ops[i].op.u.flow_del.key_len, &flow,
&ofproto, NULL, NULL, &netflow, NULL)) {
struct xlate_in xin;
LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
unsigned int flow_limit;
- long long int max_idle;
size_t i;
atomic_read(&udpif->flow_limit, &flow_limit);
- atomic_read(&udpif->max_idle, &max_idle);
ds_put_format(&ds, "%s:\n", dpif_name(udpif->dpif));
ds_put_format(&ds, "\tflows : (current %"PRIu64")"
" (avg %u) (max %u) (limit %u)\n", udpif_get_n_flows(udpif),
udpif->avg_n_flows, udpif->max_n_flows, flow_limit);
- ds_put_format(&ds, "\tmax idle : %lldms\n", max_idle);
ds_put_format(&ds, "\tdump duration : %lldms\n", udpif->dump_duration);
ds_put_char(&ds, '\n');
udpif_flush();
unixctl_command_reply(conn, "megaflows enabled");
}
+
+/* Handles the "upcall/set-flow-limit" unixctl command.
+ *
+ * Parses 'argv[1]' as an unsigned decimal integer and stores it as the flow
+ * limit of every udpif on 'all_udpifs', then replies on 'conn' with the
+ * value that was set.  If 'argv[1]' is not a plain decimal number that fits
+ * in an unsigned int, replies with an error and changes nothing.
+ *
+ * This command is only needed for advanced debugging, so it's not
+ * documented in the man page. */
+static void
+upcall_unixctl_set_flow_limit(struct unixctl_conn *conn,
+                              int argc OVS_UNUSED,
+                              const char *argv[],
+                              void *aux OVS_UNUSED)
+{
+    const char *arg = argv[1];
+    unsigned long int value = 0;
+    char *tail;
+    bool valid;
+
+    /* Require a leading digit: strtoul() would otherwise accept leading
+     * white space and a sign, and silently negate a "-N" argument. */
+    valid = *arg >= '0' && *arg <= '9';
+    if (valid) {
+        errno = 0;
+        value = strtoul(arg, &tail, 10);
+        valid = *tail == '\0' && errno != ERANGE && value <= UINT_MAX;
+    }
+
+    if (valid) {
+        struct ds ds = DS_EMPTY_INITIALIZER;
+        unsigned int flow_limit = value;
+        struct udpif *udpif;
+
+        LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
+            atomic_store(&udpif->flow_limit, flow_limit);
+        }
+        ds_put_format(&ds, "set flow_limit to %u\n", flow_limit);
+        unixctl_command_reply(conn, ds_cstr(&ds));
+        ds_destroy(&ds);
+    } else {
+        unixctl_command_reply_error(conn, "invalid flow limit");
+    }
+}