revalidator: Only revalidate high-throughput flows.
[sliver-openvswitch.git] / ofproto / ofproto-dpif-upcall.c
index 1622888..0d7dd8e 100644 (file)
@@ -32,6 +32,7 @@
 #include "ofproto-dpif-ipfix.h"
 #include "ofproto-dpif-sflow.h"
 #include "ofproto-dpif-xlate.h"
+#include "ovs-rcu.h"
 #include "packets.h"
 #include "poll-loop.h"
 #include "seq.h"
@@ -41,7 +42,6 @@
 #define MAX_QUEUE_LENGTH 512
 #define FLOW_MISS_MAX_BATCH 50
 #define REVALIDATE_MAX_BATCH 50
-#define MAX_IDLE 1500
 
 VLOG_DEFINE_THIS_MODULE(ofproto_dpif_upcall);
 
@@ -168,6 +168,9 @@ struct udpif_key {
     bool mark;                     /* Used by mark and sweep GC algorithm. */
 
     struct odputil_keybuf key_buf; /* Memory for 'key'. */
+    struct xlate_cache *xcache;    /* Cache for xlate entries that
+                                    * are affected by this ukey.
+                                    * Used for stats and learning.*/
 };
 
 /* 'udpif_flow_dump's hold the state associated with one iteration in a flow
@@ -280,15 +283,12 @@ void
 udpif_destroy(struct udpif *udpif)
 {
     udpif_set_threads(udpif, 0, 0);
-    udpif_flush();
+    udpif_flush(udpif);
 
     list_remove(&udpif->list_node);
     latch_destroy(&udpif->exit_latch);
     seq_destroy(udpif->reval_seq);
     seq_destroy(udpif->dump_seq);
-    atomic_destroy(&udpif->flow_limit);
-    atomic_destroy(&udpif->n_flows);
-    atomic_destroy(&udpif->n_flows_timestamp);
     ovs_mutex_destroy(&udpif->n_flows_mutex);
     free(udpif);
 }
@@ -301,6 +301,9 @@ void
 udpif_set_threads(struct udpif *udpif, size_t n_handlers,
                   size_t n_revalidators)
 {
+    int error;
+
+    ovsrcu_quiesce_start();
     /* Stop the old threads (if any). */
     if (udpif->handlers &&
         (udpif->n_handlers != n_handlers
@@ -373,6 +376,13 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
         udpif->n_handlers = 0;
     }
 
+    error = dpif_handlers_set(udpif->dpif, 1);
+    if (error) {
+        VLOG_ERR("failed to configure handlers in dpif %s: %s",
+                 dpif_name(udpif->dpif), ovs_strerror(error));
+        return;
+    }
+
     /* Start new threads (if necessary). */
     if (!udpif->handlers && n_handlers) {
         size_t i;
@@ -409,6 +419,8 @@ udpif_set_threads(struct udpif *udpif, size_t n_handlers,
         xpthread_create(&udpif->dispatcher, NULL, udpif_dispatcher, udpif);
         xpthread_create(&udpif->flow_dumper, NULL, udpif_flow_dumper, udpif);
     }
+
+    ovsrcu_quiesce_end();
 }
 
 /* Waits for all ongoing upcall translations to complete.  This ensures that
@@ -474,16 +486,31 @@ udpif_get_memory_usage(struct udpif *udpif, struct simap *usage)
     }
 }
 
-/* Removes all flows from all datapaths. */
+/* Remove flows from a single datapath. */
 void
-udpif_flush(void)
+udpif_flush(struct udpif *udpif)
+{
+    size_t n_handlers, n_revalidators;
+
+    n_handlers = udpif->n_handlers;
+    n_revalidators = udpif->n_revalidators;
+
+    udpif_set_threads(udpif, 0, 0);
+    dpif_flow_flush(udpif->dpif);
+    udpif_set_threads(udpif, n_handlers, n_revalidators);
+}
+
+/* Removes all flows from all datapaths. */
+static void
+udpif_flush_all_datapaths(void)
 {
     struct udpif *udpif;
 
     LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
-        dpif_flow_flush(udpif->dpif);
+        udpif_flush(udpif);
     }
 }
+
 \f
 /* Destroys and deallocates 'upcall'. */
 static void
