From: Ben Pfaff
Date: Fri, 27 Dec 2013 17:42:51 +0000 (-0800)
Subject: dpif-netdev: Introduce new mutex to protect queues.
X-Git-Tag: sliver-openvswitch-2.1.90-1~10^2~33
X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=f5126b57279f6c5664c0c646588df26916b4af89;p=sliver-openvswitch.git

dpif-netdev: Introduce new mutex to protect queues.

This is a first step in making thread safety more granular in
dpif-netdev, to allow for multithreaded forwarding.

Signed-off-by: Ben Pfaff
---

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 5292818b4..285967d55 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -83,8 +83,9 @@ struct dp_netdev_upcall {
 };
 
 struct dp_netdev_queue {
-    struct dp_netdev_upcall upcalls[MAX_QUEUE_LEN];
-    unsigned int head, tail;
+    struct dp_netdev_upcall upcalls[MAX_QUEUE_LEN] OVS_GUARDED;
+    unsigned int head OVS_GUARDED;
+    unsigned int tail OVS_GUARDED;
 };
 
 /* Datapath based on the network device interface from netdev.h. */
@@ -94,9 +95,12 @@ struct dp_netdev {
     struct ovs_refcount ref_cnt;
     atomic_flag destroyed;
 
-    struct dp_netdev_queue queues[N_QUEUES];
     struct classifier cls;      /* Classifier. */
     struct hmap flow_table;     /* Flow table. */
+
+    /* Queues. */
+    struct ovs_mutex queue_mutex;
+    struct dp_netdev_queue queues[N_QUEUES];
     struct seq *queue_seq;      /* Incremented whenever a packet is queued. */
 
     /* Statistics. */
@@ -190,9 +194,10 @@ static int do_add_port(struct dp_netdev *, const char *devname,
 static int do_del_port(struct dp_netdev *, odp_port_t port_no);
 static int dpif_netdev_open(const struct dpif_class *, const char *name,
                             bool create, struct dpif **);
-static int dp_netdev_output_userspace(struct dp_netdev *, struct ofpbuf *,
+static int dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *,
                                       int queue_no, const struct flow *,
-                                      const struct nlattr *userdata);
+                                      const struct nlattr *userdata)
+    OVS_EXCLUDED(dp->queue_mutex);
 static void dp_netdev_execute_actions(struct dp_netdev *, const struct flow *,
                                       struct ofpbuf *, struct pkt_metadata *,
                                       const struct nlattr *actions,
@@ -311,9 +316,12 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     dp->name = xstrdup(name);
     ovs_refcount_init(&dp->ref_cnt);
     atomic_flag_init(&dp->destroyed);
+    ovs_mutex_init(&dp->queue_mutex);
+    ovs_mutex_lock(&dp->queue_mutex);
     for (i = 0; i < N_QUEUES; i++) {
         dp->queues[i].head = dp->queues[i].tail = 0;
     }
+    ovs_mutex_unlock(&dp->queue_mutex);
     dp->queue_seq = seq_create();
     classifier_init(&dp->cls, NULL);
     hmap_init(&dp->flow_table);
@@ -366,6 +374,7 @@ dp_netdev_purge_queues(struct dp_netdev *dp)
 {
     int i;
 
+    ovs_mutex_lock(&dp->queue_mutex);
     for (i = 0; i < N_QUEUES; i++) {
         struct dp_netdev_queue *q = &dp->queues[i];
 
@@ -375,6 +384,7 @@ dp_netdev_purge_queues(struct dp_netdev *dp)
             ofpbuf_uninit(&u->buf);
         }
     }
+    ovs_mutex_unlock(&dp->queue_mutex);
 }
 
 static void
@@ -389,8 +399,11 @@ dp_netdev_free(struct dp_netdev *dp)
     ovsthread_counter_destroy(dp->n_hit);
     ovsthread_counter_destroy(dp->n_missed);
     ovsthread_counter_destroy(dp->n_lost);
 
+    dp_netdev_purge_queues(dp);
     seq_destroy(dp->queue_seq);
+    ovs_mutex_destroy(&dp->queue_mutex);
+
     classifier_destroy(&dp->cls);
     hmap_destroy(&dp->flow_table);
     seq_destroy(dp->port_seq);
@@ -1184,9 +1197,9 @@ dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED,
 }
 
 static struct dp_netdev_queue *
-find_nonempty_queue(struct dpif *dpif)
+find_nonempty_queue(struct dp_netdev *dp)
+    OVS_REQUIRES(dp->queue_mutex)
 {
-    struct dp_netdev *dp = get_dp_netdev(dpif);
     int i;
 
     for (i = 0; i < N_QUEUES; i++) {
@@ -1202,11 +1215,12 @@ static int
 dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
                  struct ofpbuf *buf)
 {
+    struct dp_netdev *dp = get_dp_netdev(dpif);
     struct dp_netdev_queue *q;
     int error;
 
-    ovs_mutex_lock(&dp_netdev_mutex);
-    q = find_nonempty_queue(dpif);
+    ovs_mutex_lock(&dp->queue_mutex);
+    q = find_nonempty_queue(dp);
     if (q) {
         struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
 
@@ -1219,7 +1233,7 @@ dpif_netdev_recv(struct dpif *dpif, struct dpif_upcall *upcall,
     } else {
         error = EAGAIN;
     }
-    ovs_mutex_unlock(&dp_netdev_mutex);
+    ovs_mutex_unlock(&dp->queue_mutex);
 
     return error;
 }
@@ -1230,23 +1244,22 @@ dpif_netdev_recv_wait(struct dpif *dpif)
     struct dp_netdev *dp = get_dp_netdev(dpif);
     uint64_t seq;
 
-    ovs_mutex_lock(&dp_netdev_mutex);
+    ovs_mutex_lock(&dp->queue_mutex);
     seq = seq_read(dp->queue_seq);
-    if (find_nonempty_queue(dpif)) {
+    if (find_nonempty_queue(dp)) {
         poll_immediate_wake();
     } else {
         seq_wait(dp->queue_seq, seq);
     }
-    ovs_mutex_unlock(&dp_netdev_mutex);
+    ovs_mutex_unlock(&dp->queue_mutex);
 }
 
 static void
 dpif_netdev_recv_purge(struct dpif *dpif)
 {
     struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif);
-    ovs_mutex_lock(&dp_netdev_mutex);
+
     dp_netdev_purge_queues(dpif_netdev->dp);
-    ovs_mutex_unlock(&dp_netdev_mutex);
 }
 
 /* Creates and returns a new 'struct dp_netdev_actions', with a reference count
@@ -1408,8 +1421,12 @@ static int
 dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet,
                            int queue_no, const struct flow *flow,
                            const struct nlattr *userdata)
+    OVS_EXCLUDED(dp->queue_mutex)
 {
     struct dp_netdev_queue *q = &dp->queues[queue_no];
+    int error;
+
+    ovs_mutex_lock(&dp->queue_mutex);
     if (q->head - q->tail < MAX_QUEUE_LEN) {
         struct dp_netdev_upcall *u = &q->upcalls[q->head++ & QUEUE_MASK];
         struct dpif_upcall *upcall = &u->upcall;
@@ -1443,11 +1460,14 @@ dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet,
 
         seq_change(dp->queue_seq);
 
-        return 0;
+        error = 0;
     } else {
         ovsthread_counter_inc(dp->n_lost, 1);
-        return ENOBUFS;
+        error = ENOBUFS;
     }
+    ovs_mutex_unlock(&dp->queue_mutex);
+
+    return error;
 }
 
 struct dp_netdev_execute_aux {
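
For readers unfamiliar with the pattern the patch adopts, what follows is a minimal standalone sketch of a fixed-size upcall ring whose head and tail indices are guarded by a single mutex: the producer drops packets (ENOBUFS) when the ring is full, and the consumer reports EAGAIN when it is empty. It is not part of the patch or of Open vSwitch; it uses plain pthreads in place of the ovs_mutex wrappers and Clang thread-safety annotations seen above, and the toy_queue, toy_enqueue, and toy_dequeue names are invented for illustration.

/* Minimal sketch, not part of the patch: a mutex-guarded upcall ring
 * mirroring the enqueue/dequeue pattern that dp_netdev_output_userspace()
 * and dpif_netdev_recv() follow above.  All "toy_" names are hypothetical. */
#include <errno.h>
#include <pthread.h>

#define MAX_QUEUE_LEN 128               /* Must be a power of 2. */
#define QUEUE_MASK    (MAX_QUEUE_LEN - 1)

struct toy_upcall {
    int payload;                        /* Stand-in for the real upcall data. */
};

struct toy_queue {
    pthread_mutex_t mutex;              /* Guards the fields below. */
    struct toy_upcall upcalls[MAX_QUEUE_LEN];
    unsigned int head, tail;            /* head - tail == entries queued. */
};

static void
toy_queue_init(struct toy_queue *q)
{
    pthread_mutex_init(&q->mutex, NULL);
    q->head = q->tail = 0;
}

/* Producer: take the lock, check for space, then bump 'head' (compare
 * dp_netdev_output_userspace() in the diff). */
static int
toy_enqueue(struct toy_queue *q, struct toy_upcall u)
{
    int error;

    pthread_mutex_lock(&q->mutex);
    if (q->head - q->tail < MAX_QUEUE_LEN) {
        q->upcalls[q->head++ & QUEUE_MASK] = u;
        error = 0;
    } else {
        error = ENOBUFS;                /* Queue full: packet counts as lost. */
    }
    pthread_mutex_unlock(&q->mutex);

    return error;
}

/* Consumer: take the lock, check for data, then bump 'tail' (compare
 * dpif_netdev_recv() in the diff). */
static int
toy_dequeue(struct toy_queue *q, struct toy_upcall *u)
{
    int error;

    pthread_mutex_lock(&q->mutex);
    if (q->head != q->tail) {
        *u = q->upcalls[q->tail++ & QUEUE_MASK];
        error = 0;
    } else {
        error = EAGAIN;                 /* Nothing queued right now. */
    }
    pthread_mutex_unlock(&q->mutex);

    return error;
}

Keeping MAX_QUEUE_LEN a power of two lets head and tail grow without wrapping logic: their difference is the queue depth, and indexing with "& QUEUE_MASK" always selects the right slot, the same trick dpif-netdev itself uses.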