dpif-netdev: Implement the API functions to allow multiple handler
authorAlex Wang <alexw@nicira.com>
Wed, 26 Feb 2014 18:07:38 +0000 (10:07 -0800)
committerAlex Wang <alexw@nicira.com>
Thu, 20 Mar 2014 17:27:20 +0000 (10:27 -0700)
threads read upcall.

This commit implements the API functions to allow multiple handler
threads read upcall.

Also, this commit removes the handling priority of DPIF_UC_MISS
over DPIF_UC_ACTION.  So, both misses will be put to the same
queue.  The decision is based on the fact that a lot has changed
since the age when flow setup rate is most treasured and starving
all actions in the presence of any flow misses doesn't seem like
a sound balancing solution.

Thusly the current implementation will be put in testing and
investigation for better balancing solution will continue if
there is an issue.

Also note, the introduction and use of flow_hash_5tuple() will
put missed ICMP packets from same source but with different
type/code to different handler queues.  This may cause reordering
of these packets.  For now, we do not count this as a problem.

Signed-off-by: Alex Wang <alexw@nicira.com>
Acked-by: Ben Pfaff <blp@nicira.com>
lib/dpif-netdev.c
lib/flow.c
lib/flow.h

index 1c23739..3d0c09f 100644 (file)
@@ -74,7 +74,6 @@ enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow table. */
 enum { DP_NETDEV_HEADROOM = 2 + VLAN_HEADER_LEN };
 
 /* Queues. */
-enum { N_QUEUES = 2 };          /* Number of queues for dpif_recv(). */
 enum { MAX_QUEUE_LEN = 128 };   /* Maximum number of packets per queue. */
 enum { QUEUE_MASK = MAX_QUEUE_LEN - 1 };
 BUILD_ASSERT_DECL(IS_POW2(MAX_QUEUE_LEN));
@@ -91,14 +90,17 @@ struct dp_netdev_upcall {
     struct ofpbuf buf;          /* ofpbuf instance for upcall.packet. */
 };
 
