dpif-netdev: Introduce new mutex to protect queues.
authorBen Pfaff <blp@nicira.com>
Fri, 27 Dec 2013 17:42:51 +0000 (09:42 -0800)
committerBen Pfaff <blp@nicira.com>
Thu, 9 Jan 2014 01:13:31 +0000 (17:13 -0800)
This is a first step in making thread safety more granular in dpif-netdev,
to allow for multithreaded forwarding.

Signed-off-by: Ben Pfaff <blp@nicira.com>
lib/dpif-netdev.c

index 5292818..285967d 100644 (file)
@@ -83,8 +83,9 @@ struct dp_netdev_upcall {
 };
 
 struct dp_netdev_queue {
-    struct dp_netdev_upcall upcalls[MAX_QUEUE_LEN];
-    unsigned int head, tail;
+    struct dp_netdev_upcall upcalls[MAX_QUEUE_LEN] OVS_GUARDED;
+    unsigned int head OVS_GUARDED;
+    unsigned int tail OVS_GUARDED;
 };
 
 /* Datapath based on the network device interface from netdev.h. */
@@ -94,9 +95,12 @@ struct dp_netdev {
     struct ovs_refcount ref_cnt;
     atomic_flag destroyed;
 
-    struct dp_netdev_queue queues[N_QUEUES];
     struct classifier cls;      /* Classifier. */
     struct hmap flow_table;     /* Flow table. */
+
+    /* Queues. */
+    struct ovs_mutex queue_mutex;
+    struct dp_netdev_queue queues[N_QUEUES];
     struct seq *queue_seq;      /* Incremented whenever a packet is queued. */
 
     /* Statistics. */
@@ -190,9 +194,10 @@ static int do_add_port(struct dp_netdev *, const char *devname,
 static int do_del_port(struct dp_netdev *, odp_port_t port_no);
 static int dpif_netdev_open(const struct dpif_class *, const char *name,
                             bool create, struct dpif **);
-static int dp_netdev_output_userspace(struct dp_netdev *, struct ofpbuf *,
+static int dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *,
                                     int queue_no, const struct flow *,
-                                    const struct nlattr *userdata);
+                                    const struct nlattr *userdata)
+    OVS_EXCLUDED(dp->queue_mutex);
 static void dp_netdev_execute_actions(struct dp_netdev *, const struct flow *,
                                       struct ofpbuf *, struct pkt_metadata *,
                                       const struct nlattr *actions,
@@ -311,9 +316,12 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     dp->name = xstrdup(name);
     ovs_refcount_init(&dp->ref_cnt);
     atomic_flag_init(&dp->destroyed);
+    ovs_mutex_init(&dp->queue_mutex);
+    ovs_mutex_lock(&dp->queue_mutex);
     for (i = 0; i < N_QUEUES; i++) {
         dp->queues[i].head = dp->queues[i].tail = 0;
     }
+    ovs_mutex_unlock(&dp->queue_mutex);
     dp->queue_seq = seq_create();
     classifier_init(&dp->cls, NULL);
     hmap_init(&dp->flow_table);
@@ -366,6 +374,7 @@ dp_netdev_purge_queues(struct dp_netdev *dp)
 {
     int i;
 
+    ovs_mutex_lock(&dp->queue_mutex);
     for (i = 0; i < N_QUEUES; i++) {
         struct dp_netdev_queue *q = &dp->queues[i];
 
@@ -375,6 +384,7 @@ dp_netdev_purge_queues(struct dp_netdev *dp)
             ofpbuf_uninit(&u->buf);
         }
     }
+    ovs_mutex_unlock(&dp->queue_mutex);
 }
 
 static void
@@ -389,8 +399,11 @@ dp_netdev_free(struct dp_netdev *dp)
     ovsthread_counter_destroy(dp->n_hit);
     ovsthread_counter_destroy(dp->n_missed);
     ovsthread_counter_destroy(dp->n_lost);
+
     dp_netdev_purge_queues(dp);
     seq_destroy(dp->queue_seq);
+    ovs_mutex_destroy(&dp->queue_mutex);
+
     classifier_destroy(&dp->cls);
     hmap_destroy(&dp->flow_table);
     seq_destroy(dp->port_seq);
@@ -1184,9 +1197,9 @@ dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED,
 }
 
 static struct dp_netdev_queue *
-find_nonempty_queue(struct dpif *dpif)
+find_nonempty_queue(struct dp_netdev *dp)
+    OVS_REQUIRES(dp->queue_mutex)
 {
-    struct dp_netdev *dp = get_dp_netdev(dpif);
     int i;
 
     for (i = 0; i < N_QUEUES; i++) {
@@ -1202,11 +1215,12 @@ static int
 dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
                  struct ofpbuf *buf)
 {
+    struct dp_netdev *dp = get_dp_netdev(dpif);
     struct dp_netdev_queue *q;
     int error;
 
-    ovs_mutex_lock(&dp_netdev_mutex);
-    q = find_nonempty_queue(dpif);
+    ovs_mutex_lock(&dp->queue_mutex);
+    q = find_nonempty_queue(dp);
     if (q) {
         struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
 
@@ -1219,7 +1233,7 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
     } else {
         error = EAGAIN;
     }
-    ovs_mutex_unlock(&dp_netdev_mutex);
+    ovs_mutex_unlock(&dp->queue_mutex);
 
     return error;
 }
@@ -1230,23 +1244,22 @@ dpif_netdev_recv_wait(struct dpif *dpif)
     struct dp_netdev *dp = get_dp_netdev(dpif);
     uint64_t seq;
 
-    ovs_mutex_lock(&dp_netdev_mutex);
+    ovs_mutex_lock(&dp->queue_mutex);
     seq = seq_read(dp->queue_seq);
-    if (find_nonempty_queue(dpif)) {
+    if (find_nonempty_queue(dp)) {
         poll_immediate_wake();
     } else {
         seq_wait(dp->queue_seq, seq);
     }
-    ovs_mutex_unlock(&dp_netdev_mutex);
+    ovs_mutex_unlock(&dp->queue_mutex);
 }
 
 static void
 dpif_netdev_recv_purge(struct dpif *dpif)
 {
     struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif);
-    ovs_mutex_lock(&dp_netdev_mutex);
+
     dp_netdev_purge_queues(dpif_netdev->dp);
-    ovs_mutex_unlock(&dp_netdev_mutex);
 }
 \f
 /* Creates and returns a new 'struct dp_netdev_actions', with a reference count
@@ -1408,8 +1421,12 @@ static int
 dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet,
                            int queue_no, const struct flow *flow,
                            const struct nlattr *userdata)
+    OVS_EXCLUDED(dp->queue_mutex)
 {
     struct dp_netdev_queue *q = &dp->queues[queue_no];
+    int error;
+
+    ovs_mutex_lock(&dp->queue_mutex);
     if (q->head - q->tail < MAX_QUEUE_LEN) {
         struct dp_netdev_upcall *u = &q->upcalls[q->head++ & QUEUE_MASK];
         struct dpif_upcall *upcall = &u->upcall;
@@ -1443,11 +1460,14 @@ dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet,
 
         seq_change(dp->queue_seq);
 
-        return 0;
+        error = 0;
     } else {
         ovsthread_counter_inc(dp->n_lost, 1);
-        return ENOBUFS;
+        error = ENOBUFS;
     }
+    ovs_mutex_unlock(&dp->queue_mutex);
+
+    return error;
 }
 
 struct dp_netdev_execute_aux {