datapath: Fix race against workqueue in dp_dev and simplify code.
authorBen Pfaff <blp@nicira.com>
Wed, 8 Jul 2009 19:23:32 +0000 (12:23 -0700)
committerBen Pfaff <blp@nicira.com>
Wed, 8 Jul 2009 21:13:15 +0000 (14:13 -0700)
The dp_dev_destroy() function failed to cancel the xmit_queue work, which
allowed it to run after the device had been destroyed, accessing freed
memory.  However, simply canceling the work with cancel_work_sync() would
be insufficient, since other packets could get queued while the work
function was running.  Stopping the queue with netif_tx_disable() doesn't
help, because the final action in dp_dev_do_xmit() is to re-enable the TX
queue.

This issue led me to re-examine why the dp_dev needs to use a work_struct
at all.  This was implemented in commit 71f13ed0b "Send of0 packets from
workqueue, to avoid recursive locking of ofN device" due to a complaint
from lockdep about recursive locking.

However, there's no actual reason that we need any locking around
dp_dev_xmit().  Until now, it has accepted the standard locking provided
by the network stack.  But looking at the other software devices (veth,
loopback), those use NETIF_F_LLTX, which disables this locking, and
presumably do so for this very reason.  In fact, the lwn article at
http://lwn.net/Articles/121566/ hints that NETIF_F_LLTX, which is otherwise
discouraged in the kernel, is acceptable for "certain types of software
device."

So this commit switches to using NETIF_F_LLTX for dp_dev and gets rid
of the work_struct.

In the process, I noticed that veth and loopback also take advantage of
a network device destruction "hook" using the net_device "destructor"
member.  Using this we can automatically get called on network device
destruction at the point where rtnl_unlock() is called.  This allows us
to stop stringing the dp_devs that are being destroyed onto a list so
that we can free them, and thus simplifies the code along all the paths
that call dp_dev_destroy().

This commit gets rid of a call to synchronize_rcu() (disguised as a call
to synchronize_net(), which is a macro that expands to synchronize_rcu()),
so it probably speeds up deleting ports, too.

datapath/datapath.c
datapath/datapath.h
datapath/dp_dev.c
datapath/dp_dev.h
datapath/dp_notify.c

index 7f496c5..fb23977 100644 (file)
@@ -255,7 +255,7 @@ static int create_dp(int dp_idx, const char __user *devnamep)
        return 0;
 
 err_destroy_local_port:
-       dp_del_port(dp->ports[ODPP_LOCAL], NULL);
+       dp_del_port(dp->ports[ODPP_LOCAL]);
 err_destroy_table:
        dp_table_destroy(dp->table, 0);
 err_destroy_dp_dev:
@@ -271,21 +271,21 @@ err:
        return err;
 }
 
-static void do_destroy_dp(struct datapath *dp, struct list_head *dp_devs)
+static void do_destroy_dp(struct datapath *dp)
 {
        struct net_bridge_port *p, *n;
        int i;
 
        list_for_each_entry_safe (p, n, &dp->port_list, node)
                if (p->port_no != ODPP_LOCAL)
-                       dp_del_port(p, dp_devs);
+                       dp_del_port(p);
 
        if (dp_del_dp_hook)
                dp_del_dp_hook(dp);
 
        rcu_assign_pointer(dps[dp->dp_idx], NULL);
 
-       dp_del_port(dp->ports[ODPP_LOCAL], dp_devs);
+       dp_del_port(dp->ports[ODPP_LOCAL]);
 
        dp_table_destroy(dp->table, 1);
 
@@ -300,9 +300,7 @@ static void do_destroy_dp(struct datapath *dp, struct list_head *dp_devs)
 
 static int destroy_dp(int dp_idx)
 {
-       struct dp_dev *dp_dev, *next;
        struct datapath *dp;
-       LIST_HEAD(dp_devs);
        int err;
 
        rtnl_lock();
@@ -312,14 +310,12 @@ static int destroy_dp(int dp_idx)
        if (!dp)
                goto err_unlock;
 
-       do_destroy_dp(dp, &dp_devs);
+       do_destroy_dp(dp);
        err = 0;
 
 err_unlock:
        mutex_unlock(&dp_mutex);
        rtnl_unlock();
-       list_for_each_entry_safe (dp_dev, next, &dp_devs, list)
-               free_netdev(dp_dev->dev);
        return err;
 }
 
@@ -420,7 +416,7 @@ out:
        return err;
 }
 