-/* A queue passing packets from a struct dp_netdev to its clients.
+/* A queue passing packets from a struct dp_netdev to its clients (handlers).
  *
  *
  * Thread-safety
  * =============
  *
- * Any access at all requires the owning 'dp_netdev''s queue_mutex. */
+ * Any access at all requires the owning 'dp_netdev''s queue_rwlock and
+ * its own mutex. */
 struct dp_netdev_queue {
+    struct ovs_mutex mutex;
+    struct seq *seq;      /* Incremented whenever a packet is queued. */
     struct dp_netdev_upcall upcalls[MAX_QUEUE_LEN] OVS_GUARDED;
     unsigned int head OVS_GUARDED;
     unsigned int tail OVS_GUARDED;
@@ -119,7 +121,7 @@ struct dp_netdev_queue {
  *    port_rwlock
  *    flow_mutex
  *    cls.rwlock
- *    queue_mutex
+ *    queue_rwlock
  */
 struct dp_netdev {
     const struct dpif_class *const class;
@@ -141,10 +143,12 @@ struct dp_netdev {
 
     /* Queues.
      *
-     * Everything in 'queues' is protected by 'queue_mutex'. */
-    struct ovs_mutex queue_mutex;
-    struct dp_netdev_queue queues[N_QUEUES];
-    struct seq *queue_seq;      /* Incremented whenever a packet is queued. */
+     * 'queue_rwlock' protects the modification of 'handler_queues' and
+     * 'n_handlers'.  The queue elements are protected by its
+     * 'handler_queues''s mutex. */
+    struct fat_rwlock queue_rwlock;
+    struct dp_netdev_queue *handler_queues;
+    uint32_t n_handlers;
 
     /* Statistics.
      *
@@ -329,12 +333,15 @@ static int do_add_port(struct dp_netdev *dp, const char *devname,
     OVS_REQ_WRLOCK(dp->port_rwlock);
 static int do_del_port(struct dp_netdev *dp, odp_port_t port_no)
     OVS_REQ_WRLOCK(dp->port_rwlock);
+static void dp_netdev_destroy_all_queues(struct dp_netdev *dp)
+    OVS_REQ_WRLOCK(dp->queue_rwlock);
 static int dpif_netdev_open(const struct dpif_class *, const char *name,
                             bool create, struct dpif **);
 static int dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *,
-                                    int queue_no, const struct flow *,
-                                    const struct nlattr *userdata)
-    OVS_EXCLUDED(dp->queue_mutex);
+                                      int queue_no, int type,
+                                      const struct flow *,
+                                      const struct nlattr *userdata)
+    OVS_EXCLUDED(dp->queue_rwlock);
 static void dp_netdev_execute_actions(struct dp_netdev *dp,
                                       const struct flow *, struct ofpbuf *,
                                       struct pkt_metadata *,
@@ -452,7 +459,6 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
 {
     struct dp_netdev *dp;
     int error;
-    int i;
 
     dp = xzalloc(sizeof *dp);
     shash_add(&dp_netdevs, name, dp);
@@ -466,13 +472,7 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     classifier_init(&dp->cls, NULL);
     hmap_init(&dp->flow_table);
 
-    ovs_mutex_init(&dp->queue_mutex);
-    ovs_mutex_lock(&dp->queue_mutex);
-    for (i = 0; i < N_QUEUES; i++) {
-        dp->queues[i].head = dp->queues[i].tail = 0;
-    }
-    ovs_mutex_unlock(&dp->queue_mutex);
-    dp->queue_seq = seq_create();
+    fat_rwlock_init(&dp->queue_rwlock);
 
     ovsthread_stats_init(&dp->stats);
 
@@ -520,20 +520,21 @@ dpif_netdev_open(const struct dpif_class *class, const char *name,
 
 static void
 dp_netdev_purge_queues(struct dp_netdev *dp)
+    OVS_REQ_WRLOCK(dp->queue_rwlock)
 {
     int i;
 
-    ovs_mutex_lock(&dp->queue_mutex);
-    for (i = 0; i < N_QUEUES; i++) {
-        struct dp_netdev_queue *q = &dp->queues[i];
+    for (i = 0; i < dp->n_handlers; i++) {
+        struct dp_netdev_queue *q = &dp->handler_queues[i];
 
+        ovs_mutex_lock(&q->mutex);
         while (q->tail != q->head) {
             struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
             ofpbuf_uninit(&u->upcall.packet);
             ofpbuf_uninit(&u->buf);
         }
+        ovs_mutex_unlock(&q->mutex);
     }
-    ovs_mutex_unlock(&dp->queue_mutex);
 }
 
 /* Requires dp_netdev_mutex so that we can't get a new reference to 'dp'
@@ -564,9 +565,11 @@ dp_netdev_free(struct dp_netdev *dp)
     }
     ovsthread_stats_destroy(&dp->stats);
 
-    dp_netdev_purge_queues(dp);
-    seq_destroy(dp->queue_seq);
-    ovs_mutex_destroy(&dp->queue_mutex);
+    fat_rwlock_wrlock(&dp->queue_rwlock);
+    dp_netdev_destroy_all_queues(dp);
+    fat_rwlock_unlock(&dp->queue_rwlock);
+
+    fat_rwlock_destroy(&dp->queue_rwlock);
 
     classifier_destroy(&dp->cls);
     hmap_destroy(&dp->flow_table);
@@ -1472,16 +1475,77 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
     return 0;
 }
 
+static void
+dp_netdev_destroy_all_queues(struct dp_netdev *dp)
+    OVS_REQ_WRLOCK(dp->queue_rwlock)
+{
+    size_t i;
+
+    dp_netdev_purge_queues(dp);
+
+    for (i = 0; i < dp->n_handlers; i++) {
+        struct dp_netdev_queue *q = &dp->handler_queues[i];
+
+        ovs_mutex_destroy(&q->mutex);
+        seq_destroy(q->seq);
+    }
+    free(dp->handler_queues);
+    dp->handler_queues = NULL;
+    dp->n_handlers = 0;
+}
+
+static void
+dp_netdev_refresh_queues(struct dp_netdev *dp, uint32_t n_handlers)
+    OVS_REQ_WRLOCK(dp->queue_rwlock)
+{
+    if (dp->n_handlers != n_handlers) {
+        size_t i;
+
+        dp_netdev_destroy_all_queues(dp);
+
+        dp->n_handlers = n_handlers;
+        dp->handler_queues = xzalloc(n_handlers * sizeof *dp->handler_queues);
+
+        for (i = 0; i < n_handlers; i++) {
+            struct dp_netdev_queue *q = &dp->handler_queues[i];
+
+            ovs_mutex_init(&q->mutex);
+            q->seq = seq_create();
+        }
+    }
+}
+
 static int
-dpif_netdev_recv_set(struct dpif *dpif OVS_UNUSED, bool enable OVS_UNUSED)
+dpif_netdev_recv_set(struct dpif *dpif, bool enable)
 {
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+
+    if ((dp->handler_queues != NULL) == enable) {
+        return 0;
+    }
+
+    fat_rwlock_wrlock(&dp->queue_rwlock);
+    if (!enable) {
+        dp_netdev_destroy_all_queues(dp);
+    } else {
+        dp_netdev_refresh_queues(dp, 1);
+    }
+    fat_rwlock_unlock(&dp->queue_rwlock);
+
     return 0;
 }
 
 static int
-dpif_netdev_handlers_set(struct dpif *dpif OVS_UNUSED,
-                         uint32_t n_handlers OVS_UNUSED)
+dpif_netdev_handlers_set(struct dpif *dpif, uint32_t n_handlers)
 {
+    struct dp_netdev *dp = get_dp_netdev(dpif);
+
+    fat_rwlock_wrlock(&dp->queue_rwlock);
+    if (dp->handler_queues) {
+        dp_netdev_refresh_queues(dp, n_handlers);
+    }
+    fat_rwlock_unlock(&dp->queue_rwlock);
+
     return 0;
 }
 
@@ -1493,62 +1557,86 @@ dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED,
     return 0;
 }
 
-static struct dp_netdev_queue *
-find_nonempty_queue(struct dp_netdev *dp)
-    OVS_REQUIRES(dp->queue_mutex)
+static bool
+dp_netdev_recv_check(const struct dp_netdev *dp, const uint32_t handler_id)
+    OVS_REQ_RDLOCK(dp->queue_rwlock)
 {
-    int i;
+    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
 
-    for (i = 0; i < N_QUEUES; i++) {
-        struct dp_netdev_queue *q = &dp->queues[i];
-        if (q->head != q->tail) {
-            return q;
-        }
+    if (!dp->handler_queues) {
+        VLOG_WARN_RL(&rl, "receiving upcall disabled");
+        return false;
     }
-    return NULL;
+
+    if (handler_id >= dp->n_handlers) {
+        VLOG_WARN_RL(&rl, "handler index out of bound");
+        return false;
+    }
+
+    return true;
 }
 
 static int
-dpif_netdev_recv(struct dpif *dpif, uint32_t n_handlers OVS_UNUSED,
+dpif_netdev_recv(struct dpif *dpif, uint32_t handler_id,
                  struct dpif_upcall *upcall, struct ofpbuf *buf)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
     struct dp_netdev_queue *q;
-    int error;
+    int error = 0;
+
+    fat_rwlock_rdlock(&dp->queue_rwlock);
 
-    ovs_mutex_lock(&dp->queue_mutex);
-    q = find_nonempty_queue(dp);
-    if (q) {
+    if (!dp_netdev_recv_check(dp, handler_id)) {
+        error = EAGAIN;
+        goto out;
+    }
+
+    q = &dp->handler_queues[handler_id];
+    ovs_mutex_lock(&q->mutex);
+    if (q->head != q->tail) {
         struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
 
         *upcall = u->upcall;
 
         ofpbuf_uninit(buf);
         *buf = u->buf;
-
-        error = 0;
     } else {
         error = EAGAIN;
     }
-    ovs_mutex_unlock(&dp->queue_mutex);
+    ovs_mutex_unlock(&q->mutex);
+
+out:
+    fat_rwlock_unlock(&dp->queue_rwlock);
 
     return error;
 }
 
 static void
-dpif_netdev_recv_wait(struct dpif *dpif, uint32_t handler_id OVS_UNUSED)
+dpif_netdev_recv_wait(struct dpif *dpif, uint32_t handler_id)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
+    struct dp_netdev_queue *q;
     uint64_t seq;
 
-    ovs_mutex_lock(&dp->queue_mutex);
-    seq = seq_read(dp->queue_seq);
-    if (find_nonempty_queue(dp)) {
+    fat_rwlock_rdlock(&dp->queue_rwlock);
+
+    if (!dp_netdev_recv_check(dp, handler_id)) {
+        goto out;
+    }
+
+    q = &dp->handler_queues[handler_id];
+    ovs_mutex_lock(&q->mutex);
+    seq = seq_read(q->seq);
+    if (q->head != q->tail) {
         poll_immediate_wake();
     } else {
-        seq_wait(dp->queue_seq, seq);
+        seq_wait(q->seq, seq);
     }
-    ovs_mutex_unlock(&dp->queue_mutex);
+
+    ovs_mutex_unlock(&q->mutex);
+
+out:
+    fat_rwlock_unlock(&dp->queue_rwlock);
 }
 
 static void
@@ -1556,7 +1644,9 @@ dpif_netdev_recv_purge(struct dpif *dpif)
 {
     struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif);
 
+    fat_rwlock_wrlock(&dpif_netdev->dp->queue_rwlock);
     dp_netdev_purge_queues(dpif_netdev->dp);
+    fat_rwlock_unlock(&dpif_netdev->dp->queue_rwlock);
 }
 \f
 /* Creates and returns a new 'struct dp_netdev_actions', with a reference count
@@ -1776,29 +1866,33 @@ dp_netdev_port_input(struct dp_netdev *dp, struct ofpbuf *packet,
         dp_netdev_execute_actions(dp, &key, packet, md,
                                   actions->actions, actions->size);
         dp_netdev_count_packet(dp, DP_STAT_HIT);
-    } else {
+    } else if (dp->handler_queues) {
         dp_netdev_count_packet(dp, DP_STAT_MISS);
-        dp_netdev_output_userspace(dp, packet, DPIF_UC_MISS, &key, NULL);
+        dp_netdev_output_userspace(dp, packet,
+                                   flow_hash_5tuple(&key, 0) % dp->n_handlers,
+                                   DPIF_UC_MISS, &key, NULL);
     }
 }
 
 static int
 dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet,
-                           int queue_no, const struct flow *flow,
+                           int queue_no, int type, const struct flow *flow,
                            const struct nlattr *userdata)
-    OVS_EXCLUDED(dp->queue_mutex)
+    OVS_EXCLUDED(dp->queue_rwlock)
 {
-    struct dp_netdev_queue *q = &dp->queues[queue_no];
+    struct dp_netdev_queue *q;
     int error;
 
-    ovs_mutex_lock(&dp->queue_mutex);
+    fat_rwlock_rdlock(&dp->queue_rwlock);
+    q = &dp->handler_queues[queue_no];
+    ovs_mutex_lock(&q->mutex);
     if (q->head - q->tail < MAX_QUEUE_LEN) {
         struct dp_netdev_upcall *u = &q->upcalls[q->head++ & QUEUE_MASK];
         struct dpif_upcall *upcall = &u->upcall;
         struct ofpbuf *buf = &u->buf;
         size_t buf_size;
 
-        upcall->type = queue_no;
+        upcall->type = type;
 
         /* Allocate buffer big enough for everything. */
         buf_size = ODPUTIL_FLOW_KEY_BYTES;
@@ -1823,14 +1917,15 @@ dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet,
         upcall->packet = *packet;
         ofpbuf_use(packet, NULL, 0);
 
-        seq_change(dp->queue_seq);
+        seq_change(q->seq);
 
         error = 0;
     } else {
         dp_netdev_count_packet(dp, DP_STAT_LOST);
         error = ENOBUFS;
     }
-    ovs_mutex_unlock(&dp->queue_mutex);
+    ovs_mutex_unlock(&q->mutex);
+    fat_rwlock_unlock(&dp->queue_rwlock);
 
     return error;
 }
@@ -1867,7 +1962,10 @@ dp_execute_cb(void *aux_, struct ofpbuf *packet,
         if (!may_steal) {
             packet = ofpbuf_clone_with_headroom(packet, DP_NETDEV_HEADROOM);
         }
-        dp_netdev_output_userspace(aux->dp, packet, DPIF_UC_ACTION, aux->key,
+        dp_netdev_output_userspace(aux->dp, packet,
+                                   flow_hash_5tuple(aux->key, 0)
+                                       % aux->dp->n_handlers,
+                                   DPIF_UC_ACTION, aux->key,
                                    userdata);
         if (!may_steal) {
             ofpbuf_uninit(packet);
index c482474..00e66a4 100644 (file)
@@ -818,6 +818,25 @@ flow_wildcards_set_reg_mask(struct flow_wildcards *wc, int idx, uint32_t mask)
     wc->masks.regs[idx] = mask;
 }
 
+/* Calculates the 5-tuple hash from the given flow. */
+uint32_t
+flow_hash_5tuple(const struct flow *flow, uint32_t basis)
+{
+    uint32_t hash = 0;
+
+    if (!flow) {
+        return 0;
+    }
+
+    hash = mhash_add(hash, (OVS_FORCE unsigned int) flow->nw_src);
+    hash = mhash_add(basis, (OVS_FORCE unsigned int) flow->nw_dst);
+    hash = mhash_add(hash, ((OVS_FORCE unsigned int) flow->tp_src << 16)
+                           | (OVS_FORCE unsigned int) flow->tp_dst);
+    hash = mhash_add(hash, flow->nw_proto);
+
+    return mhash_finish(hash, 13);
+}
+
 /* Hashes 'flow' based on its L2 through L4 protocol information. */
 uint32_t
 flow_hash_symmetric_l4(const struct flow *flow, uint32_t basis)
index 8165bcf..8b4ffad 100644 (file)
@@ -323,6 +323,7 @@ void flow_wildcards_fold_minimask_range(struct flow_wildcards *,
 uint32_t flow_wildcards_hash(const struct flow_wildcards *, uint32_t basis);
 bool flow_wildcards_equal(const struct flow_wildcards *,
                           const struct flow_wildcards *);
+uint32_t flow_hash_5tuple(const struct flow *flow, uint32_t basis);
 uint32_t flow_hash_symmetric_l4(const struct flow *flow, uint32_t basis);
 
 /* Initialize a flow with random fields that matter for nx_hash_fields. */