@@ -528,7 +555,7 @@ udpif_dispatcher(void *arg)
     set_subprogram_name("dispatcher");
     while (!latch_is_set(&udpif->exit_latch)) {
         recv_upcalls(udpif);
-        dpif_recv_wait(udpif->dpif);
+        dpif_recv_wait(udpif->dpif, 0);
         latch_wait(&udpif->exit_latch);
         poll_block();
     }
@@ -552,6 +579,8 @@ udpif_flow_dumper(void *arg)
         bool need_revalidate;
         uint64_t reval_seq;
         size_t n_flows, i;
+        int error;
+        void *state = NULL;
 
         reval_seq = seq_read(udpif->reval_seq);
         need_revalidate = udpif->last_reval_seq != reval_seq;
@@ -562,9 +591,14 @@ udpif_flow_dumper(void *arg)
         udpif->avg_n_flows = (udpif->avg_n_flows + n_flows) / 2;
 
         start_time = time_msec();
-        dpif_flow_dump_start(&dump, udpif->dpif);
-        while (dpif_flow_dump_next(&dump, &key, &key_len, &mask, &mask_len,
-                                   NULL, NULL, &stats)
+        error = dpif_flow_dump_start(&dump, udpif->dpif);
+        if (error) {
+            VLOG_INFO("Failed to start flow dump (%s)", ovs_strerror(error));
+            goto skip;
+        }
+        dpif_flow_dump_state_init(udpif->dpif, &state);
+        while (dpif_flow_dump_next(&dump, state, &key, &key_len,
+                                   &mask, &mask_len, NULL, NULL, &stats)
                && !latch_is_set(&udpif->exit_latch)) {
             struct udpif_flow_dump *udump = xmalloc(sizeof *udump);
             struct revalidator *revalidator;
@@ -595,6 +629,7 @@ udpif_flow_dumper(void *arg)
             xpthread_cond_signal(&revalidator->wake_cond);
             ovs_mutex_unlock(&revalidator->mutex);
         }
+        dpif_flow_dump_state_uninit(udpif->dpif, state);
         dpif_flow_dump_done(&dump);
 
         /* Let all the revalidators finish and garbage collect. */
@@ -637,7 +672,8 @@ udpif_flow_dumper(void *arg)
                       duration);
         }
 
-        poll_timer_wait_until(start_time + MIN(MAX_IDLE, 500));
+skip:
+        poll_timer_wait_until(start_time + MIN(ofproto_max_idle, 500));
         seq_wait(udpif->reval_seq, udpif->last_reval_seq);
         latch_wait(&udpif->exit_latch);
         poll_block();
@@ -662,7 +698,10 @@ udpif_upcall_handler(void *arg)
         size_t i;
 
         ovs_mutex_lock(&handler->mutex);
