vserver 1.9.5.x5
[linux-2.6.git] net/sunrpc/sched.c
index aa2205a..be26e4c 100644
 #include <linux/smp.h>
 #include <linux/smp_lock.h>
 #include <linux/spinlock.h>
-#include <linux/suspend.h>
 
 #include <linux/sunrpc/clnt.h>
 #include <linux/sunrpc/xprt.h>
 
 #ifdef RPC_DEBUG
 #define RPCDBG_FACILITY                RPCDBG_SCHED
+#define RPC_TASK_MAGIC_ID      0xf00baa
 static int                     rpc_task_id;
 #endif
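
The magic number used to be the bare literal 0xf00baa scattered through the
wake-up and release paths; the patch names it and reuses it in the BUG_ON()
checks below. A minimal sketch of the debug-only guard (the rpc_task_sane()
name is illustrative, not part of the patch):

#ifdef RPC_DEBUG
/* Stamped by rpc_init_task(), checked before wake-up and release;
 * a freed or never-initialized task is unlikely to carry the magic. */
#define RPC_TASK_MAGIC_ID       0xf00baa
#define rpc_task_sane(task)     BUG_ON((task)->tk_magic != RPC_TASK_MAGIC_ID)
#endif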
 
@@ -41,13 +41,9 @@ static mempool_t     *rpc_buffer_mempool;
 
 static void                    __rpc_default_timer(struct rpc_task *task);
 static void                    rpciod_killall(void);
+static void                    rpc_free(struct rpc_task *task);
 
-/*
- * When an asynchronous RPC task is activated within a bottom half
- * handler, or while executing another RPC task, it is put on
- * schedq, and rpciod is woken up.
- */
-static RPC_WAITQ(schedq, "schedq");
+static void                    rpc_async_schedule(void *);
 
 /*
  * RPC tasks that create another task (e.g. for contacting the portmapper)
@@ -68,26 +64,18 @@ static LIST_HEAD(all_tasks);
 /*
  * rpciod-related stuff
  */
-static DECLARE_WAIT_QUEUE_HEAD(rpciod_idle);
-static DECLARE_COMPLETION(rpciod_killer);
 static DECLARE_MUTEX(rpciod_sema);
 static unsigned int            rpciod_users;
-static pid_t                   rpciod_pid;
-static int                     rpc_inhibit;
+static struct workqueue_struct *rpciod_workqueue;
 
-/*
- * Spinlock for wait queues. Access to the latter also has to be
- * interrupt-safe in order to allow timers to wake up sleeping tasks.
- */
-static spinlock_t rpc_queue_lock = SPIN_LOCK_UNLOCKED;
 /*
  * Spinlock for other critical sections of code.
  */
-static spinlock_t rpc_sched_lock = SPIN_LOCK_UNLOCKED;
+static DEFINE_SPINLOCK(rpc_sched_lock);
 
 /*
  * Disable the timer for a given RPC task. Should be called with
- * rpc_queue_lock and bh_disabled in order to avoid races within
+ * queue->lock and bh_disabled in order to avoid races within
  * rpc_run_timer().
  */
 static inline void
@@ -105,19 +93,19 @@ __rpc_disable_timer(struct rpc_task *task)
  * without calling del_timer_sync(). The latter could cause a
  * deadlock if called while we're holding spinlocks...
  */
-static void
-rpc_run_timer(struct rpc_task *task)
+static void rpc_run_timer(struct rpc_task *task)
 {
        void (*callback)(struct rpc_task *);
 
-       spin_lock_bh(&rpc_queue_lock);
        callback = task->tk_timeout_fn;
        task->tk_timeout_fn = NULL;
-       spin_unlock_bh(&rpc_queue_lock);
-       if (callback) {
+       if (callback && RPC_IS_QUEUED(task)) {
                dprintk("RPC: %4d running timer\n", task->tk_pid);
                callback(task);
        }
+       smp_mb__before_clear_bit();
+       clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
+       smp_mb__after_clear_bit();
 }
 
 /*
@@ -136,29 +124,21 @@ __rpc_add_timer(struct rpc_task *task, rpc_action timer)
                task->tk_timeout_fn = timer;
        else
                task->tk_timeout_fn = __rpc_default_timer;
+       set_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
        mod_timer(&task->tk_timer, jiffies + task->tk_timeout);
 }
 
-/*
- * Set up a timer for an already sleeping task.
- */
-void rpc_add_timer(struct rpc_task *task, rpc_action timer)
-{
-       spin_lock_bh(&rpc_queue_lock);
-       if (!RPC_IS_RUNNING(task))
-               __rpc_add_timer(task, timer);
-       spin_unlock_bh(&rpc_queue_lock);
-}
-
 /*
  * Delete any timer for the current task. Because we use del_timer_sync(),
- * this function should never be called while holding rpc_queue_lock.
+ * this function should never be called while holding queue->lock.
  */
 static inline void
 rpc_delete_timer(struct rpc_task *task)
 {
-       if (del_timer_sync(&task->tk_timer))
+       if (test_and_clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate)) {
+               del_singleshot_timer_sync(&task->tk_timer);
                dprintk("RPC: %4d deleting timer\n", task->tk_pid);
+       }
 }
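
With the global rpc_queue_lock gone, a timer's lifetime is tracked by the new
RPC_TASK_HAS_TIMER bit: set before the timer is armed, cleared with barriers
once the handler has run, and tested-and-cleared so the delete path only pays
for a synchronous wait when a timer was actually pending. The handshake
across the three functions above, condensed:

/* __rpc_add_timer(): publish the flag before the timer can fire */
set_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
mod_timer(&task->tk_timer, jiffies + task->tk_timeout);

/* rpc_run_timer(): run the callback, then clear the flag; the barriers
 * order the callback's stores before the clear, so a cleared bit means
 * the callback has completed */
smp_mb__before_clear_bit();
clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
smp_mb__after_clear_bit();

/* rpc_delete_timer(): synchronize only if a timer was armed */
if (test_and_clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate))
        del_singleshot_timer_sync(&task->tk_timer);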
 
 /*
@@ -169,16 +149,17 @@ static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, struct rpc_task *task)
        struct list_head *q;
        struct rpc_task *t;
 
+       INIT_LIST_HEAD(&task->u.tk_wait.links);
        q = &queue->tasks[task->tk_priority];
        if (unlikely(task->tk_priority > queue->maxpriority))
                q = &queue->tasks[queue->maxpriority];
-       list_for_each_entry(t, q, tk_list) {
+       list_for_each_entry(t, q, u.tk_wait.list) {
                if (t->tk_cookie == task->tk_cookie) {
-                       list_add_tail(&task->tk_list, &t->tk_links);
+                       list_add_tail(&task->u.tk_wait.list, &t->u.tk_wait.links);
                        return;
                }
        }
-       list_add_tail(&task->tk_list, q);
+       list_add_tail(&task->u.tk_wait.list, q);
 }
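
Within one priority level, tasks that share a tk_cookie are not queued as
individual entries: the first such task anchors the others on its
u.tk_wait.links list, so a batch of related requests is scheduled as a unit.
The layout, sketched as a comment:

/*
 * queue->tasks[prio] --> T1 --> T4 --> ...        (u.tk_wait.list)
 *                        |
 *                        +--> T2 --> T3           (u.tk_wait.links,
 *                                                  same tk_cookie as T1)
 *
 * When T1 is dequeued, __rpc_remove_wait_queue_priority() (next hunk)
 * promotes T2 into T1's place on the level list and splices the rest
 * of the links onto T2, keeping the batch intact.
 */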
 
 /*
@@ -189,37 +170,21 @@ static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, struct rpc_task *task)
  * improve overall performance.
  * Everyone else gets appended to the queue to ensure proper FIFO behavior.
  */
-static int __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
+static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
 {
-       if (task->tk_rpcwait == queue)
-               return 0;
+       BUG_ON (RPC_IS_QUEUED(task));
 
-       if (task->tk_rpcwait) {
-               printk(KERN_WARNING "RPC: doubly enqueued task!\n");
-               return -EWOULDBLOCK;
-       }
        if (RPC_IS_PRIORITY(queue))
                __rpc_add_wait_queue_priority(queue, task);
        else if (RPC_IS_SWAPPER(task))
-               list_add(&task->tk_list, &queue->tasks[0]);
+               list_add(&task->u.tk_wait.list, &queue->tasks[0]);
        else
-               list_add_tail(&task->tk_list, &queue->tasks[0]);
-       task->tk_rpcwait = queue;
+               list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
+       task->u.tk_wait.rpc_waitq = queue;
+       rpc_set_queued(task);
 
        dprintk("RPC: %4d added to queue %p \"%s\"\n",
                                task->tk_pid, queue, rpc_qname(queue));
-
-       return 0;
-}
-
-int rpc_add_wait_queue(struct rpc_wait_queue *q, struct rpc_task *task)
-{
-       int             result;
-
-       spin_lock_bh(&rpc_queue_lock);
-       result = __rpc_add_wait_queue(q, task);
-       spin_unlock_bh(&rpc_queue_lock);
-       return result;
 }
 
 /*
@@ -229,12 +194,12 @@ static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
 {
        struct rpc_task *t;
 
-       if (!list_empty(&task->tk_links)) {
-               t = list_entry(task->tk_links.next, struct rpc_task, tk_list);
-               list_move(&t->tk_list, &task->tk_list);
-               list_splice_init(&task->tk_links, &t->tk_links);
+       if (!list_empty(&task->u.tk_wait.links)) {
+               t = list_entry(task->u.tk_wait.links.next, struct rpc_task, u.tk_wait.list);
+               list_move(&t->u.tk_wait.list, &task->u.tk_wait.list);
+               list_splice_init(&task->u.tk_wait.links, &t->u.tk_wait.links);
        }
-       list_del(&task->tk_list);
+       list_del(&task->u.tk_wait.list);
 }
 
 /*
@@ -243,31 +208,17 @@ static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
  */
 static void __rpc_remove_wait_queue(struct rpc_task *task)
 {
-       struct rpc_wait_queue *queue = task->tk_rpcwait;
-
-       if (!queue)
-               return;
+       struct rpc_wait_queue *queue;
+       queue = task->u.tk_wait.rpc_waitq;
 
        if (RPC_IS_PRIORITY(queue))
                __rpc_remove_wait_queue_priority(task);
        else
-               list_del(&task->tk_list);
-       task->tk_rpcwait = NULL;
-
+               list_del(&task->u.tk_wait.list);
        dprintk("RPC: %4d removed from queue %p \"%s\"\n",
                                task->tk_pid, queue, rpc_qname(queue));
 }
 
-void
-rpc_remove_wait_queue(struct rpc_task *task)
-{
-       if (!task->tk_rpcwait)
-               return;
-       spin_lock_bh(&rpc_queue_lock);
-       __rpc_remove_wait_queue(task);
-       spin_unlock_bh(&rpc_queue_lock);
-}
-
 static inline void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority)
 {
        queue->priority = priority;
@@ -290,6 +241,7 @@ static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, int maxprio)
 {
        int i;
 
+       spin_lock_init(&queue->lock);
        for (i = 0; i < ARRAY_SIZE(queue->tasks); i++)
                INIT_LIST_HEAD(&queue->tasks[i]);
        queue->maxpriority = maxprio;
@@ -316,34 +268,31 @@ EXPORT_SYMBOL(rpc_init_wait_queue);
  * Note: If the task is ASYNC, this must be called with 
  * the spinlock held to protect the wait queue operation.
  */
-static inline void
-rpc_make_runnable(struct rpc_task *task)
+static void rpc_make_runnable(struct rpc_task *task)
 {
-       if (task->tk_timeout_fn) {
-               printk(KERN_ERR "RPC: task w/ running timer in rpc_make_runnable!!\n");
+       int do_ret;
+
+       BUG_ON(task->tk_timeout_fn);
+       do_ret = rpc_test_and_set_running(task);
+       rpc_clear_queued(task);
+       if (do_ret)
                return;
-       }
-       rpc_set_running(task);
        if (RPC_IS_ASYNC(task)) {
-               if (RPC_IS_SLEEPING(task)) {
-                       int status;
-                       status = __rpc_add_wait_queue(&schedq, task);
-                       if (status < 0) {
-                               printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
-                               task->tk_status = status;
-                               return;
-                       }
-                       rpc_clear_sleeping(task);
-                       wake_up(&rpciod_idle);
+               int status;
+
+               INIT_WORK(&task->u.tk_work, rpc_async_schedule, (void *)task);
+               status = queue_work(task->tk_workqueue, &task->u.tk_work);
+               if (status < 0) {
+                       printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
+                       task->tk_status = status;
+                       return;
                }
-       } else {
-               rpc_clear_sleeping(task);
-               wake_up(&task->tk_wait);
-       }
+       } else
+               wake_up(&task->u.tk_wait.waitq);
 }
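
Async wake-ups now hand the task to a workqueue worker instead of the private
rpciod loop. A condensed sketch of the hand-off, using the three-argument
INIT_WORK() of this kernel generation; the worker callback mirrors the
rpc_async_schedule() defined further down. Note that queue_work() returns
0 or 1 here, so the status < 0 branch above is effectively unreachable, a
leftover from the old __rpc_add_wait_queue() error path:

/* worker callback: runs in process context on an rpciod thread */
static void rpc_async_schedule(void *arg)
{
        __rpc_execute((struct rpc_task *)arg);
}

/* one work item per task, re-initialized on every wake-up */
INIT_WORK(&task->u.tk_work, rpc_async_schedule, (void *)task);
queue_work(task->tk_workqueue, &task->u.tk_work);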
 
 /*
- * Place a newly initialized task on the schedq.
+ * Place a newly initialized task on the workqueue.
  */
 static inline void
 rpc_schedule_run(struct rpc_task *task)
@@ -352,33 +301,18 @@ rpc_schedule_run(struct rpc_task *task)
        if (RPC_IS_ACTIVATED(task))
                return;
        task->tk_active = 1;
-       rpc_set_sleeping(task);
        rpc_make_runnable(task);
 }
 
-/*
- *     For other people who may need to wake the I/O daemon
- *     but should (for now) know nothing about its innards
- */
-void rpciod_wake_up(void)
-{
-       if(rpciod_pid==0)
-               printk(KERN_ERR "rpciod: wot no daemon?\n");
-       wake_up(&rpciod_idle);
-}
-
 /*
  * Prepare for sleeping on a wait queue.
  * By always appending tasks to the list we ensure FIFO behavior.
  * NB: An RPC task will only receive interrupt-driven events as long
  * as it's on a wait queue.
  */
-static void
-__rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
+static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
                        rpc_action action, rpc_action timer)
 {
-       int status;
-
        dprintk("RPC: %4d sleep_on(queue \"%s\" time %ld)\n", task->tk_pid,
                                rpc_qname(q), jiffies);
 
@@ -388,75 +322,66 @@ __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
        }
 
        /* Mark the task as being activated if so needed */
-       if (!RPC_IS_ACTIVATED(task)) {
+       if (!RPC_IS_ACTIVATED(task))
                task->tk_active = 1;
-               rpc_set_sleeping(task);
-       }
 
-       status = __rpc_add_wait_queue(q, task);
-       if (status) {
-               printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
-               task->tk_status = status;
-       } else {
-               rpc_clear_running(task);
-               if (task->tk_callback) {
-                       dprintk(KERN_ERR "RPC: %4d overwrites an active callback\n", task->tk_pid);
-                       BUG();
-               }
-               task->tk_callback = action;
-               __rpc_add_timer(task, timer);
-       }
+       __rpc_add_wait_queue(q, task);
+
+       BUG_ON(task->tk_callback != NULL);
+       task->tk_callback = action;
+       __rpc_add_timer(task, timer);
 }
 
-void
-rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
+void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
                                rpc_action action, rpc_action timer)
 {
        /*
         * Protect the queue operations.
         */
-       spin_lock_bh(&rpc_queue_lock);
+       spin_lock_bh(&q->lock);
        __rpc_sleep_on(q, task, action, timer);
-       spin_unlock_bh(&rpc_queue_lock);
+       spin_unlock_bh(&q->lock);
 }
 
 /**
- * __rpc_wake_up_task - wake up a single rpc_task
+ * __rpc_do_wake_up_task - wake up a single rpc_task
  * @task: task to be woken up
  *
- * Caller must hold rpc_queue_lock
+ * Caller must hold queue->lock, and have cleared the task queued flag.
  */
-static void
-__rpc_wake_up_task(struct rpc_task *task)
+static void __rpc_do_wake_up_task(struct rpc_task *task)
 {
-       dprintk("RPC: %4d __rpc_wake_up_task (now %ld inh %d)\n",
-                                       task->tk_pid, jiffies, rpc_inhibit);
+       dprintk("RPC: %4d __rpc_wake_up_task (now %ld)\n", task->tk_pid, jiffies);
 
 #ifdef RPC_DEBUG
-       if (task->tk_magic != 0xf00baa) {
-               printk(KERN_ERR "RPC: attempt to wake up non-existing task!\n");
-               rpc_debug = ~0;
-               rpc_show_tasks();
-               return;
-       }
+       BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
 #endif
        /* Has the task been executed yet? If not, we cannot wake it up! */
        if (!RPC_IS_ACTIVATED(task)) {
                printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
                return;
        }
-       if (RPC_IS_RUNNING(task))
-               return;
 
        __rpc_disable_timer(task);
-       if (task->tk_rpcwait != &schedq)
-               __rpc_remove_wait_queue(task);
+       __rpc_remove_wait_queue(task);
 
        rpc_make_runnable(task);
 
        dprintk("RPC:      __rpc_wake_up_task done\n");
 }
 
+/*
+ * Wake up the specified task
+ */
+static void __rpc_wake_up_task(struct rpc_task *task)
+{
+       if (rpc_start_wakeup(task)) {
+               if (RPC_IS_QUEUED(task))
+                       __rpc_do_wake_up_task(task);
+               rpc_finish_wakeup(task);
+       }
+}
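
rpc_start_wakeup() and rpc_finish_wakeup() are bit-lock helpers living in
include/linux/sunrpc/sched.h (not shown in this diff); they let exactly one
caller claim the wake-up of a task. Their expected shape, assuming a
RPC_TASK_WAKEUP bit in tk_runstate:

static inline int rpc_start_wakeup(struct rpc_task *task)
{
        /* nonzero iff we are the first to claim this wake-up */
        return !test_and_set_bit(RPC_TASK_WAKEUP, &task->tk_runstate);
}

static inline void rpc_finish_wakeup(struct rpc_task *task)
{
        /* drop the claim; barriers order the dequeue before the release */
        smp_mb__before_clear_bit();
        clear_bit(RPC_TASK_WAKEUP, &task->tk_runstate);
        smp_mb__after_clear_bit();
}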
+
 /*
  * Default timeout handler if none specified by user
  */
@@ -471,14 +396,18 @@ __rpc_default_timer(struct rpc_task *task)
 /*
  * Wake up the specified task
  */
-void
-rpc_wake_up_task(struct rpc_task *task)
+void rpc_wake_up_task(struct rpc_task *task)
 {
-       if (RPC_IS_RUNNING(task))
-               return;
-       spin_lock_bh(&rpc_queue_lock);
-       __rpc_wake_up_task(task);
-       spin_unlock_bh(&rpc_queue_lock);
+       if (rpc_start_wakeup(task)) {
+               if (RPC_IS_QUEUED(task)) {
+                       struct rpc_wait_queue *queue = task->u.tk_wait.rpc_waitq;
+
+                       spin_lock_bh(&queue->lock);
+                       __rpc_do_wake_up_task(task);
+                       spin_unlock_bh(&queue->lock);
+               }
+               rpc_finish_wakeup(task);
+       }
 }
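
Note the ordering in rpc_wake_up_task(): task->u.tk_wait.rpc_waitq is read
before queue->lock is taken. That is safe only because rpc_start_wakeup()
has already excluded every other would-be waker, and only a waker may
dequeue a queued task, so the queue pointer cannot change underneath us.
Annotated:

if (rpc_start_wakeup(task)) {                   /* 1: claim the wake-up */
        if (RPC_IS_QUEUED(task)) {              /* 2: still on a queue  */
                struct rpc_wait_queue *queue = task->u.tk_wait.rpc_waitq;
                                                /* 3: stable, since we
                                                 *    hold the only
                                                 *    wake-up claim     */
                spin_lock_bh(&queue->lock);
                __rpc_do_wake_up_task(task);
                spin_unlock_bh(&queue->lock);
        }
        rpc_finish_wakeup(task);
}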
 
 /*
@@ -494,11 +423,11 @@ static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queue)
         */
        q = &queue->tasks[queue->priority];
        if (!list_empty(q)) {
-               task = list_entry(q->next, struct rpc_task, tk_list);
+               task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
                if (queue->cookie == task->tk_cookie) {
                        if (--queue->nr)
                                goto out;
-                       list_move_tail(&task->tk_list, q);
+                       list_move_tail(&task->u.tk_wait.list, q);
                }
                /*
                 * Check if we need to switch queues.
@@ -516,7 +445,7 @@ static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queue)
                else
                        q = q - 1;
                if (!list_empty(q)) {
-                       task = list_entry(q->next, struct rpc_task, tk_list);
+                       task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
                        goto new_queue;
                }
        } while (q != &queue->tasks[queue->priority]);
@@ -541,14 +470,14 @@ struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
        struct rpc_task *task = NULL;
 
        dprintk("RPC:      wake_up_next(%p \"%s\")\n", queue, rpc_qname(queue));
-       spin_lock_bh(&rpc_queue_lock);
+       spin_lock_bh(&queue->lock);
        if (RPC_IS_PRIORITY(queue))
                task = __rpc_wake_up_next_priority(queue);
        else {
                task_for_first(task, &queue->tasks[0])
                        __rpc_wake_up_task(task);
        }
-       spin_unlock_bh(&rpc_queue_lock);
+       spin_unlock_bh(&queue->lock);
 
        return task;
 }
@@ -557,25 +486,25 @@ struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
  * rpc_wake_up - wake up all rpc_tasks
  * @queue: rpc_wait_queue on which the tasks are sleeping
  *
- * Grabs rpc_queue_lock
+ * Grabs queue->lock
  */
 void rpc_wake_up(struct rpc_wait_queue *queue)
 {
        struct rpc_task *task;
 
        struct list_head *head;
-       spin_lock_bh(&rpc_queue_lock);
+       spin_lock_bh(&queue->lock);
        head = &queue->tasks[queue->maxpriority];
        for (;;) {
                while (!list_empty(head)) {
-                       task = list_entry(head->next, struct rpc_task, tk_list);
+                       task = list_entry(head->next, struct rpc_task, u.tk_wait.list);
                        __rpc_wake_up_task(task);
                }
                if (head == &queue->tasks[0])
                        break;
                head--;
        }
-       spin_unlock_bh(&rpc_queue_lock);
+       spin_unlock_bh(&queue->lock);
 }
 
 /**
@@ -583,18 +512,18 @@ void rpc_wake_up(struct rpc_wait_queue *queue)
  * @queue: rpc_wait_queue on which the tasks are sleeping
  * @status: status value to set
  *
- * Grabs rpc_queue_lock
+ * Grabs queue->lock
  */
 void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
 {
        struct list_head *head;
        struct rpc_task *task;
 
-       spin_lock_bh(&rpc_queue_lock);
+       spin_lock_bh(&queue->lock);
        head = &queue->tasks[queue->maxpriority];
        for (;;) {
                while (!list_empty(head)) {
-                       task = list_entry(head->next, struct rpc_task, tk_list);
+                       task = list_entry(head->next, struct rpc_task, u.tk_wait.list);
                        task->tk_status = status;
                        __rpc_wake_up_task(task);
                }
@@ -602,7 +531,7 @@ void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
                        break;
                head--;
        }
-       spin_unlock_bh(&rpc_queue_lock);
+       spin_unlock_bh(&queue->lock);
 }
 
 /*
@@ -626,21 +555,22 @@ __rpc_atrun(struct rpc_task *task)
 /*
  * This is the RPC `scheduler' (or rather, the finite state machine).
  */
-static int
-__rpc_execute(struct rpc_task *task)
+static int __rpc_execute(struct rpc_task *task)
 {
        int             status = 0;
 
        dprintk("RPC: %4d rpc_execute flgs %x\n",
                                task->tk_pid, task->tk_flags);
 
-       if (!RPC_IS_RUNNING(task)) {
-               printk(KERN_WARNING "RPC: rpc_execute called for sleeping task!!\n");
-               return 0;
-       }
+       BUG_ON(RPC_IS_QUEUED(task));
 
  restarted:
        while (1) {
+               /*
+                * Garbage collection of pending timers...
+                */
+               rpc_delete_timer(task);
+
                /*
                 * Execute any pending callback.
                 */
@@ -657,7 +587,9 @@ __rpc_execute(struct rpc_task *task)
                         */
                        save_callback=task->tk_callback;
                        task->tk_callback=NULL;
+                       lock_kernel();
                        save_callback(task);
+                       unlock_kernel();
                }
 
                /*
@@ -665,43 +597,35 @@ __rpc_execute(struct rpc_task *task)
                 * tk_action may be NULL when the task has been killed
                 * by someone else.
                 */
-               if (RPC_IS_RUNNING(task)) {
-                       /*
-                        * Garbage collection of pending timers...
-                        */
-                       rpc_delete_timer(task);
+               if (!RPC_IS_QUEUED(task)) {
                        if (!task->tk_action)
                                break;
+                       lock_kernel();
                        task->tk_action(task);
-                       /* micro-optimization to avoid spinlock */
-                       if (RPC_IS_RUNNING(task))
-                               continue;
+                       unlock_kernel();
                }
 
                /*
-                * Check whether task is sleeping.
+                * Lockless check for whether task is sleeping or not.
                 */
-               spin_lock_bh(&rpc_queue_lock);
-               if (!RPC_IS_RUNNING(task)) {
-                       rpc_set_sleeping(task);
-                       if (RPC_IS_ASYNC(task)) {
-                               spin_unlock_bh(&rpc_queue_lock);
+               if (!RPC_IS_QUEUED(task))
+                       continue;
+               rpc_clear_running(task);
+               if (RPC_IS_ASYNC(task)) {
+                       /* Careful! we may have raced... */
+                       if (RPC_IS_QUEUED(task))
                                return 0;
-                       }
+                       if (rpc_test_and_set_running(task))
+                               return 0;
+                       continue;
                }
-               spin_unlock_bh(&rpc_queue_lock);
 
-               if (!RPC_IS_SLEEPING(task))
-                       continue;
                /* sync task: sleep here */
                dprintk("RPC: %4d sync task going to sleep\n", task->tk_pid);
-               if (current->pid == rpciod_pid)
-                       printk(KERN_ERR "RPC: rpciod waiting on sync task!\n");
-
                if (RPC_TASK_UNINTERRUPTIBLE(task)) {
-                       __wait_event(task->tk_wait, !RPC_IS_SLEEPING(task));
+                       __wait_event(task->u.tk_wait.waitq, !RPC_IS_QUEUED(task));
                } else {
-                       __wait_event_interruptible(task->tk_wait, !RPC_IS_SLEEPING(task), status);
+                       __wait_event_interruptible(task->u.tk_wait.waitq, !RPC_IS_QUEUED(task), status);
                        /*
                         * When a sync task receives a signal, it exits with
                         * -ERESTARTSYS. In order to catch any callbacks that
@@ -715,11 +639,14 @@ __rpc_execute(struct rpc_task *task)
                                rpc_wake_up_task(task);
                        }
                }
+               rpc_set_running(task);
                dprintk("RPC: %4d sync task resuming\n", task->tk_pid);
        }
 
        if (task->tk_exit) {
+               lock_kernel();
                task->tk_exit(task);
+               unlock_kernel();
                /* If tk_action is non-null, the user wants us to restart */
                if (task->tk_action) {
                        if (!RPC_ASSASSINATED(task)) {
@@ -738,7 +665,6 @@ __rpc_execute(struct rpc_task *task)
 
        /* Release all resources associated with the task */
        rpc_release_task(task);
-
        return status;
 }
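
This is the heart of the conversion: instead of taking rpc_queue_lock to
decide whether to sleep, __rpc_execute() clears RPC_TASK_RUNNING and then
re-tests RPC_TASK_QUEUED to catch a racing wake-up. The async branch,
annotated under that reading:

rpc_clear_running(task);        /* declare intent to go idle */
if (RPC_IS_ASYNC(task)) {
        /* Still (or again) on a wait queue?  Then the next waker will
         * see RUNNING clear and queue_work() the task back to us, so
         * just drop out of the worker. */
        if (RPC_IS_QUEUED(task))
                return 0;
        /* Not queued: a waker may already have re-set RUNNING and
         * submitted a fresh work item.  If so, defer to it; otherwise
         * we have reclaimed RUNNING and keep executing inline. */
        if (rpc_test_and_set_running(task))
                return 0;
        continue;
}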
 
@@ -754,57 +680,16 @@ __rpc_execute(struct rpc_task *task)
 int
 rpc_execute(struct rpc_task *task)
 {
-       int status = -EIO;
-       if (rpc_inhibit) {
-               printk(KERN_INFO "RPC: execution inhibited!\n");
-               goto out_release;
-       }
-
-       status = -EWOULDBLOCK;
-       if (task->tk_active) {
-               printk(KERN_ERR "RPC: active task was run twice!\n");
-               goto out_err;
-       }
+       BUG_ON(task->tk_active);
 
        task->tk_active = 1;
        rpc_set_running(task);
        return __rpc_execute(task);
- out_release:
-       rpc_release_task(task);
- out_err:
-       return status;
 }
 
-/*
- * This is our own little scheduler for async RPC tasks.
- */
-static void
-__rpc_schedule(void)
+static void rpc_async_schedule(void *arg)
 {
-       struct rpc_task *task;
-       int             count = 0;
-
-       dprintk("RPC:      rpc_schedule enter\n");
-       while (1) {
-
-               task_for_first(task, &schedq.tasks[0]) {
-                       __rpc_remove_wait_queue(task);
-                       spin_unlock_bh(&rpc_queue_lock);
-
-                       __rpc_execute(task);
-                       spin_lock_bh(&rpc_queue_lock);
-               } else {
-                       break;
-               }
-
-               if (++count >= 200 || need_resched()) {
-                       count = 0;
-                       spin_unlock_bh(&rpc_queue_lock);
-                       schedule();
-                       spin_lock_bh(&rpc_queue_lock);
-               }
-       }
-       dprintk("RPC:      rpc_schedule leave\n");
+       __rpc_execute((struct rpc_task *)arg);
 }
 
 /*
@@ -837,7 +722,7 @@ rpc_malloc(struct rpc_task *task, size_t size)
        return task->tk_buffer;
 }
 
-void
+static void
 rpc_free(struct rpc_task *task)
 {
        if (task->tk_buffer) {
@@ -862,7 +747,6 @@ void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, rpc_action callback, int flags)
        task->tk_client = clnt;
        task->tk_flags  = flags;
        task->tk_exit   = callback;
-       init_waitqueue_head(&task->tk_wait);
        if (current->uid != current->fsuid || current->gid != current->fsgid)
                task->tk_flags |= RPC_TASK_SETUID;
 
@@ -873,12 +757,11 @@ void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, rpc_action callback, int flags)
 
        task->tk_priority = RPC_PRIORITY_NORMAL;
        task->tk_cookie = (unsigned long)current;
-       INIT_LIST_HEAD(&task->tk_links);
 
-       /* Add to global list of all tasks */
-       spin_lock(&rpc_sched_lock);
-       list_add(&task->tk_task, &all_tasks);
-       spin_unlock(&rpc_sched_lock);
+       /* Initialize workqueue for async tasks */
+       task->tk_workqueue = rpciod_workqueue;
+       if (!RPC_IS_ASYNC(task))
+               init_waitqueue_head(&task->u.tk_wait.waitq);
 
        if (clnt) {
                atomic_inc(&clnt->cl_users);
@@ -889,9 +772,14 @@ void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, rpc_action call
        }
 
 #ifdef RPC_DEBUG
-       task->tk_magic = 0xf00baa;
+       task->tk_magic = RPC_TASK_MAGIC_ID;
        task->tk_pid = rpc_task_id++;
 #endif
+       /* Add to global list of all tasks */
+       spin_lock(&rpc_sched_lock);
+       list_add_tail(&task->tk_task, &all_tasks);
+       spin_unlock(&rpc_sched_lock);
+
        dprintk("RPC: %4d new task procpid %d\n", task->tk_pid,
                                current->pid);
 }
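
The task now overlays its scheduling state in a union, declared in
include/linux/sunrpc/sched.h rather than in this diff; the shape below is
what the accessors in this file imply. Queued tasks use the list linkage,
sleeping sync tasks the waitqueue head, and async tasks dispatched to
rpciod the work item:

union {
        struct work_struct      tk_work;        /* async: workqueue item  */
        struct rpc_wait {
                struct list_head        list;   /* entry on a wait queue  */
                struct list_head        links;  /* same-cookie batch      */
                wait_queue_head_t       waitq;  /* sync: sleep here       */
                struct rpc_wait_queue   *rpc_waitq; /* owning queue       */
        } tk_wait;
} u;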
@@ -944,18 +832,12 @@ cleanup:
        goto out;
 }
 
-void
-rpc_release_task(struct rpc_task *task)
+void rpc_release_task(struct rpc_task *task)
 {
        dprintk("RPC: %4d release task\n", task->tk_pid);
 
 #ifdef RPC_DEBUG
-       if (task->tk_magic != 0xf00baa) {
-               printk(KERN_ERR "RPC: attempt to release a non-existing task!\n");
-               rpc_debug = ~0;
-               rpc_show_tasks();
-               return;
-       }
+       BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
 #endif
 
        /* Remove from global task list */
@@ -963,19 +845,9 @@ rpc_release_task(struct rpc_task *task)
        list_del(&task->tk_task);
        spin_unlock(&rpc_sched_lock);
 
-       /* Protect the execution below. */
-       spin_lock_bh(&rpc_queue_lock);
-
-       /* Disable timer to prevent zombie wakeup */
-       __rpc_disable_timer(task);
-
-       /* Remove from any wait queue we're still on */
-       __rpc_remove_wait_queue(task);
-
+       BUG_ON (RPC_IS_QUEUED(task));
        task->tk_active = 0;
 
-       spin_unlock_bh(&rpc_queue_lock);
-
        /* Synchronously delete any running timer */
        rpc_delete_timer(task);
 
@@ -1005,10 +877,9 @@ rpc_release_task(struct rpc_task *task)
  * queue 'childq'. If so returns a pointer to the parent.
  * Upon failure returns NULL.
  *
- * Caller must hold rpc_queue_lock
+ * Caller must hold childq.lock
  */
-static inline struct rpc_task *
-rpc_find_parent(struct rpc_task *child)
+static inline struct rpc_task *rpc_find_parent(struct rpc_task *child)
 {
        struct rpc_task *task, *parent;
        struct list_head *le;
@@ -1021,17 +892,16 @@ rpc_find_parent(struct rpc_task *child)
        return NULL;
 }
 
-static void
-rpc_child_exit(struct rpc_task *child)
+static void rpc_child_exit(struct rpc_task *child)
 {
        struct rpc_task *parent;
 
-       spin_lock_bh(&rpc_queue_lock);
+       spin_lock_bh(&childq.lock);
        if ((parent = rpc_find_parent(child)) != NULL) {
                parent->tk_status = child->tk_status;
                __rpc_wake_up_task(parent);
        }
-       spin_unlock_bh(&rpc_queue_lock);
+       spin_unlock_bh(&childq.lock);
 }
 
 /*
@@ -1054,22 +924,20 @@ fail:
        return NULL;
 }
 
-void
-rpc_run_child(struct rpc_task *task, struct rpc_task *child, rpc_action func)
+void rpc_run_child(struct rpc_task *task, struct rpc_task *child, rpc_action func)
 {
-       spin_lock_bh(&rpc_queue_lock);
+       spin_lock_bh(&childq.lock);
        /* N.B. Is it possible for the child to have already finished? */
        __rpc_sleep_on(&childq, task, func, NULL);
        rpc_schedule_run(child);
-       spin_unlock_bh(&rpc_queue_lock);
+       spin_unlock_bh(&childq.lock);
 }
 
 /*
  * Kill all tasks for the given client.
  * XXX: kill their descendants as well?
  */
-void
-rpc_killall_tasks(struct rpc_clnt *clnt)
+void rpc_killall_tasks(struct rpc_clnt *clnt)
 {
        struct rpc_task *rovr;
        struct list_head *le;
@@ -1080,104 +948,27 @@ rpc_killall_tasks(struct rpc_clnt *clnt)
         * Spin lock all_tasks to prevent changes...
         */
        spin_lock(&rpc_sched_lock);
-       alltask_for_each(rovr, le, &all_tasks)
+       alltask_for_each(rovr, le, &all_tasks) {
+               if (! RPC_IS_ACTIVATED(rovr))
+                       continue;
                if (!clnt || rovr->tk_client == clnt) {
                        rovr->tk_flags |= RPC_TASK_KILLED;
                        rpc_exit(rovr, -EIO);
                        rpc_wake_up_task(rovr);
                }
+       }
        spin_unlock(&rpc_sched_lock);
 }
 
-static DECLARE_MUTEX_LOCKED(rpciod_running);
 
-static inline int
-rpciod_task_pending(void)
-{
-       return !list_empty(&schedq.tasks[0]);
-}
-
-
-/*
- * This is the rpciod kernel thread
- */
-static int
-rpciod(void *ptr)
-{
-       int             rounds = 0;
-
-       lock_kernel();
-       /*
-        * Let our maker know we're running ...
-        */
-       rpciod_pid = current->pid;
-       up(&rpciod_running);
-
-       daemonize("rpciod");
-       allow_signal(SIGKILL);
-
-       dprintk("RPC: rpciod starting (pid %d)\n", rpciod_pid);
-       spin_lock_bh(&rpc_queue_lock);
-       while (rpciod_users) {
-               DEFINE_WAIT(wait);
-               if (signalled()) {
-                       spin_unlock_bh(&rpc_queue_lock);
-                       rpciod_killall();
-                       flush_signals(current);
-                       spin_lock_bh(&rpc_queue_lock);
-               }
-               __rpc_schedule();
-               if (current->flags & PF_FREEZE) {
-                       spin_unlock_bh(&rpc_queue_lock);
-                       refrigerator(PF_FREEZE);
-                       spin_lock_bh(&rpc_queue_lock);
-               }
-
-               if (++rounds >= 64) {   /* safeguard */
-                       spin_unlock_bh(&rpc_queue_lock);
-                       schedule();
-                       rounds = 0;
-                       spin_lock_bh(&rpc_queue_lock);
-               }
-
-               dprintk("RPC: rpciod back to sleep\n");
-               prepare_to_wait(&rpciod_idle, &wait, TASK_INTERRUPTIBLE);
-               if (!rpciod_task_pending() && !signalled()) {
-                       spin_unlock_bh(&rpc_queue_lock);
-                       schedule();
-                       rounds = 0;
-                       spin_lock_bh(&rpc_queue_lock);
-               }
-               finish_wait(&rpciod_idle, &wait);
-               dprintk("RPC: switch to rpciod\n");
-       }
-       spin_unlock_bh(&rpc_queue_lock);
-
-       dprintk("RPC: rpciod shutdown commences\n");
-       if (!list_empty(&all_tasks)) {
-               printk(KERN_ERR "rpciod: active tasks at shutdown?!\n");
-               rpciod_killall();
-       }
-
-       dprintk("RPC: rpciod exiting\n");
-       unlock_kernel();
-
-       rpciod_pid = 0;
-       complete_and_exit(&rpciod_killer, 0);
-       return 0;
-}
-
-static void
-rpciod_killall(void)
+static void rpciod_killall(void)
 {
        unsigned long flags;
 
        while (!list_empty(&all_tasks)) {
                clear_thread_flag(TIF_SIGPENDING);
                rpc_killall_tasks(NULL);
-               spin_lock_bh(&rpc_queue_lock);
-               __rpc_schedule();
-               spin_unlock_bh(&rpc_queue_lock);
+               flush_workqueue(rpciod_workqueue);
                if (!list_empty(&all_tasks)) {
                        dprintk("rpciod_killall: waiting for tasks to exit\n");
                        yield();
@@ -1195,28 +987,30 @@ rpciod_killall(void)
 int
 rpciod_up(void)
 {
+       struct workqueue_struct *wq;
        int error = 0;
 
        down(&rpciod_sema);
-       dprintk("rpciod_up: pid %d, users %d\n", rpciod_pid, rpciod_users);
+       dprintk("rpciod_up: users %d\n", rpciod_users);
        rpciod_users++;
-       if (rpciod_pid)
+       if (rpciod_workqueue)
                goto out;
        /*
-	 * If there's no pid, we should be the first user.
+	 * If there's no workqueue, we should be the first user.
         */
        if (rpciod_users > 1)
-               printk(KERN_WARNING "rpciod_up: no pid, %d users??\n", rpciod_users);
+               printk(KERN_WARNING "rpciod_up: no workqueue, %d users??\n", rpciod_users);
        /*
-	 * Create the rpciod thread and wait for it to start.
+	 * Create the rpciod workqueue.
         */
-       error = kernel_thread(rpciod, NULL, 0);
-       if (error < 0) {
-               printk(KERN_WARNING "rpciod_up: create thread failed, error=%d\n", error);
+       error = -ENOMEM;
+       wq = create_workqueue("rpciod");
+       if (wq == NULL) {
+               printk(KERN_WARNING "rpciod_up: create workqueue failed, error=%d\n", error);
                rpciod_users--;
                goto out;
        }
-       down(&rpciod_running);
+       rpciod_workqueue = wq;
        error = 0;
 out:
        up(&rpciod_sema);
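
The dedicated kernel thread, its idle waitqueue, and the completion used to
reap it are all subsumed by a reference-counted workqueue. The lifecycle,
condensed from rpciod_up()/rpciod_down() (2.6 semantics: create_workqueue()
spawns one worker thread per CPU, destroy_workqueue() flushes and reaps
them):

/* first user brings the workqueue up */
wq = create_workqueue("rpciod");
if (wq == NULL)
        return -ENOMEM;                 /* after dropping the refcount */
rpciod_workqueue = wq;

/* last user tears it down */
rpciod_killall();                       /* kill tasks, flush_workqueue() */
destroy_workqueue(rpciod_workqueue);    /* waits for the worker threads */
rpciod_workqueue = NULL;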
@@ -1227,20 +1021,21 @@ void
 rpciod_down(void)
 {
        down(&rpciod_sema);
-       dprintk("rpciod_down pid %d sema %d\n", rpciod_pid, rpciod_users);
+       dprintk("rpciod_down sema %d\n", rpciod_users);
        if (rpciod_users) {
                if (--rpciod_users)
                        goto out;
        } else
-               printk(KERN_WARNING "rpciod_down: pid=%d, no users??\n", rpciod_pid);
+               printk(KERN_WARNING "rpciod_down: no users??\n");
 
-       if (!rpciod_pid) {
+       if (!rpciod_workqueue) {
                dprintk("rpciod_down: Nothing to do!\n");
                goto out;
        }
+       rpciod_killall();
 
-       kill_proc(rpciod_pid, SIGKILL, 1);
-       wait_for_completion(&rpciod_killer);
+       destroy_workqueue(rpciod_workqueue);
+       rpciod_workqueue = NULL;
  out:
        up(&rpciod_sema);
 }
@@ -1258,7 +1053,12 @@ void rpc_show_tasks(void)
        }
        printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
                "-rpcwait -action- --exit--\n");
-       alltask_for_each(t, le, &all_tasks)
+       alltask_for_each(t, le, &all_tasks) {
+               const char *rpc_waitq = "none";
+
+               if (RPC_IS_QUEUED(t))
+                       rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq);
+
                printk("%05d %04d %04x %06d %8p %6d %8p %08ld %8s %8p %8p\n",
                        t->tk_pid,
                        (t->tk_msg.rpc_proc ? t->tk_msg.rpc_proc->p_proc : -1),
@@ -1266,8 +1066,9 @@ void rpc_show_tasks(void)
                        t->tk_client,
                        (t->tk_client ? t->tk_client->cl_prog : 0),
                        t->tk_rqstp, t->tk_timeout,
-                       rpc_qname(t->tk_rpcwait),
+                       rpc_waitq,
                        t->tk_action, t->tk_exit);
+       }
        spin_unlock(&rpc_sched_lock);
 }
 #endif