-int dp_del_port(struct net_bridge_port *p, struct list_head *dp_devs)
+int dp_del_port(struct net_bridge_port *p)
 {
        ASSERT_RTNL();
 
@@ -450,10 +446,6 @@ int dp_del_port(struct net_bridge_port *p, struct list_head *dp_devs)
 
        if (is_dp_dev(p->dev)) {
                dp_dev_destroy(p->dev);
-               if (dp_devs) {
-                       struct dp_dev *dp_dev = dp_dev_priv(p->dev);
-                       list_add(&dp_dev->list, dp_devs);
-               }
        }
        if (p->port_no != ODPP_LOCAL && dp_del_if_hook) {
                dp_del_if_hook(p);
@@ -467,7 +459,6 @@ int dp_del_port(struct net_bridge_port *p, struct list_head *dp_devs)
 
 static int del_port(int dp_idx, int port_no)
 {
-       struct dp_dev *dp_dev, *next;
        struct net_bridge_port *p;
        struct datapath *dp;
        LIST_HEAD(dp_devs);
@@ -488,15 +479,13 @@ static int del_port(int dp_idx, int port_no)
        if (!p)
                goto out_unlock_dp;
 
-       err = dp_del_port(p, &dp_devs);
+       err = dp_del_port(p);
 
 out_unlock_dp:
        mutex_unlock(&dp->mutex);
 out_unlock_rtnl:
        rtnl_unlock();
 out:
-       list_for_each_entry_safe (dp_dev, next, &dp_devs, list)
-               free_netdev(dp_dev->dev);
        return err;
 }
 
index 13b6641..455580f 100644 (file)
@@ -120,7 +120,7 @@ int dp_table_foreach(struct dp_table *table,
                     void *aux);
 
 void dp_process_received_packet(struct sk_buff *, struct net_bridge_port *);
-int dp_del_port(struct net_bridge_port *, struct list_head *);
+int dp_del_port(struct net_bridge_port *);
 int dp_output_control(struct datapath *, struct sk_buff *, int, u32 arg);
 
 struct datapath *get_dp(int dp_idx);
index 34a102a..0284f96 100644 (file)
@@ -56,6 +56,8 @@ static int dp_dev_mac_addr(struct net_device *dev, void *p)
        return 0;
 }
 
+/* Not reentrant (because it is called with BHs disabled), but may be called
+ * simultaneously on different CPUs. */
 static int dp_dev_xmit(struct sk_buff *skb, struct net_device *netdev)
 {
        struct dp_dev *dp_dev = dp_dev_priv(netdev);
@@ -66,10 +68,7 @@ static int dp_dev_xmit(struct sk_buff *skb, struct net_device *netdev)
         * harder to predict when. */
        skb_orphan(skb);
 
-       /* We are going to modify 'skb', by sticking it on &dp_dev->xmit_queue,
-        * so we need to have our own clone.  (At any rate, fwd_port_input()
-        * will need its own clone, so there's no benefit to queuing any other
-        * way.) */
+       /* dp_process_received_packet() needs its own clone. */
        skb = skb_share_check(skb, GFP_ATOMIC);
        if (!skb)
                return 0;
@@ -77,37 +76,14 @@ static int dp_dev_xmit(struct sk_buff *skb, struct net_device *netdev)
        dp_dev->stats.tx_packets++;
        dp_dev->stats.tx_bytes += skb->len;
 
-       if (skb_queue_len(&dp_dev->xmit_queue) >= netdev->tx_queue_len) {
-               /* Queue overflow.  Stop transmitter. */
-               netif_stop_queue(netdev);
-
-               /* We won't see all dropped packets individually, so overrun
-                * error is appropriate. */
-               dp_dev->stats.tx_fifo_errors++;
-       }
-       skb_queue_tail(&dp_dev->xmit_queue, skb);
-       netdev->trans_start = jiffies;
-
-       schedule_work(&dp_dev->xmit_work);
+       skb_reset_mac_header(skb);
+       rcu_read_lock_bh();
+       dp_process_received_packet(skb, dp_dev->dp->ports[dp_dev->port_no]);
+       rcu_read_unlock_bh();
 
        return 0;
 }
 
-static void dp_dev_do_xmit(struct work_struct *work)
-{
-       struct dp_dev *dp_dev = container_of(work, struct dp_dev, xmit_work);
-       struct datapath *dp = dp_dev->dp;
-       struct sk_buff *skb;
-
-       while ((skb = skb_dequeue(&dp_dev->xmit_queue)) != NULL) {
-               skb_reset_mac_header(skb);
-               rcu_read_lock_bh();
-               dp_process_received_packet(skb, dp->ports[dp_dev->port_no]);
-               rcu_read_unlock_bh();
-       }
-       netif_wake_queue(dp_dev->dev);
-}
-
 static int dp_dev_open(struct net_device *netdev)
 {
        netif_start_queue(netdev);
@@ -146,10 +122,12 @@ do_setup(struct net_device *netdev)
        netdev->open = dp_dev_open;
        SET_ETHTOOL_OPS(netdev, &dp_ethtool_ops);
        netdev->stop = dp_dev_stop;
-       netdev->tx_queue_len = 100;
+       netdev->tx_queue_len = 0;
        netdev->set_mac_address = dp_dev_mac_addr;
+       netdev->destructor = free_netdev;
 
        netdev->flags = IFF_BROADCAST | IFF_MULTICAST;
+       netdev->features = NETIF_F_LLTX; /* XXX other features? */
 
        random_ether_addr(netdev->dev_addr);
 
@@ -195,19 +173,12 @@ struct net_device *dp_dev_create(struct datapath *dp, const char *dp_name, int p
        dp_dev->dp = dp;
        dp_dev->port_no = port_no;
        dp_dev->dev = netdev;
-       skb_queue_head_init(&dp_dev->xmit_queue);
-       INIT_WORK(&dp_dev->xmit_work, dp_dev_do_xmit);
        return netdev;
 }
 
 /* Called with RTNL lock and dp_mutex.*/
 void dp_dev_destroy(struct net_device *netdev)
 {
-       struct dp_dev *dp_dev = dp_dev_priv(netdev);
-
-       netif_tx_disable(netdev);
-       synchronize_net();
-       skb_queue_purge(&dp_dev->xmit_queue);
        unregister_netdevice(netdev);
 }
 
index 1b17d8f..015c71c 100644 (file)
@@ -9,16 +9,14 @@
 #ifndef DP_DEV_H
 #define DP_DEV_H 1
 
+#include <linux/percpu.h>
+
 struct dp_dev {
        struct datapath *dp;
        int port_no;
 
        struct net_device *dev;
        struct net_device_stats stats;
-       struct sk_buff_head xmit_queue;
-       struct work_struct xmit_work;
-
-       struct list_head list;
 };
 
 static inline struct dp_dev *dp_dev_priv(struct net_device *netdev)
index 6203470..5b8cf17 100644 (file)
@@ -21,7 +21,7 @@ static int dp_device_event(struct notifier_block *unused, unsigned long event,
        if (event == NETDEV_UNREGISTER && p) {
                struct datapath *dp = p->dp;
                mutex_lock(&dp->mutex);
-               dp_del_port(p, NULL);
+               dp_del_port(p);
                mutex_unlock(&dp->mutex);
        }
        return NOTIFY_DONE;