-        if (!handler->n_upcalls) {
+        /* Must check the 'exit_latch' again to make sure the main thread is
+         * not joining on the handler thread. */
+        if (!handler->n_upcalls
+            && !latch_is_set(&handler->udpif->exit_latch)) {
             ovs_mutex_cond_wait(&handler->wake_cond, &handler->mutex);
         }
 
@@ -800,7 +839,7 @@ recv_upcalls(struct udpif *udpif)
         upcall = xmalloc(sizeof *upcall);
         ofpbuf_use_stub(&upcall->upcall_buf, upcall->upcall_stub,
                         sizeof upcall->upcall_stub);
-        error = dpif_recv(udpif->dpif, &upcall->dpif_upcall,
+        error = dpif_recv(udpif->dpif, 0, &upcall->dpif_upcall,
                           &upcall->upcall_buf);
         if (error) {
             /* upcall_destroy() can only be called on successfully received
@@ -888,7 +927,7 @@ compose_slow_path(struct udpif *udpif, struct xlate_out *xout,
     port = xout->slow & (SLOW_CFM | SLOW_BFD | SLOW_LACP | SLOW_STP)
         ? ODPP_NONE
         : odp_in_port;
-    pid = dpif_port_get_pid(udpif->dpif, port);
+    pid = dpif_port_get_pid(udpif->dpif, port, 0);
     odp_put_userspace_action(pid, &cookie, sizeof cookie.slow_path, buf);
 }
 
@@ -983,10 +1022,9 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
         type = classify_upcall(upcall);
         if (type == MISS_UPCALL) {
             uint32_t hash;
+            struct pkt_metadata md = pkt_metadata_from_flow(&flow);
 
-            flow_extract(packet, flow.skb_priority, flow.pkt_mark,
-                         &flow.tunnel, &flow.in_port, &miss->flow);
-
+            flow_extract(packet, &md, &miss->flow);
             hash = flow_hash(&miss->flow, 0);
             existing_miss = flow_miss_find(&misses, ofproto, &miss->flow,
                                            hash);
@@ -1007,8 +1045,8 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
             } else {
                 miss = existing_miss;
             }
-            miss->stats.tcp_flags |= packet_get_tcp_flags(packet, &miss->flow);
-            miss->stats.n_bytes += packet->size;
+            miss->stats.tcp_flags |= ntohs(miss->flow.tcp_flags);
+            miss->stats.n_bytes += ofpbuf_size(packet);
             miss->stats.n_packets++;
 
             upcall->flow_miss = miss;
@@ -1129,7 +1167,7 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
              * the packet contained no VLAN.  So, we must remove the
              * VLAN header from the packet before trying to execute the
              * actions. */
-            if (miss->xout.odp_actions.size) {
+            if (ofpbuf_size(&miss->xout.odp_actions)) {
                 eth_pop_vlan(packet);
             }
 
@@ -1169,21 +1207,21 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
             op->u.flow_put.flags = DPIF_FP_CREATE | DPIF_FP_MODIFY;
             op->u.flow_put.key = miss->key;
             op->u.flow_put.key_len = miss->key_len;
-            op->u.flow_put.mask = mask.data;
-            op->u.flow_put.mask_len = mask.size;
+            op->u.flow_put.mask = ofpbuf_data(&mask);
+            op->u.flow_put.mask_len = ofpbuf_size(&mask);
             op->u.flow_put.stats = NULL;
 
             if (!miss->xout.slow) {
-                op->u.flow_put.actions = miss->xout.odp_actions.data;
-                op->u.flow_put.actions_len = miss->xout.odp_actions.size;
+                op->u.flow_put.actions = ofpbuf_data(&miss->xout.odp_actions);
+                op->u.flow_put.actions_len = ofpbuf_size(&miss->xout.odp_actions);
             } else {
                 struct ofpbuf buf;
 
                 ofpbuf_use_stack(&buf, miss->slow_path_buf,
                                  sizeof miss->slow_path_buf);
                 compose_slow_path(udpif, &miss->xout, miss->odp_in_port, &buf);
-                op->u.flow_put.actions = buf.data;
-                op->u.flow_put.actions_len = buf.size;
+                op->u.flow_put.actions = ofpbuf_data(&buf);
+                op->u.flow_put.actions_len = ofpbuf_size(&buf);
             }
         }
 
@@ -1193,15 +1231,15 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
          * upcall. */
         miss->flow.vlan_tci = flow_vlan_tci;
 
-        if (miss->xout.odp_actions.size) {
+        if (ofpbuf_size(&miss->xout.odp_actions)) {
 
             op = &ops[n_ops++];
             op->type = DPIF_OP_EXECUTE;
             op->u.execute.packet = packet;
             odp_key_to_pkt_metadata(miss->key, miss->key_len,
                                     &op->u.execute.md);
-            op->u.execute.actions = miss->xout.odp_actions.data;
-            op->u.execute.actions_len = miss->xout.odp_actions.size;
+            op->u.execute.actions = ofpbuf_data(&miss->xout.odp_actions);
+            op->u.execute.actions_len = ofpbuf_size(&miss->xout.odp_actions);
             op->u.execute.needs_help = (miss->xout.slow & SLOW_ACTION) != 0;
         }
     }
@@ -1222,14 +1260,14 @@ handle_upcalls(struct handler *handler, struct list *upcalls)
             struct ofproto_packet_in *pin;
 
             pin = xmalloc(sizeof *pin);
-            pin->up.packet = xmemdup(packet->data, packet->size);
-            pin->up.packet_len = packet->size;
+            pin->up.packet = xmemdup(ofpbuf_data(packet), ofpbuf_size(packet));
+            pin->up.packet_len = ofpbuf_size(packet);
             pin->up.reason = OFPR_NO_MATCH;
             pin->up.table_id = 0;
             pin->up.cookie = OVS_BE64_MAX;
             flow_get_metadata(&miss->flow, &pin->up.fmd);
             pin->send_len = 0; /* Not used for flow table misses. */
-            pin->generated_by_table_miss = false;
+            pin->miss_type = OFPROTO_PACKET_IN_NO_MISS;
             ofproto_dpif_send_packet_in(miss->ofproto, pin);
         }
     }
@@ -1279,6 +1317,7 @@ ukey_create(const struct nlattr *key, size_t key_len, long long int used)
     ukey->mark = false;
     ukey->created = used ? used : time_msec();
     memset(&ukey->stats, 0, sizeof ukey->stats);
+    ukey->xcache = NULL;
 
     return ukey;
 }
