X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=ofproto%2Fofproto-dpif-upcall.c;h=717563a3fe68599116793a8343e82e324eb058e5;hb=6f12bda359fb13fb2c0d6f958f56956bb76e47d7;hp=4ee5bf59c1306b7b25282f1ae0c98eaf3c68e290;hpb=6a279b0738fe7e2603a77cf56139f9b06b853e5b;p=sliver-openvswitch.git diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c index 4ee5bf59c..717563a3f 100644 --- a/ofproto/ofproto-dpif-upcall.c +++ b/ofproto/ofproto-dpif-upcall.c @@ -45,6 +45,8 @@ VLOG_DEFINE_THIS_MODULE(ofproto_dpif_upcall); +COVERAGE_DEFINE(upcall_duplicate_flow); + /* A thread that reads upcalls from dpif, forwards each upcall's packet, * and possibly sets up a kernel flow as a cache. */ struct handler { @@ -54,22 +56,15 @@ struct handler { uint32_t handler_id; /* Handler id. */ }; -/* A thread that processes each kernel flow handed to it by the flow_dumper - * thread, updates OpenFlow statistics, and updates or removes the kernel flow - * as necessary. */ +/* A thread that processes datapath flows, updates OpenFlow statistics, and + * updates or removes them if necessary. */ struct revalidator { struct udpif *udpif; /* Parent udpif. */ char *name; /* Thread name. */ pthread_t thread; /* Thread ID. */ - struct hmap ukeys; /* Datapath flow keys. */ - - uint64_t dump_seq; - - struct ovs_mutex mutex; /* Mutex guarding the following. */ - pthread_cond_t wake_cond; - struct list udumps OVS_GUARDED; /* Unprocessed udumps. */ - size_t n_udumps OVS_GUARDED; /* Number of unprocessed udumps. */ + struct hmap *ukeys; /* Points into udpif->ukeys for this + revalidator. Used for GC phase. */ }; /* An upcall handler for ofproto_dpif. @@ -85,13 +80,9 @@ struct revalidator { * flow revalidation * ----------------- * - * - An array of 'struct revalidator's for flow revalidation and - * stats collection. - * - * - A "flow_dumper" thread that reads the kernel flow table and dispatches - * flows to one of several "revalidator" threads (see struct - * revalidator). - * */ + * - Revalidation threads which read the datapath flow table and maintains + * them. + */ struct udpif { struct list list_node; /* In all_udpifs list. */ @@ -100,22 +91,33 @@ struct udpif { uint32_t secret; /* Random seed for upcall hash. */ - pthread_t flow_dumper; /* Flow dumper thread ID. */ - struct handler *handlers; /* Upcall handlers. */ size_t n_handlers; struct revalidator *revalidators; /* Flow revalidators. */ size_t n_revalidators; - uint64_t last_reval_seq; /* 'reval_seq' at last revalidation. */ - struct seq *reval_seq; /* Incremented to force revalidation. */ - - struct seq *dump_seq; /* Increments each dump iteration. */ - struct latch exit_latch; /* Tells child threads to exit. */ + /* Revalidation. */ + struct seq *reval_seq; /* Incremented to force revalidation. */ + bool need_revalidate; /* As indicated by 'reval_seq'. */ + bool reval_exit; /* Set by leader on 'exit_latch. */ + pthread_barrier_t reval_barrier; /* Barrier used by revalidators. */ + struct dpif_flow_dump dump; /* DPIF flow dump state. */ long long int dump_duration; /* Duration of the last flow dump. */ + struct seq *dump_seq; /* Increments each dump iteration. */ + + /* There are 'n_revalidators' ukey hmaps. Each revalidator retains a + * reference to one of these for garbage collection. + * + * During the flow dump phase, revalidators insert into these with a random + * distribution. During the garbage collection phase, each revalidator + * takes care of garbage collecting one of these hmaps. */ + struct { + struct ovs_mutex mutex; /* Guards the following. */ + struct hmap hmap OVS_GUARDED; /* Datapath flow keys. */ + } *ukeys; /* Datapath flow statistics. */ unsigned int max_n_flows; @@ -149,44 +151,33 @@ struct upcall { /* 'udpif_key's are responsible for tracking the little bit of state udpif * needs to do flow expiration which can't be pulled directly from the - * datapath. They are owned, created by, maintained, and destroyed by a single - * revalidator making them easy to efficiently handle with multiple threads. */ + * datapath. They may be created or maintained by any revalidator during + * the dump phase, but are owned by a single revalidator, and are destroyed + * by that revalidator during the garbage-collection phase. + * + * While some elements of a udpif_key are protected by a mutex, the ukey itself + * is not. Therefore it is not safe to destroy a udpif_key except when all + * revalidators are in garbage collection phase, or they aren't running. */ struct udpif_key { struct hmap_node hmap_node; /* In parent revalidator 'ukeys' map. */ - struct nlattr *key; /* Datapath flow key. */ - size_t key_len; /* Length of 'key'. */ - - struct dpif_flow_stats stats; /* Stats at most recent flow dump. */ - long long int created; /* Estimation of creation time. */ - - bool mark; /* Used by mark and sweep GC algorithm. */ - - struct odputil_keybuf key_buf; /* Memory for 'key'. */ - struct xlate_cache *xcache; /* Cache for xlate entries that - * are affected by this ukey. - * Used for stats and learning.*/ -}; - -/* 'udpif_flow_dump's hold the state associated with one iteration in a flow - * dump operation. This is created by the flow_dumper thread and handed to the - * appropriate revalidator thread to be processed. */ -struct udpif_flow_dump { - struct list list_node; - - struct nlattr *key; /* Datapath flow key. */ + /* These elements are read only once created, and therefore aren't + * protected by a mutex. */ + const struct nlattr *key; /* Datapath flow key. */ size_t key_len; /* Length of 'key'. */ - uint32_t key_hash; /* Hash of 'key'. */ - - struct odputil_keybuf mask_buf; - struct nlattr *mask; /* Datapath mask for 'key'. */ - size_t mask_len; /* Length of 'mask'. */ - - struct dpif_flow_stats stats; /* Stats pulled from the datapath. */ - bool need_revalidate; /* Key needs revalidation? */ - - struct odputil_keybuf key_buf; + struct ovs_mutex mutex; /* Guards the following. */ + struct dpif_flow_stats stats OVS_GUARDED; /* Last known stats.*/ + long long int created OVS_GUARDED; /* Estimate of creation time. */ + bool mark OVS_GUARDED; /* For mark and sweep garbage + collection. */ + bool flow_exists OVS_GUARDED; /* Ensures flows are only deleted + once. */ + + struct xlate_cache *xcache OVS_GUARDED; /* Cache for xlate entries that + * are affected by this ukey. + * Used for stats and learning.*/ + struct odputil_keybuf key_buf; /* Memory for 'key'. */ }; /* Flow miss batching. @@ -223,11 +214,13 @@ static size_t read_upcalls(struct handler *, struct hmap *); static void handle_upcalls(struct handler *, struct hmap *, struct upcall *, size_t n_upcalls); -static void *udpif_flow_dumper(void *); +static void udpif_stop_threads(struct udpif *); +static void udpif_start_threads(struct udpif *, size_t n_handlers, + size_t n_revalidators); static void *udpif_upcall_handler(void *); static void *udpif_revalidator(void *); static uint64_t udpif_get_n_flows(struct udpif *); -static void revalidate_udumps(struct revalidator *, struct list *udumps); +static void revalidate(struct revalidator *); static void revalidator_sweep(struct revalidator *); static void revalidator_purge(struct revalidator *); static void upcall_unixctl_show(struct unixctl_conn *conn, int argc, @@ -238,6 +231,9 @@ static void upcall_unixctl_enable_megaflows(struct unixctl_conn *, int argc, const char *argv[], void *aux); static void upcall_unixctl_set_flow_limit(struct unixctl_conn *conn, int argc, const char *argv[], void *aux); + +static struct udpif_key *ukey_create(const struct nlattr *key, size_t key_len, + long long int used); static void ukey_delete(struct revalidator *, struct udpif_key *); static atomic_bool enable_megaflows = ATOMIC_VAR_INIT(true); @@ -278,8 +274,7 @@ udpif_create(struct dpif_backer *backer, struct dpif *dpif) void udpif_destroy(struct udpif *udpif) { - udpif_set_threads(udpif, 0, 0); - udpif_flush(udpif); + udpif_stop_threads(udpif); list_remove(&udpif->list_node); latch_destroy(&udpif->exit_latch); @@ -289,21 +284,12 @@ udpif_destroy(struct udpif *udpif) free(udpif); } -/* Tells 'udpif' how many threads it should use to handle upcalls. Disables - * all threads if 'n_handlers' and 'n_revalidators' is zero. 'udpif''s - * datapath handle must have packet reception enabled before starting threads. - */ -void -udpif_set_threads(struct udpif *udpif, size_t n_handlers, - size_t n_revalidators) +/* Stops the handler and revalidator threads, must be enclosed in + * ovsrcu quiescent state unless when destroying udpif. */ +static void +udpif_stop_threads(struct udpif *udpif) { - int error; - - ovsrcu_quiesce_start(); - /* Stop the old threads (if any). */ - if (udpif->handlers && - (udpif->n_handlers != n_handlers - || udpif->n_revalidators != n_revalidators)) { + if (udpif && (udpif->n_handlers != 0 || udpif->n_revalidators != 0)) { size_t i; latch_set(&udpif->exit_latch); @@ -315,33 +301,19 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers, } for (i = 0; i < udpif->n_revalidators; i++) { - struct revalidator *revalidator = &udpif->revalidators[i]; - - ovs_mutex_lock(&revalidator->mutex); - xpthread_cond_signal(&revalidator->wake_cond); - ovs_mutex_unlock(&revalidator->mutex); - xpthread_join(revalidator->thread, NULL); + xpthread_join(udpif->revalidators[i].thread, NULL); } - xpthread_join(udpif->flow_dumper, NULL); - for (i = 0; i < udpif->n_revalidators; i++) { struct revalidator *revalidator = &udpif->revalidators[i]; - struct udpif_flow_dump *udump, *next_udump; - - LIST_FOR_EACH_SAFE (udump, next_udump, list_node, - &revalidator->udumps) { - list_remove(&udump->list_node); - free(udump); - } /* Delete ukeys, and delete all flows from the datapath to prevent * double-counting stats. */ revalidator_purge(revalidator); - hmap_destroy(&revalidator->ukeys); - ovs_mutex_destroy(&revalidator->mutex); - free(revalidator->name); + + hmap_destroy(&udpif->ukeys[i].hmap); + ovs_mutex_destroy(&udpif->ukeys[i].mutex); } for (i = 0; i < udpif->n_handlers; i++) { @@ -349,6 +321,8 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers, } latch_poll(&udpif->exit_latch); + xpthread_barrier_destroy(&udpif->reval_barrier); + free(udpif->revalidators); udpif->revalidators = NULL; udpif->n_revalidators = 0; @@ -356,17 +330,19 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers, free(udpif->handlers); udpif->handlers = NULL; udpif->n_handlers = 0; - } - error = dpif_handlers_set(udpif->dpif, n_handlers); - if (error) { - VLOG_ERR("failed to configure handlers in dpif %s: %s", - dpif_name(udpif->dpif), ovs_strerror(error)); - return; + free(udpif->ukeys); + udpif->ukeys = NULL; } +} - /* Start new threads (if necessary). */ - if (!udpif->handlers && n_handlers) { +/* Starts the handler and revalidator threads, must be enclosed in + * ovsrcu quiescent state. */ +static void +udpif_start_threads(struct udpif *udpif, size_t n_handlers, + size_t n_revalidators) +{ + if (udpif && n_handlers && n_revalidators) { size_t i; udpif->n_handlers = n_handlers; @@ -382,22 +358,54 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers, handler); } + xpthread_barrier_init(&udpif->reval_barrier, NULL, + udpif->n_revalidators); + udpif->reval_exit = false; udpif->revalidators = xzalloc(udpif->n_revalidators * sizeof *udpif->revalidators); + udpif->ukeys = xmalloc(sizeof *udpif->ukeys * n_revalidators); for (i = 0; i < udpif->n_revalidators; i++) { struct revalidator *revalidator = &udpif->revalidators[i]; revalidator->udpif = udpif; - list_init(&revalidator->udumps); - hmap_init(&revalidator->ukeys); - ovs_mutex_init(&revalidator->mutex); - xpthread_cond_init(&revalidator->wake_cond, NULL); + hmap_init(&udpif->ukeys[i].hmap); + ovs_mutex_init(&udpif->ukeys[i].mutex); + revalidator->ukeys = &udpif->ukeys[i].hmap; xpthread_create(&revalidator->thread, NULL, udpif_revalidator, revalidator); } - xpthread_create(&udpif->flow_dumper, NULL, udpif_flow_dumper, udpif); + } +} + +/* Tells 'udpif' how many threads it should use to handle upcalls. + * 'n_handlers' and 'n_revalidators' can never be zero. 'udpif''s + * datapath handle must have packet reception enabled before starting + * threads. */ +void +udpif_set_threads(struct udpif *udpif, size_t n_handlers, + size_t n_revalidators) +{ + int error; + + ovs_assert(udpif); + ovs_assert(n_handlers && n_revalidators); + + ovsrcu_quiesce_start(); + if (udpif->n_handlers != n_handlers + || udpif->n_revalidators != n_revalidators) { + udpif_stop_threads(udpif); } + error = dpif_handlers_set(udpif->dpif, n_handlers); + if (error) { + VLOG_ERR("failed to configure handlers in dpif %s: %s", + dpif_name(udpif->dpif), ovs_strerror(error)); + return; + } + + if (!udpif->handlers && !udpif->revalidators) { + udpif_start_threads(udpif, n_handlers, n_revalidators); + } ovsrcu_quiesce_end(); } @@ -413,8 +421,11 @@ udpif_synchronize(struct udpif *udpif) * its main loop once. */ size_t n_handlers = udpif->n_handlers; size_t n_revalidators = udpif->n_revalidators; - udpif_set_threads(udpif, 0, 0); - udpif_set_threads(udpif, n_handlers, n_revalidators); + + ovsrcu_quiesce_start(); + udpif_stop_threads(udpif); + udpif_start_threads(udpif, n_handlers, n_revalidators); + ovsrcu_quiesce_end(); } /* Notifies 'udpif' that something changed which may render previous @@ -439,21 +450,13 @@ udpif_get_memory_usage(struct udpif *udpif, struct simap *usage) { size_t i; - simap_increase(usage, "flow_dumpers", 1); - simap_increase(usage, "handlers", udpif->n_handlers); simap_increase(usage, "revalidators", udpif->n_revalidators); for (i = 0; i < udpif->n_revalidators; i++) { - struct revalidator *revalidator = &udpif->revalidators[i]; - ovs_mutex_lock(&revalidator->mutex); - simap_increase(usage, "revalidator dumps", revalidator->n_udumps); - - /* XXX: This isn't technically thread safe because the revalidator - * ukeys maps isn't protected by a mutex since it's per thread. */ - simap_increase(usage, "revalidator keys", - hmap_count(&revalidator->ukeys)); - ovs_mutex_unlock(&revalidator->mutex); + ovs_mutex_lock(&udpif->ukeys[i].mutex); + simap_increase(usage, "udpif keys", hmap_count(&udpif->ukeys[i].hmap)); + ovs_mutex_unlock(&udpif->ukeys[i].mutex); } } @@ -466,9 +469,13 @@ udpif_flush(struct udpif *udpif) n_handlers = udpif->n_handlers; n_revalidators = udpif->n_revalidators; - udpif_set_threads(udpif, 0, 0); + ovsrcu_quiesce_start(); + + udpif_stop_threads(udpif); dpif_flow_flush(udpif->dpif); - udpif_set_threads(udpif, n_handlers, n_revalidators); + udpif_start_threads(udpif, n_handlers, n_revalidators); + + ovsrcu_quiesce_end(); } /* Removes all flows from all datapaths. */ @@ -505,125 +512,6 @@ udpif_get_n_flows(struct udpif *udpif) return flow_count; } -static void * -udpif_flow_dumper(void *arg) -{ - struct udpif *udpif = arg; - - set_subprogram_name("flow_dumper"); - while (!latch_is_set(&udpif->exit_latch)) { - const struct dpif_flow_stats *stats; - long long int start_time, duration; - const struct nlattr *key, *mask; - struct dpif_flow_dump dump; - size_t key_len, mask_len; - unsigned int flow_limit; - bool need_revalidate; - uint64_t reval_seq; - size_t n_flows, i; - int error; - void *state = NULL; - - reval_seq = seq_read(udpif->reval_seq); - need_revalidate = udpif->last_reval_seq != reval_seq; - udpif->last_reval_seq = reval_seq; - - n_flows = udpif_get_n_flows(udpif); - udpif->max_n_flows = MAX(n_flows, udpif->max_n_flows); - udpif->avg_n_flows = (udpif->avg_n_flows + n_flows) / 2; - - start_time = time_msec(); - error = dpif_flow_dump_start(&dump, udpif->dpif); - if (error) { - VLOG_INFO("Failed to start flow dump (%s)", ovs_strerror(error)); - goto skip; - } - dpif_flow_dump_state_init(udpif->dpif, &state); - while (dpif_flow_dump_next(&dump, state, &key, &key_len, - &mask, &mask_len, NULL, NULL, &stats) - && !latch_is_set(&udpif->exit_latch)) { - struct udpif_flow_dump *udump = xmalloc(sizeof *udump); - struct revalidator *revalidator; - - udump->key_hash = hash_bytes(key, key_len, udpif->secret); - memcpy(&udump->key_buf, key, key_len); - udump->key = (struct nlattr *) &udump->key_buf; - udump->key_len = key_len; - - memcpy(&udump->mask_buf, mask, mask_len); - udump->mask = (struct nlattr *) &udump->mask_buf; - udump->mask_len = mask_len; - - udump->stats = *stats; - udump->need_revalidate = need_revalidate; - - revalidator = &udpif->revalidators[udump->key_hash - % udpif->n_revalidators]; - - ovs_mutex_lock(&revalidator->mutex); - while (revalidator->n_udumps >= REVALIDATE_MAX_BATCH * 3 - && !latch_is_set(&udpif->exit_latch)) { - ovs_mutex_cond_wait(&revalidator->wake_cond, - &revalidator->mutex); - } - list_push_back(&revalidator->udumps, &udump->list_node); - revalidator->n_udumps++; - xpthread_cond_signal(&revalidator->wake_cond); - ovs_mutex_unlock(&revalidator->mutex); - } - dpif_flow_dump_state_uninit(udpif->dpif, state); - dpif_flow_dump_done(&dump); - - /* Let all the revalidators finish and garbage collect. */ - seq_change(udpif->dump_seq); - for (i = 0; i < udpif->n_revalidators; i++) { - struct revalidator *revalidator = &udpif->revalidators[i]; - ovs_mutex_lock(&revalidator->mutex); - xpthread_cond_signal(&revalidator->wake_cond); - ovs_mutex_unlock(&revalidator->mutex); - } - - for (i = 0; i < udpif->n_revalidators; i++) { - struct revalidator *revalidator = &udpif->revalidators[i]; - - ovs_mutex_lock(&revalidator->mutex); - while (revalidator->dump_seq != seq_read(udpif->dump_seq) - && !latch_is_set(&udpif->exit_latch)) { - ovs_mutex_cond_wait(&revalidator->wake_cond, - &revalidator->mutex); - } - ovs_mutex_unlock(&revalidator->mutex); - } - - duration = MAX(time_msec() - start_time, 1); - udpif->dump_duration = duration; - atomic_read(&udpif->flow_limit, &flow_limit); - if (duration > 2000) { - flow_limit /= duration / 1000; - } else if (duration > 1300) { - flow_limit = flow_limit * 3 / 4; - } else if (duration < 1000 && n_flows > 2000 - && flow_limit < n_flows * 1000 / duration) { - flow_limit += 1000; - } - flow_limit = MIN(ofproto_flow_limit, MAX(flow_limit, 1000)); - atomic_store(&udpif->flow_limit, flow_limit); - - if (duration > 2000) { - VLOG_INFO("Spent an unreasonably long %lldms dumping flows", - duration); - } - -skip: - poll_timer_wait_until(start_time + MIN(ofproto_max_idle, 500)); - seq_wait(udpif->reval_seq, udpif->last_reval_seq); - latch_wait(&udpif->exit_latch); - poll_block(); - } - - return NULL; -} - /* The upcall handler thread tries to read a batch of FLOW_MISS_MAX_BATCH * upcalls from dpif, processes the batch and installs corresponding flows * in dpif. */ @@ -670,42 +558,85 @@ udpif_upcall_handler(void *arg) static void * udpif_revalidator(void *arg) { + /* Used by all revalidators. */ struct revalidator *revalidator = arg; + struct udpif *udpif = revalidator->udpif; + bool leader = revalidator == &udpif->revalidators[0]; + + /* Used only by the leader. */ + long long int start_time = 0; + uint64_t last_reval_seq = 0; + unsigned int flow_limit = 0; + size_t n_flows = 0; revalidator->name = xasprintf("revalidator_%u", ovsthread_id_self()); set_subprogram_name("%s", revalidator->name); for (;;) { - struct list udumps = LIST_INITIALIZER(&udumps); - struct udpif *udpif = revalidator->udpif; - size_t i; + if (leader) { + uint64_t reval_seq; - ovs_mutex_lock(&revalidator->mutex); - if (latch_is_set(&udpif->exit_latch)) { - ovs_mutex_unlock(&revalidator->mutex); - return NULL; - } + reval_seq = seq_read(udpif->reval_seq); + udpif->need_revalidate = last_reval_seq != reval_seq; + last_reval_seq = reval_seq; - if (!revalidator->n_udumps) { - if (revalidator->dump_seq != seq_read(udpif->dump_seq)) { - revalidator->dump_seq = seq_read(udpif->dump_seq); - revalidator_sweep(revalidator); - } else { - ovs_mutex_cond_wait(&revalidator->wake_cond, - &revalidator->mutex); + n_flows = udpif_get_n_flows(udpif); + udpif->max_n_flows = MAX(n_flows, udpif->max_n_flows); + udpif->avg_n_flows = (udpif->avg_n_flows + n_flows) / 2; + + /* Only the leader checks the exit latch to prevent a race where + * some threads think it's true and exit and others think it's + * false and block indefinitely on the reval_barrier */ + udpif->reval_exit = latch_is_set(&udpif->exit_latch); + + start_time = time_msec(); + if (!udpif->reval_exit) { + dpif_flow_dump_start(&udpif->dump, udpif->dpif); } } - for (i = 0; i < REVALIDATE_MAX_BATCH && revalidator->n_udumps; i++) { - list_push_back(&udumps, list_pop_front(&revalidator->udumps)); - revalidator->n_udumps--; + /* Wait for the leader to start the flow dump. */ + xpthread_barrier_wait(&udpif->reval_barrier); + if (udpif->reval_exit) { + break; } + revalidate(revalidator); + + /* Wait for all flows to have been dumped before we garbage collect. */ + xpthread_barrier_wait(&udpif->reval_barrier); + revalidator_sweep(revalidator); + + /* Wait for all revalidators to finish garbage collection. */ + xpthread_barrier_wait(&udpif->reval_barrier); + + if (leader) { + long long int duration; + + dpif_flow_dump_done(&udpif->dump); + seq_change(udpif->dump_seq); + + duration = MAX(time_msec() - start_time, 1); + atomic_read(&udpif->flow_limit, &flow_limit); + udpif->dump_duration = duration; + if (duration > 2000) { + flow_limit /= duration / 1000; + } else if (duration > 1300) { + flow_limit = flow_limit * 3 / 4; + } else if (duration < 1000 && n_flows > 2000 + && flow_limit < n_flows * 1000 / duration) { + flow_limit += 1000; + } + flow_limit = MIN(ofproto_flow_limit, MAX(flow_limit, 1000)); + atomic_store(&udpif->flow_limit, flow_limit); - /* Wake up the flow dumper. */ - xpthread_cond_signal(&revalidator->wake_cond); - ovs_mutex_unlock(&revalidator->mutex); + if (duration > 2000) { + VLOG_INFO("Spent an unreasonably long %lldms dumping flows", + duration); + } - if (!list_is_empty(&udumps)) { - revalidate_udumps(revalidator, &udumps); + poll_timer_wait_until(start_time + MIN(ofproto_max_idle, 500)); + seq_wait(udpif->reval_seq, last_reval_seq); + latch_wait(&udpif->exit_latch); + poll_block(); } } @@ -1166,43 +1097,90 @@ handle_upcalls(struct handler *handler, struct hmap *misses, dpif_operate(udpif->dpif, opsp, n_ops); } +/* Must be called with udpif->ukeys[hash % udpif->n_revalidators].mutex. */ static struct udpif_key * -ukey_lookup(struct revalidator *revalidator, struct udpif_flow_dump *udump) +ukey_lookup__(struct udpif *udpif, const struct nlattr *key, size_t key_len, + uint32_t hash) { struct udpif_key *ukey; + struct hmap *hmap = &udpif->ukeys[hash % udpif->n_revalidators].hmap; - HMAP_FOR_EACH_WITH_HASH (ukey, hmap_node, udump->key_hash, - &revalidator->ukeys) { - if (ukey->key_len == udump->key_len - && !memcmp(ukey->key, udump->key, udump->key_len)) { + HMAP_FOR_EACH_WITH_HASH (ukey, hmap_node, hash, hmap) { + if (ukey->key_len == key_len && !memcmp(ukey->key, key, key_len)) { return ukey; } } return NULL; } +static struct udpif_key * +ukey_lookup(struct udpif *udpif, const struct nlattr *key, size_t key_len, + uint32_t hash) +{ + struct udpif_key *ukey; + uint32_t idx = hash % udpif->n_revalidators; + + ovs_mutex_lock(&udpif->ukeys[idx].mutex); + ukey = ukey_lookup__(udpif, key, key_len, hash); + ovs_mutex_unlock(&udpif->ukeys[idx].mutex); + + return ukey; +} + static struct udpif_key * ukey_create(const struct nlattr *key, size_t key_len, long long int used) { struct udpif_key *ukey = xmalloc(sizeof *ukey); + ovs_mutex_init(&ukey->mutex); ukey->key = (struct nlattr *) &ukey->key_buf; memcpy(&ukey->key_buf, key, key_len); ukey->key_len = key_len; + ovs_mutex_lock(&ukey->mutex); ukey->mark = false; + ukey->flow_exists = true; ukey->created = used ? used : time_msec(); memset(&ukey->stats, 0, sizeof ukey->stats); ukey->xcache = NULL; + ovs_mutex_unlock(&ukey->mutex); return ukey; } +/* Checks for a ukey in 'udpif->ukeys' with the same 'ukey->key' and 'hash', + * and inserts 'ukey' if it does not exist. + * + * Returns true if 'ukey' was inserted into 'udpif->ukeys', false otherwise. */ +static bool +udpif_insert_ukey(struct udpif *udpif, struct udpif_key *ukey, uint32_t hash) +{ + struct udpif_key *duplicate; + uint32_t idx = hash % udpif->n_revalidators; + bool ok; + + ovs_mutex_lock(&udpif->ukeys[idx].mutex); + duplicate = ukey_lookup__(udpif, ukey->key, ukey->key_len, hash); + if (duplicate) { + ok = false; + } else { + hmap_insert(&udpif->ukeys[idx].hmap, &ukey->hmap_node, hash); + ok = true; + } + ovs_mutex_unlock(&udpif->ukeys[idx].mutex); + + return ok; +} + static void ukey_delete(struct revalidator *revalidator, struct udpif_key *ukey) + OVS_NO_THREAD_SAFETY_ANALYSIS { - hmap_remove(&revalidator->ukeys, &ukey->hmap_node); + if (revalidator) { + hmap_remove(revalidator->ukeys, &ukey->hmap_node); + } xlate_cache_delete(ukey->xcache); + ovs_mutex_destroy(&ukey->mutex); free(ukey); } @@ -1233,17 +1211,19 @@ should_revalidate(uint64_t packets, long long int used) } static bool -revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump, - struct udpif_key *ukey) +revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey, + const struct nlattr *mask, size_t mask_len, + const struct nlattr *actions, size_t actions_len, + const struct dpif_flow_stats *stats) { - struct ofpbuf xout_actions, *actions; uint64_t slow_path_buf[128 / 8]; struct xlate_out xout, *xoutp; struct netflow *netflow; - struct flow flow, udump_mask; struct ofproto_dpif *ofproto; struct dpif_flow_stats push; - uint32_t *udump32, *xout32; + struct ofpbuf xout_actions; + struct flow flow, dp_mask; + uint32_t *dp32, *xout32; odp_port_t odp_in_port; struct xlate_in xin; long long int last_used; @@ -1253,43 +1233,38 @@ revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump, ok = false; xoutp = NULL; - actions = NULL; netflow = NULL; - /* If we don't need to revalidate, we can simply push the stats contained - * in the udump, otherwise we'll have to get the actions so we can check - * them. */ - if (udump->need_revalidate) { - if (dpif_flow_get(udpif->dpif, ukey->key, ukey->key_len, &actions, - &udump->stats)) { - goto exit; - } - } - + ovs_mutex_lock(&ukey->mutex); last_used = ukey->stats.used; - push.used = udump->stats.used; - push.tcp_flags = udump->stats.tcp_flags; - push.n_packets = udump->stats.n_packets > ukey->stats.n_packets - ? udump->stats.n_packets - ukey->stats.n_packets + push.used = stats->used; + push.tcp_flags = stats->tcp_flags; + push.n_packets = stats->n_packets > ukey->stats.n_packets + ? stats->n_packets - ukey->stats.n_packets : 0; - push.n_bytes = udump->stats.n_bytes > ukey->stats.n_bytes - ? udump->stats.n_bytes - ukey->stats.n_bytes + push.n_bytes = stats->n_bytes > ukey->stats.n_bytes + ? stats->n_bytes - ukey->stats.n_bytes : 0; - ukey->stats = udump->stats; + ukey->stats = *stats; - if (udump->need_revalidate && last_used + if (!ukey->flow_exists) { + /* Don't bother revalidating if the flow was already deleted. */ + goto exit; + } + + if (udpif->need_revalidate && last_used && !should_revalidate(push.n_packets, last_used)) { ok = false; goto exit; } - if (!push.n_packets && !udump->need_revalidate) { + if (!push.n_packets && !udpif->need_revalidate) { ok = true; goto exit; } may_learn = push.n_packets > 0; - if (ukey->xcache && !udump->need_revalidate) { + if (ukey->xcache && !udpif->need_revalidate) { xlate_push_stats(ukey->xcache, may_learn, &push); ok = true; goto exit; @@ -1301,7 +1276,7 @@ revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump, goto exit; } - if (udump->need_revalidate) { + if (udpif->need_revalidate) { xlate_cache_clear(ukey->xcache); } if (!ukey->xcache) { @@ -1312,11 +1287,11 @@ revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump, xin.resubmit_stats = push.n_packets ? &push : NULL; xin.xcache = ukey->xcache; xin.may_learn = may_learn; - xin.skip_wildcards = !udump->need_revalidate; + xin.skip_wildcards = !udpif->need_revalidate; xlate_actions(&xin, &xout); xoutp = &xout; - if (!udump->need_revalidate) { + if (!udpif->need_revalidate) { ok = true; goto exit; } @@ -1329,11 +1304,12 @@ revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump, compose_slow_path(udpif, &xout, &flow, odp_in_port, &xout_actions); } - if (!ofpbuf_equal(&xout_actions, actions)) { + if (actions_len != ofpbuf_size(&xout_actions) + || memcmp(ofpbuf_data(&xout_actions), actions, actions_len)) { goto exit; } - if (odp_flow_key_to_mask(udump->mask, udump->mask_len, &udump_mask, &flow) + if (odp_flow_key_to_mask(mask, mask_len, &dp_mask, &flow) == ODP_FIT_ERROR) { goto exit; } @@ -1343,16 +1319,17 @@ revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump, * mask in the kernel is more specific i.e. less wildcarded, than what * we've calculated here. This guarantees we don't catch any packets we * shouldn't with the megaflow. */ - udump32 = (uint32_t *) &udump_mask; + dp32 = (uint32_t *) &dp_mask; xout32 = (uint32_t *) &xout.wc.masks; for (i = 0; i < FLOW_U32S; i++) { - if ((udump32[i] | xout32[i]) != udump32[i]) { + if ((dp32[i] | xout32[i]) != dp32[i]) { goto exit; } } ok = true; exit: + ovs_mutex_unlock(&ukey->mutex); if (netflow) { if (!ok) { netflow_expire(netflow, &flow); @@ -1360,24 +1337,21 @@ exit: } netflow_unref(netflow); } - ofpbuf_delete(actions); xlate_out_uninit(xoutp); return ok; } struct dump_op { struct udpif_key *ukey; - struct udpif_flow_dump *udump; struct dpif_flow_stats stats; /* Stats for 'op'. */ struct dpif_op op; /* Flow del operation. */ }; static void dump_op_init(struct dump_op *op, const struct nlattr *key, size_t key_len, - struct udpif_key *ukey, struct udpif_flow_dump *udump) + struct udpif_key *ukey) { op->ukey = ukey; - op->udump = udump; op->op.type = DPIF_OP_FLOW_DEL; op->op.u.flow_del.key = key; op->op.u.flow_del.key_len = key_len; @@ -1385,10 +1359,8 @@ dump_op_init(struct dump_op *op, const struct nlattr *key, size_t key_len, } static void -push_dump_ops(struct revalidator *revalidator, - struct dump_op *ops, size_t n_ops) +push_dump_ops__(struct udpif *udpif, struct dump_op *ops, size_t n_ops) { - struct udpif *udpif = revalidator->udpif; struct dpif_op *opsp[REVALIDATE_MAX_BATCH]; size_t i; @@ -1405,10 +1377,12 @@ push_dump_ops(struct revalidator *revalidator, stats = op->op.u.flow_del.stats; if (op->ukey) { push = &push_buf; + ovs_mutex_lock(&op->ukey->mutex); push->used = MAX(stats->used, op->ukey->stats.used); push->tcp_flags = stats->tcp_flags | op->ukey->stats.tcp_flags; push->n_packets = stats->n_packets - op->ukey->stats.n_packets; push->n_bytes = stats->n_bytes - op->ukey->stats.n_bytes; + ovs_mutex_unlock(&op->ukey->mutex); } else { push = stats; } @@ -1420,9 +1394,14 @@ push_dump_ops(struct revalidator *revalidator, bool may_learn; may_learn = push->n_packets > 0; - if (op->ukey && op->ukey->xcache) { - xlate_push_stats(op->ukey->xcache, may_learn, push); - continue; + if (op->ukey) { + ovs_mutex_lock(&op->ukey->mutex); + if (op->ukey->xcache) { + xlate_push_stats(op->ukey->xcache, may_learn, push); + ovs_mutex_unlock(&op->ukey->mutex); + continue; + } + ovs_mutex_unlock(&op->ukey->mutex); } if (!xlate_receive(udpif->backer, NULL, op->op.u.flow_del.key, @@ -1445,94 +1424,131 @@ push_dump_ops(struct revalidator *revalidator, } } } +} - for (i = 0; i < n_ops; i++) { - struct udpif_key *ukey; +static void +push_dump_ops(struct revalidator *revalidator, + struct dump_op *ops, size_t n_ops) +{ + int i; - /* If there's a udump, this ukey came directly from a datapath flow - * dump. Sometimes a datapath can send duplicates in flow dumps, in - * which case we wouldn't want to double-free a ukey, so avoid that by - * looking up the ukey again. - * - * If there's no udump then we know what we're doing. */ - ukey = (ops[i].udump - ? ukey_lookup(revalidator, ops[i].udump) - : ops[i].ukey); - if (ukey) { - ukey_delete(revalidator, ukey); - } + push_dump_ops__(revalidator->udpif, ops, n_ops); + for (i = 0; i < n_ops; i++) { + ukey_delete(revalidator, ops[i].ukey); } } static void -revalidate_udumps(struct revalidator *revalidator, struct list *udumps) +revalidate(struct revalidator *revalidator) { struct udpif *udpif = revalidator->udpif; struct dump_op ops[REVALIDATE_MAX_BATCH]; - struct udpif_flow_dump *udump, *next_udump; - size_t n_ops, n_flows; + const struct nlattr *key, *mask, *actions; + size_t key_len, mask_len, actions_len; + const struct dpif_flow_stats *stats; + long long int now; unsigned int flow_limit; - long long int max_idle; - bool must_del; + size_t n_ops; + void *state; + n_ops = 0; + now = time_msec(); atomic_read(&udpif->flow_limit, &flow_limit); - n_flows = udpif_get_n_flows(udpif); - - must_del = false; - max_idle = ofproto_max_idle; - if (n_flows > flow_limit) { - must_del = n_flows > 2 * flow_limit; - max_idle = 100; - } - - n_ops = 0; - LIST_FOR_EACH_SAFE (udump, next_udump, list_node, udumps) { - long long int used, now; + dpif_flow_dump_state_init(udpif->dpif, &state); + while (dpif_flow_dump_next(&udpif->dump, state, &key, &key_len, &mask, + &mask_len, &actions, &actions_len, &stats)) { struct udpif_key *ukey; + bool mark, may_destroy; + long long int used, max_idle; + uint32_t hash; + size_t n_flows; - now = time_msec(); - ukey = ukey_lookup(revalidator, udump); + hash = hash_bytes(key, key_len, udpif->secret); + ukey = ukey_lookup(udpif, key, key_len, hash); - used = udump->stats.used; + used = stats->used; if (!used && ukey) { + ovs_mutex_lock(&ukey->mutex); + + if (ukey->mark || !ukey->flow_exists) { + /* The flow has already been dumped. This can occasionally + * occur if the datapath is changed in the middle of a flow + * dump. Rather than perform the same work twice, skip the + * flow this time. */ + ovs_mutex_unlock(&ukey->mutex); + COVERAGE_INC(upcall_duplicate_flow); + continue; + } + used = ukey->created; + ovs_mutex_unlock(&ukey->mutex); } - if (must_del || (used && used < now - max_idle)) { - struct dump_op *dop = &ops[n_ops++]; + n_flows = udpif_get_n_flows(udpif); + max_idle = ofproto_max_idle; + if (n_flows > flow_limit) { + max_idle = 100; + } - dump_op_init(dop, udump->key, udump->key_len, ukey, udump); - continue; + if ((used && used < now - max_idle) || n_flows > flow_limit * 2) { + mark = false; + } else { + if (!ukey) { + ukey = ukey_create(key, key_len, used); + if (!udpif_insert_ukey(udpif, ukey, hash)) { + /* The same ukey has already been created. This means that + * another revalidator is processing this flow + * concurrently, so don't bother processing it. */ + ukey_delete(NULL, ukey); + continue; + } + } + + mark = revalidate_ukey(udpif, ukey, mask, mask_len, actions, + actions_len, stats); } - if (!ukey) { - ukey = ukey_create(udump->key, udump->key_len, used); - hmap_insert(&revalidator->ukeys, &ukey->hmap_node, - udump->key_hash); + if (ukey) { + ovs_mutex_lock(&ukey->mutex); + ukey->mark = ukey->flow_exists = mark; + ovs_mutex_unlock(&ukey->mutex); } - ukey->mark = true; - if (!revalidate_ukey(udpif, udump, ukey)) { - dpif_flow_del(udpif->dpif, udump->key, udump->key_len, NULL); - ukey_delete(revalidator, ukey); + if (!mark) { + dump_op_init(&ops[n_ops++], key, key_len, ukey); } - list_remove(&udump->list_node); - free(udump); - } + may_destroy = dpif_flow_dump_next_may_destroy_keys(&udpif->dump, + state); + + /* Only update 'now' immediately before 'buffer' will be updated. + * This gives us the current time relative to the time the datapath + * will write into 'stats'. */ + if (may_destroy) { + now = time_msec(); + } - push_dump_ops(revalidator, ops, n_ops); + /* Only do a dpif_operate when we've hit our maximum batch, or when our + * memory is about to be clobbered by the next call to + * dpif_flow_dump_next(). */ + if (n_ops == REVALIDATE_MAX_BATCH || (n_ops && may_destroy)) { + push_dump_ops__(udpif, ops, n_ops); + n_ops = 0; + } + } - LIST_FOR_EACH_SAFE (udump, next_udump, list_node, udumps) { - list_remove(&udump->list_node); - free(udump); + if (n_ops) { + push_dump_ops__(udpif, ops, n_ops); } + + dpif_flow_dump_state_uninit(udpif->dpif, state); } static void revalidator_sweep__(struct revalidator *revalidator, bool purge) + OVS_NO_THREAD_SAFETY_ANALYSIS { struct dump_op ops[REVALIDATE_MAX_BATCH]; struct udpif_key *ukey, *next; @@ -1540,16 +1556,20 @@ revalidator_sweep__(struct revalidator *revalidator, bool purge) n_ops = 0; - HMAP_FOR_EACH_SAFE (ukey, next, hmap_node, &revalidator->ukeys) { + /* During garbage collection, this revalidator completely owns its ukeys + * map, and therefore doesn't need to do any locking. */ + HMAP_FOR_EACH_SAFE (ukey, next, hmap_node, revalidator->ukeys) { if (!purge && ukey->mark) { ukey->mark = false; + } else if (!ukey->flow_exists) { + ukey_delete(revalidator, ukey); } else { struct dump_op *op = &ops[n_ops++]; /* If we have previously seen a flow in the datapath, but didn't * see it during the most recent dump, delete it. This allows us * to clean up the ukey and keep the statistics consistent. */ - dump_op_init(op, ukey->key, ukey->key_len, ukey, NULL); + dump_op_init(op, ukey->key, ukey->key_len, ukey); if (n_ops == REVALIDATE_MAX_BATCH) { push_dump_ops(revalidator, ops, n_ops); n_ops = 0; @@ -1597,13 +1617,10 @@ upcall_unixctl_show(struct unixctl_conn *conn, int argc OVS_UNUSED, for (i = 0; i < n_revalidators; i++) { struct revalidator *revalidator = &udpif->revalidators[i]; - /* XXX: The result of hmap_count(&revalidator->ukeys) may not be - * accurate because it's not protected by the revalidator mutex. */ - ovs_mutex_lock(&revalidator->mutex); - ds_put_format(&ds, "\t%s: (dump queue %"PRIuSIZE") (keys %"PRIuSIZE - ")\n", revalidator->name, revalidator->n_udumps, - hmap_count(&revalidator->ukeys)); - ovs_mutex_unlock(&revalidator->mutex); + ovs_mutex_lock(&udpif->ukeys[i].mutex); + ds_put_format(&ds, "\t%s: (keys %"PRIuSIZE")\n", revalidator->name, + hmap_count(&udpif->ukeys[i].hmap)); + ovs_mutex_unlock(&udpif->ukeys[i].mutex); } }