@@ -1287,9 +1326,36 @@ static void
 ukey_delete(struct revalidator *revalidator, struct udpif_key *ukey)
 {
     hmap_remove(&revalidator->ukeys, &ukey->hmap_node);
+    xlate_cache_delete(ukey->xcache);
     free(ukey);
 }
 
+static bool
+should_revalidate(uint64_t packets, long long int used)
+{
+    long long int metric, now, duration;
+
+    /* Calculate the mean time between seeing these packets. If this
+     * exceeds the threshold, then delete the flow rather than performing
+     * costly revalidation for flows that aren't being hit frequently.
+     *
+     * This is targeted at situations where the dump_duration is high (~1s),
+     * and revalidation is triggered by a call to udpif_revalidate(). In
+     * these situations, revalidation of all flows causes fluctuations in the
+     * flow_limit due to the interaction with the dump_duration and max_idle.
+     * This tends to result in deletion of low-throughput flows anyway, so
+     * skip the revalidation and just delete those flows. */
+    packets = MAX(packets, 1);
+    now = MAX(used, time_msec());
+    duration = now - used;
+    metric = duration / packets;
+
+    if (metric > 200) {
+        return false;
+    }
+    return true;
+}
+
 static bool
 revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump,
                 struct udpif_key *ukey)
@@ -1297,19 +1363,23 @@ revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump,
     struct ofpbuf xout_actions, *actions;
     uint64_t slow_path_buf[128 / 8];
     struct xlate_out xout, *xoutp;
+    struct netflow *netflow;
     struct flow flow, udump_mask;
     struct ofproto_dpif *ofproto;
     struct dpif_flow_stats push;
     uint32_t *udump32, *xout32;
     odp_port_t odp_in_port;
     struct xlate_in xin;
+    long long int last_used;
     int error;
     size_t i;
-    bool ok;
+    bool may_learn, ok;
 
     ok = false;
     xoutp = NULL;
     actions = NULL;
+    netflow = NULL;
+    may_learn = push.n_packets > 0;
 
     /* If we don't need to revalidate, we can simply push the stats contained
      * in the udump, otherwise we'll have to get the actions so we can check
@@ -1321,6 +1391,7 @@ revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump,
         }
     }
 
+    last_used = ukey->stats.used;
     push.used = udump->stats.used;
     push.tcp_flags = udump->stats.tcp_flags;
     push.n_packets = udump->stats.n_packets > ukey->stats.n_packets
@@ -1331,20 +1402,40 @@ revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump,
         : 0;
     ukey->stats = udump->stats;
 
+    if (udump->need_revalidate && last_used
+        && !should_revalidate(push.n_packets, last_used)) {
+        ok = false;
+        goto exit;
+    }
+
     if (!push.n_packets && !udump->need_revalidate) {
         ok = true;
         goto exit;
     }
 
+    if (ukey->xcache && !udump->need_revalidate) {
+        xlate_push_stats(ukey->xcache, may_learn, &push);
+        ok = true;
+        goto exit;
+    }
+
     error = xlate_receive(udpif->backer, NULL, ukey->key, ukey->key_len, &flow,
-                          &ofproto, NULL, NULL, NULL, &odp_in_port);
+                          &ofproto, NULL, NULL, &netflow, &odp_in_port);
     if (error) {
         goto exit;
     }
 
+    if (udump->need_revalidate) {
+        xlate_cache_clear(ukey->xcache);
+    }
+    if (!ukey->xcache) {
+        ukey->xcache = xlate_cache_new();
+    }
+
     xlate_in_init(&xin, ofproto, &flow, NULL, push.tcp_flags, NULL);
     xin.resubmit_stats = push.n_packets ? &push : NULL;
-    xin.may_learn = push.n_packets > 0;
+    xin.xcache = ukey->xcache;
+    xin.may_learn = may_learn;
     xin.skip_wildcards = !udump->need_revalidate;
     xlate_actions(&xin, &xout);
     xoutp = &xout;
@@ -1355,8 +1446,8 @@ revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump,
     }
 
     if (!xout.slow) {
-        ofpbuf_use_const(&xout_actions, xout.odp_actions.data,
-                         xout.odp_actions.size);
+        ofpbuf_use_const(&xout_actions, ofpbuf_data(&xout.odp_actions),
+                         ofpbuf_size(&xout.odp_actions));
     } else {
         ofpbuf_use_stack(&xout_actions, slow_path_buf, sizeof slow_path_buf);
         compose_slow_path(udpif, &xout, odp_in_port, &xout_actions);
@@ -1386,6 +1477,13 @@ revalidate_ukey(struct udpif *udpif, struct udpif_flow_dump *udump,
     ok = true;
 
 exit:
+    if (netflow) {
+        if (!ok) {
+            netflow_expire(netflow, &flow);
+            netflow_flow_clear(netflow, &flow);
+        }
+        netflow_unref(netflow);
+    }
     ofpbuf_delete(actions);
     xlate_out_uninit(xoutp);
     return ok;
@@ -1443,6 +1541,13 @@ push_dump_ops(struct revalidator *revalidator,
             struct ofproto_dpif *ofproto;
             struct netflow *netflow;
             struct flow flow;
+            bool may_learn;
+
+            may_learn = push->n_packets > 0;
+            if (op->ukey && op->ukey->xcache) {
+                xlate_push_stats(op->ukey->xcache, may_learn, push);
+                continue;
+            }
 
             if (!xlate_receive(udpif->backer, NULL, op->op.u.flow_del.key,
                                op->op.u.flow_del.key_len, &flow, &ofproto,
@@ -1452,7 +1557,7 @@ push_dump_ops(struct revalidator *revalidator,
                 xlate_in_init(&xin, ofproto, &flow, NULL, push->tcp_flags,
                               NULL);
                 xin.resubmit_stats = push->n_packets ? push : NULL;
-                xin.may_learn = push->n_packets > 0;
+                xin.may_learn = may_learn;
                 xin.skip_wildcards = true;
                 xlate_actions_for_side_effects(&xin);
 
@@ -1466,12 +1571,17 @@ push_dump_ops(struct revalidator *revalidator,
     }
 
     for (i = 0; i < n_ops; i++) {
-        struct udpif_key *ukey = ops[i].ukey;
+        struct udpif_key *ukey;
 
-        /* Look up the ukey to prevent double-free in case 'ops' contains a
-         * given ukey more than once (which can happen if the datapath dumps a
-         * given flow more than once). */
-        ukey = ukey_lookup(revalidator, ops[i].udump);
+        /* If there's a udump, this ukey came directly from a datapath flow
+         * dump.  Sometimes a datapath can send duplicates in flow dumps, in
+         * which case we wouldn't want to double-free a ukey, so avoid that by
+         * looking up the ukey again.
+         *
+         * If there's no udump then we know what we're doing. */
+        ukey = (ops[i].udump
+                ? ukey_lookup(revalidator, ops[i].udump)
+                : ops[i].ukey);
         if (ukey) {
             ukey_delete(revalidator, ukey);
         }
@@ -1495,7 +1605,7 @@ revalidate_udumps(struct revalidator *revalidator, struct list *udumps)
     n_flows = udpif_get_n_flows(udpif);
 
     must_del = false;
-    max_idle = MAX_IDLE;
+    max_idle = ofproto_max_idle;
     if (n_flows > flow_limit) {
         must_del = n_flows > 2 * flow_limit;
         max_idle = 100;
@@ -1646,7 +1756,7 @@ upcall_unixctl_disable_megaflows(struct unixctl_conn *conn,
                                  void *aux OVS_UNUSED)
 {
     atomic_store(&enable_megaflows, false);
-    udpif_flush();
+    udpif_flush_all_datapaths();
     unixctl_command_reply(conn, "megaflows disabled");
 }
 
@@ -1661,7 +1771,7 @@ upcall_unixctl_enable_megaflows(struct unixctl_conn *conn,
                                 void *aux OVS_UNUSED)
 {
     atomic_store(&enable_megaflows, true);
-    udpif_flush();
+    udpif_flush_all_datapaths();
     unixctl_command_reply(conn, "megaflows enabled");
 }