fedora core 6 1.2949 + vserver 2.2.0
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index ae45a76..fc083f0 100644
 #include <linux/smp.h>
 #include <linux/smp_lock.h>
 #include <linux/spinlock.h>
-#include <linux/suspend.h>
+#include <linux/mutex.h>
 
 #include <linux/sunrpc/clnt.h>
-#include <linux/sunrpc/xprt.h>
 
 #ifdef RPC_DEBUG
 #define RPCDBG_FACILITY                RPCDBG_SCHED
+#define RPC_TASK_MAGIC_ID      0xf00baa
 static int                     rpc_task_id;
 #endif
 
@@ -34,26 +34,15 @@ static int                  rpc_task_id;
 #define RPC_BUFFER_MAXSIZE     (2048)
 #define RPC_BUFFER_POOLSIZE    (8)
 #define RPC_TASK_POOLSIZE      (8)
-static kmem_cache_t    *rpc_task_slabp;
-static kmem_cache_t    *rpc_buffer_slabp;
-static mempool_t       *rpc_task_mempool;
-static mempool_t       *rpc_buffer_mempool;
+static struct kmem_cache       *rpc_task_slabp __read_mostly;
+static struct kmem_cache       *rpc_buffer_slabp __read_mostly;
+static mempool_t       *rpc_task_mempool __read_mostly;
+static mempool_t       *rpc_buffer_mempool __read_mostly;
 
 static void                    __rpc_default_timer(struct rpc_task *task);
 static void                    rpciod_killall(void);
-
-/*
- * When an asynchronous RPC task is activated within a bottom half
- * handler, or while executing another RPC task, it is put on
- * schedq, and rpciod is woken up.
- */
-static RPC_WAITQ(schedq, "schedq");
-
-/*
- * RPC tasks that create another task (e.g. for contacting the portmapper)
- * will wait on this queue for their child's completion
- */
-static RPC_WAITQ(childq, "childq");
+static void                    rpc_async_schedule(struct work_struct *);
+static void                    rpc_release_task(struct rpc_task *task);
 
 /*
  * RPC tasks sit here while waiting for conditions to improve.
@@ -68,26 +57,18 @@ static LIST_HEAD(all_tasks);
 /*
  * rpciod-related stuff
  */
-static DECLARE_WAIT_QUEUE_HEAD(rpciod_idle);
-static DECLARE_COMPLETION(rpciod_killer);
-static DECLARE_MUTEX(rpciod_sema);
+static DEFINE_MUTEX(rpciod_mutex);
 static unsigned int            rpciod_users;
-static pid_t                   rpciod_pid;
-static int                     rpc_inhibit;
+struct workqueue_struct *rpciod_workqueue;
 
-/*
- * Spinlock for wait queues. Access to the latter also has to be
- * interrupt-safe in order to allow timers to wake up sleeping tasks.
- */
-static spinlock_t rpc_queue_lock = SPIN_LOCK_UNLOCKED;
 /*
  * Spinlock for other critical sections of code.
  */
-static spinlock_t rpc_sched_lock = SPIN_LOCK_UNLOCKED;
+static DEFINE_SPINLOCK(rpc_sched_lock);
 
 /*
  * Disable the timer for a given RPC task. Should be called with
- * rpc_queue_lock and bh_disabled in order to avoid races within
+ * queue->lock and bh_disabled in order to avoid races within
  * rpc_run_timer().
  */
 static inline void
@@ -105,19 +86,19 @@ __rpc_disable_timer(struct rpc_task *task)
  * without calling del_timer_sync(). The latter could cause a
  * deadlock if called while we're holding spinlocks...
  */
-static void
-rpc_run_timer(struct rpc_task *task)
+static void rpc_run_timer(struct rpc_task *task)
 {
        void (*callback)(struct rpc_task *);
 
-       spin_lock_bh(&rpc_queue_lock);
        callback = task->tk_timeout_fn;
        task->tk_timeout_fn = NULL;
-       spin_unlock_bh(&rpc_queue_lock);
-       if (callback) {
+       if (callback && RPC_IS_QUEUED(task)) {
                dprintk("RPC: %4d running timer\n", task->tk_pid);
                callback(task);
        }
+       smp_mb__before_clear_bit();
+       clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
+       smp_mb__after_clear_bit();
 }
 
 /*
@@ -136,29 +117,23 @@ __rpc_add_timer(struct rpc_task *task, rpc_action timer)
                task->tk_timeout_fn = timer;
        else
                task->tk_timeout_fn = __rpc_default_timer;
+       set_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
        mod_timer(&task->tk_timer, jiffies + task->tk_timeout);
 }
 
-/*
- * Set up a timer for an already sleeping task.
- */
-void rpc_add_timer(struct rpc_task *task, rpc_action timer)
-{
-       spin_lock_bh(&rpc_queue_lock);
-       if (!RPC_IS_RUNNING(task))
-               __rpc_add_timer(task, timer);
-       spin_unlock_bh(&rpc_queue_lock);
-}
-
 /*
  * Delete any timer for the current task. Because we use del_timer_sync(),
- * this function should never be called while holding rpc_queue_lock.
+ * this function should never be called while holding queue->lock.
  */
-static inline void
+static void
 rpc_delete_timer(struct rpc_task *task)
 {
-       if (del_timer_sync(&task->tk_timer))
+       if (RPC_IS_QUEUED(task))
+               return;
+       if (test_and_clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate)) {
+               del_singleshot_timer_sync(&task->tk_timer);
                dprintk("RPC: %4d deleting timer\n", task->tk_pid);
+       }
 }
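
The new RPC_TASK_HAS_TIMER bit turns timer teardown into a race that exactly one side can win: rpc_run_timer() clears the bit (with barriers) when the timer fires, and rpc_delete_timer() only issues the synchronous delete if it clears the bit first. A minimal sketch of the same pattern in isolation (all names invented, not part of this patch):

/* Illustrative only: pair a state bit with a one-shot timer so that
 * cancellation and expiry cannot both act on the timer. */
struct example {
#define EX_HAS_TIMER    0               /* bit in ->state */
        unsigned long           state;
        struct timer_list       timer;
};

static void example_arm(struct example *ex, unsigned long expires)
{
        set_bit(EX_HAS_TIMER, &ex->state);      /* publish before arming */
        mod_timer(&ex->timer, expires);
}

static void example_cancel(struct example *ex)
{
        /* only the winner of the bit does the synchronous delete */
        if (test_and_clear_bit(EX_HAS_TIMER, &ex->state))
                del_singleshot_timer_sync(&ex->timer);
}
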
 
 /*
@@ -169,16 +144,17 @@ static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, struct r
        struct list_head *q;
        struct rpc_task *t;
 
+       INIT_LIST_HEAD(&task->u.tk_wait.links);
        q = &queue->tasks[task->tk_priority];
        if (unlikely(task->tk_priority > queue->maxpriority))
                q = &queue->tasks[queue->maxpriority];
-       list_for_each_entry(t, q, tk_list) {
+       list_for_each_entry(t, q, u.tk_wait.list) {
                if (t->tk_cookie == task->tk_cookie) {
-                       list_add_tail(&task->tk_list, &t->tk_links);
+                       list_add_tail(&task->u.tk_wait.list, &t->u.tk_wait.links);
                        return;
                }
        }
-       list_add_tail(&task->tk_list, q);
+       list_add_tail(&task->u.tk_wait.list, q);
 }
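
In the priority case, tasks that share a tk_cookie are folded under the first queued task rather than taking their own slots in the queue. Roughly, with hypothetical tasks T1..T3 and cookies A and B:

/*
 *      queue->tasks[prio]: T1(A) <-> T3(B)
 *                           `-- u.tk_wait.links: T2(A)
 *
 * While queue->cookie remains A, successive wake-ups hand out T1 and
 * then T2 before __rpc_wake_up_next_priority() switches to cookie B.
 */
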
 
 /*
@@ -189,37 +165,22 @@ static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, struct r
  * improve overall performance.
  * Everyone else gets appended to the queue to ensure proper FIFO behavior.
  */
-static int __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
+static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
 {
-       if (task->tk_rpcwait == queue)
-               return 0;
+       BUG_ON (RPC_IS_QUEUED(task));
 
-       if (task->tk_rpcwait) {
-               printk(KERN_WARNING "RPC: doubly enqueued task!\n");
-               return -EWOULDBLOCK;
-       }
        if (RPC_IS_PRIORITY(queue))
                __rpc_add_wait_queue_priority(queue, task);
        else if (RPC_IS_SWAPPER(task))
-               list_add(&task->tk_list, &queue->tasks[0]);
+               list_add(&task->u.tk_wait.list, &queue->tasks[0]);
        else
-               list_add_tail(&task->tk_list, &queue->tasks[0]);
-       task->tk_rpcwait = queue;
+               list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
+       task->u.tk_wait.rpc_waitq = queue;
+       queue->qlen++;
+       rpc_set_queued(task);
 
        dprintk("RPC: %4d added to queue %p \"%s\"\n",
                                task->tk_pid, queue, rpc_qname(queue));
-
-       return 0;
-}
-
-int rpc_add_wait_queue(struct rpc_wait_queue *q, struct rpc_task *task)
-{
-       int             result;
-
-       spin_lock_bh(&rpc_queue_lock);
-       result = __rpc_add_wait_queue(q, task);
-       spin_unlock_bh(&rpc_queue_lock);
-       return result;
 }
 
 /*
@@ -229,12 +190,12 @@ static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
 {
        struct rpc_task *t;
 
-       if (!list_empty(&task->tk_links)) {
-               t = list_entry(task->tk_links.next, struct rpc_task, tk_list);
-               list_move(&t->tk_list, &task->tk_list);
-               list_splice_init(&task->tk_links, &t->tk_links);
+       if (!list_empty(&task->u.tk_wait.links)) {
+               t = list_entry(task->u.tk_wait.links.next, struct rpc_task, u.tk_wait.list);
+               list_move(&t->u.tk_wait.list, &task->u.tk_wait.list);
+               list_splice_init(&task->u.tk_wait.links, &t->u.tk_wait.links);
        }
-       list_del(&task->tk_list);
+       list_del(&task->u.tk_wait.list);
 }
 
 /*
@@ -243,31 +204,18 @@ static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
  */
 static void __rpc_remove_wait_queue(struct rpc_task *task)
 {
-       struct rpc_wait_queue *queue = task->tk_rpcwait;
-
-       if (!queue)
-               return;
+       struct rpc_wait_queue *queue;
+       queue = task->u.tk_wait.rpc_waitq;
 
        if (RPC_IS_PRIORITY(queue))
                __rpc_remove_wait_queue_priority(task);
        else
-               list_del(&task->tk_list);
-       task->tk_rpcwait = NULL;
-
+               list_del(&task->u.tk_wait.list);
+       queue->qlen--;
        dprintk("RPC: %4d removed from queue %p \"%s\"\n",
                                task->tk_pid, queue, rpc_qname(queue));
 }
 
-void
-rpc_remove_wait_queue(struct rpc_task *task)
-{
-       if (!task->tk_rpcwait)
-               return;
-       spin_lock_bh(&rpc_queue_lock);
-       __rpc_remove_wait_queue(task);
-       spin_unlock_bh(&rpc_queue_lock);
-}
-
 static inline void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority)
 {
        queue->priority = priority;
@@ -290,6 +238,7 @@ static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const c
 {
        int i;
 
+       spin_lock_init(&queue->lock);
        for (i = 0; i < ARRAY_SIZE(queue->tasks); i++)
                INIT_LIST_HEAD(&queue->tasks[i]);
        queue->maxpriority = maxprio;
@@ -310,61 +259,80 @@ void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
 }
 EXPORT_SYMBOL(rpc_init_wait_queue);
 
+static int rpc_wait_bit_interruptible(void *word)
+{
+       if (signal_pending(current))
+               return -ERESTARTSYS;
+       schedule();
+       return 0;
+}
+
+static void rpc_set_active(struct rpc_task *task)
+{
+       if (test_and_set_bit(RPC_TASK_ACTIVE, &task->tk_runstate) != 0)
+               return;
+       spin_lock(&rpc_sched_lock);
+#ifdef RPC_DEBUG
+       task->tk_magic = RPC_TASK_MAGIC_ID;
+       task->tk_pid = rpc_task_id++;
+#endif
+       /* Add to global list of all tasks */
+       list_add_tail(&task->tk_task, &all_tasks);
+       spin_unlock(&rpc_sched_lock);
+}
+
 /*
- * Make an RPC task runnable.
- *
- * Note: If the task is ASYNC, this must be called with 
- * the spinlock held to protect the wait queue operation.
+ * Mark an RPC call as having completed by clearing the 'active' bit
  */
-static inline void
-rpc_make_runnable(struct rpc_task *task)
+static void rpc_mark_complete_task(struct rpc_task *task)
 {
-       if (task->tk_timeout_fn) {
-               printk(KERN_ERR "RPC: task w/ running timer in rpc_make_runnable!!\n");
-               return;
-       }
-       rpc_set_running(task);
-       if (RPC_IS_ASYNC(task)) {
-               if (RPC_IS_SLEEPING(task)) {
-                       int status;
-                       status = __rpc_add_wait_queue(&schedq, task);
-                       if (status < 0) {
-                               printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
-                               task->tk_status = status;
-                               return;
-                       }
-                       rpc_clear_sleeping(task);
-                       wake_up(&rpciod_idle);
-               }
-       } else {
-               rpc_clear_sleeping(task);
-               wake_up(&task->tk_wait);
-       }
+       smp_mb__before_clear_bit();
+       clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
+       smp_mb__after_clear_bit();
+       wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);
 }
 
 /*
- * Place a newly initialized task on the schedq.
+ * Allow callers to wait for completion of an RPC call
  */
-static inline void
-rpc_schedule_run(struct rpc_task *task)
+int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
 {
-       /* Don't run a child twice! */
-       if (RPC_IS_ACTIVATED(task))
-               return;
-       task->tk_active = 1;
-       rpc_set_sleeping(task);
-       rpc_make_runnable(task);
+       if (action == NULL)
+               action = rpc_wait_bit_interruptible;
+       return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
+                       action, TASK_INTERRUPTIBLE);
 }
+EXPORT_SYMBOL(__rpc_wait_for_completion_task);
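
A sketch of a caller blocking on completion with the new bit-wait API (example_wait_for_task() is invented; it assumes a task reference obtained from rpc_run_task() below):

static int example_wait_for_task(struct rpc_task *task)
{
        int status;

        /* a NULL action selects rpc_wait_bit_interruptible */
        status = __rpc_wait_for_completion_task(task, NULL);
        if (status == 0)
                status = task->tk_status;       /* RPC-level result */
        rpc_put_task(task);                     /* drop the caller's reference */
        return status;
}
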
 
 /*
- *     For other people who may need to wake the I/O daemon
- *     but should (for now) know nothing about its innards
+ * Make an RPC task runnable.
+ *
+ * Note: If the task is ASYNC, this must be called with 
+ * the spinlock held to protect the wait queue operation.
  */
-void rpciod_wake_up(void)
+static void rpc_make_runnable(struct rpc_task *task)
 {
-       if(rpciod_pid==0)
-               printk(KERN_ERR "rpciod: wot no daemon?\n");
-       wake_up(&rpciod_idle);
+       BUG_ON(task->tk_timeout_fn);
+       rpc_clear_queued(task);
+       if (rpc_test_and_set_running(task))
+               return;
+       /* We might have raced */
+       if (RPC_IS_QUEUED(task)) {
+               rpc_clear_running(task);
+               return;
+       }
+       if (RPC_IS_ASYNC(task)) {
+               int status;
+
+               INIT_WORK(&task->u.tk_work, rpc_async_schedule);
+               status = queue_work(task->tk_workqueue, &task->u.tk_work);
+               if (status < 0) {
+                       printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
+                       task->tk_status = status;
+                       return;
+               }
+       } else
+               wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
 }
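
Design note: QUEUED and RUNNING are now juggled with atomic bitops instead of a global lock, so rpc_make_runnable() clears QUEUED before claiming RUNNING and then re-checks QUEUED; if another CPU re-queued the task in that window, RUNNING is released again and the later wake-up retries the handshake.
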
 
 /*
@@ -373,12 +341,9 @@ void rpciod_wake_up(void)
  * NB: An RPC task will only receive interrupt-driven events as long
  * as it's on a wait queue.
  */
-static void
-__rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
+static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
                        rpc_action action, rpc_action timer)
 {
-       int status;
-
        dprintk("RPC: %4d sleep_on(queue \"%s\" time %ld)\n", task->tk_pid,
                                rpc_qname(q), jiffies);
 
@@ -387,76 +352,66 @@ __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
                return;
        }
 
-       /* Mark the task as being activated if so needed */
-       if (!RPC_IS_ACTIVATED(task)) {
-               task->tk_active = 1;
-               rpc_set_sleeping(task);
-       }
+       __rpc_add_wait_queue(q, task);
 
-       status = __rpc_add_wait_queue(q, task);
-       if (status) {
-               printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
-               task->tk_status = status;
-       } else {
-               rpc_clear_running(task);
-               if (task->tk_callback) {
-                       dprintk(KERN_ERR "RPC: %4d overwrites an active callback\n", task->tk_pid);
-                       BUG();
-               }
-               task->tk_callback = action;
-               __rpc_add_timer(task, timer);
-       }
+       BUG_ON(task->tk_callback != NULL);
+       task->tk_callback = action;
+       __rpc_add_timer(task, timer);
 }
 
-void
-rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
+void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
                                rpc_action action, rpc_action timer)
 {
+       /* Mark the task as being activated if so needed */
+       rpc_set_active(task);
+
        /*
         * Protect the queue operations.
         */
-       spin_lock_bh(&rpc_queue_lock);
+       spin_lock_bh(&q->lock);
        __rpc_sleep_on(q, task, action, timer);
-       spin_unlock_bh(&rpc_queue_lock);
+       spin_unlock_bh(&q->lock);
 }
 
 /**
- * __rpc_wake_up_task - wake up a single rpc_task
+ * __rpc_do_wake_up_task - wake up a single rpc_task
  * @task: task to be woken up
  *
- * Caller must hold rpc_queue_lock
+ * Caller must hold queue->lock, and have cleared the task queued flag.
  */
-static void
-__rpc_wake_up_task(struct rpc_task *task)
+static void __rpc_do_wake_up_task(struct rpc_task *task)
 {
-       dprintk("RPC: %4d __rpc_wake_up_task (now %ld inh %d)\n",
-                                       task->tk_pid, jiffies, rpc_inhibit);
+       dprintk("RPC: %4d __rpc_wake_up_task (now %ld)\n", task->tk_pid, jiffies);
 
 #ifdef RPC_DEBUG
-       if (task->tk_magic != 0xf00baa) {
-               printk(KERN_ERR "RPC: attempt to wake up non-existing task!\n");
-               rpc_debug = ~0;
-               rpc_show_tasks();
-               return;
-       }
+       BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
 #endif
        /* Has the task been executed yet? If not, we cannot wake it up! */
        if (!RPC_IS_ACTIVATED(task)) {
                printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
                return;
        }
-       if (RPC_IS_RUNNING(task))
-               return;
 
        __rpc_disable_timer(task);
-       if (task->tk_rpcwait != &schedq)
-               __rpc_remove_wait_queue(task);
+       __rpc_remove_wait_queue(task);
 
        rpc_make_runnable(task);
 
        dprintk("RPC:      __rpc_wake_up_task done\n");
 }
 
+/*
+ * Wake up the specified task
+ */
+static void __rpc_wake_up_task(struct rpc_task *task)
+{
+       if (rpc_start_wakeup(task)) {
+               if (RPC_IS_QUEUED(task))
+                       __rpc_do_wake_up_task(task);
+               rpc_finish_wakeup(task);
+       }
+}
+
 /*
  * Default timeout handler if none specified by user
  */
@@ -471,14 +426,21 @@ __rpc_default_timer(struct rpc_task *task)
 /*
  * Wake up the specified task
  */
-void
-rpc_wake_up_task(struct rpc_task *task)
+void rpc_wake_up_task(struct rpc_task *task)
 {
-       if (RPC_IS_RUNNING(task))
-               return;
-       spin_lock_bh(&rpc_queue_lock);
-       __rpc_wake_up_task(task);
-       spin_unlock_bh(&rpc_queue_lock);
+       rcu_read_lock_bh();
+       if (rpc_start_wakeup(task)) {
+               if (RPC_IS_QUEUED(task)) {
+                       struct rpc_wait_queue *queue = task->u.tk_wait.rpc_waitq;
+
+                       /* Note: we're already in a bh-safe context */
+                       spin_lock(&queue->lock);
+                       __rpc_do_wake_up_task(task);
+                       spin_unlock(&queue->lock);
+               }
+               rpc_finish_wakeup(task);
+       }
+       rcu_read_unlock_bh();
 }
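
Design note: the rcu_read_lock_bh() bracket is what makes the lockless read of u.tk_wait.rpc_waitq tolerable here; dynamic tasks are freed through call_rcu_bh() (see rpc_put_task() below), so the task, and with it the queue pointer read from it, stays valid for the duration of the bh-RCU read section.
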
 
 /*
@@ -494,11 +456,11 @@ static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queu
         */
        q = &queue->tasks[queue->priority];
        if (!list_empty(q)) {
-               task = list_entry(q->next, struct rpc_task, tk_list);
+               task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
                if (queue->cookie == task->tk_cookie) {
                        if (--queue->nr)
                                goto out;
-                       list_move_tail(&task->tk_list, q);
+                       list_move_tail(&task->u.tk_wait.list, q);
                }
                /*
                 * Check if we need to switch queues.
@@ -516,7 +478,7 @@ static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queu
                else
                        q = q - 1;
                if (!list_empty(q)) {
-                       task = list_entry(q->next, struct rpc_task, tk_list);
+                       task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
                        goto new_queue;
                }
        } while (q != &queue->tasks[queue->priority]);
@@ -541,14 +503,16 @@ struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
        struct rpc_task *task = NULL;
 
        dprintk("RPC:      wake_up_next(%p \"%s\")\n", queue, rpc_qname(queue));
-       spin_lock_bh(&rpc_queue_lock);
+       rcu_read_lock_bh();
+       spin_lock(&queue->lock);
        if (RPC_IS_PRIORITY(queue))
                task = __rpc_wake_up_next_priority(queue);
        else {
                task_for_first(task, &queue->tasks[0])
                        __rpc_wake_up_task(task);
        }
-       spin_unlock_bh(&rpc_queue_lock);
+       spin_unlock(&queue->lock);
+       rcu_read_unlock_bh();
 
        return task;
 }
@@ -557,25 +521,25 @@ struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
  * rpc_wake_up - wake up all rpc_tasks
  * @queue: rpc_wait_queue on which the tasks are sleeping
  *
- * Grabs rpc_queue_lock
+ * Grabs queue->lock
  */
 void rpc_wake_up(struct rpc_wait_queue *queue)
 {
-       struct rpc_task *task;
-
+       struct rpc_task *task, *next;
        struct list_head *head;
-       spin_lock_bh(&rpc_queue_lock);
+
+       rcu_read_lock_bh();
+       spin_lock(&queue->lock);
        head = &queue->tasks[queue->maxpriority];
        for (;;) {
-               while (!list_empty(head)) {
-                       task = list_entry(head->next, struct rpc_task, tk_list);
+               list_for_each_entry_safe(task, next, head, u.tk_wait.list)
                        __rpc_wake_up_task(task);
-               }
                if (head == &queue->tasks[0])
                        break;
                head--;
        }
-       spin_unlock_bh(&rpc_queue_lock);
+       spin_unlock(&queue->lock);
+       rcu_read_unlock_bh();
 }
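
The switch to list_for_each_entry_safe() is needed because __rpc_wake_up_task() unlinks the entry being visited; the _safe variant caches the next pointer before running the body. The idiom in isolation (types invented):

struct item {
        struct list_head list;
};

static void example_drain(struct list_head *head)
{
        struct item *pos, *next;

        list_for_each_entry_safe(pos, next, head, list)
                list_del(&pos->list);   /* safe: 'next' was saved first */
}
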
 
 /**
@@ -583,18 +547,18 @@ void rpc_wake_up(struct rpc_wait_queue *queue)
  * @queue: rpc_wait_queue on which the tasks are sleeping
  * @status: status value to set
  *
- * Grabs rpc_queue_lock
+ * Grabs queue->lock
  */
 void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
 {
+       struct rpc_task *task, *next;
        struct list_head *head;
-       struct rpc_task *task;
 
-       spin_lock_bh(&rpc_queue_lock);
+       rcu_read_lock_bh();
+       spin_lock(&queue->lock);
        head = &queue->tasks[queue->maxpriority];
        for (;;) {
-               while (!list_empty(head)) {
-                       task = list_entry(head->next, struct rpc_task, tk_list);
+               list_for_each_entry_safe(task, next, head, u.tk_wait.list) {
                        task->tk_status = status;
                        __rpc_wake_up_task(task);
                }
@@ -602,45 +566,80 @@ void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
                        break;
                head--;
        }
-       spin_unlock_bh(&rpc_queue_lock);
+       spin_unlock(&queue->lock);
+       rcu_read_unlock_bh();
+}
+
+static void __rpc_atrun(struct rpc_task *task)
+{
+       rpc_wake_up_task(task);
 }
 
 /*
  * Run a task at a later time
  */
-static void    __rpc_atrun(struct rpc_task *);
-void
-rpc_delay(struct rpc_task *task, unsigned long delay)
+void rpc_delay(struct rpc_task *task, unsigned long delay)
 {
        task->tk_timeout = delay;
        rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun);
 }
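
rpc_delay() is typically a backoff step in a task's state machine; a sketch with an invented next-state routine:

static void example_retry_later(struct rpc_task *task)
{
        task->tk_action = example_transmit;     /* hypothetical next state */
        rpc_delay(task, HZ >> 2);               /* run again in ~250ms */
}

When the delay expires, __rpc_atrun() above simply wakes the task, and the scheduler loop invokes the new tk_action.
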
 
-static void
-__rpc_atrun(struct rpc_task *task)
+/*
+ * Helper to call task->tk_ops->rpc_call_prepare
+ */
+static void rpc_prepare_task(struct rpc_task *task)
 {
-       task->tk_status = 0;
-       rpc_wake_up_task(task);
+       lock_kernel();
+       task->tk_ops->rpc_call_prepare(task, task->tk_calldata);
+       unlock_kernel();
+}
+
+/*
+ * Helper that calls task->tk_ops->rpc_call_done if it exists
+ */
+void rpc_exit_task(struct rpc_task *task)
+{
+       task->tk_action = NULL;
+       if (task->tk_ops->rpc_call_done != NULL) {
+               lock_kernel();
+               task->tk_ops->rpc_call_done(task, task->tk_calldata);
+               unlock_kernel();
+               if (task->tk_action != NULL) {
+                       WARN_ON(RPC_ASSASSINATED(task));
+                       /* Always release the RPC slot and buffer memory */
+                       xprt_release(task);
+               }
+       }
+}
+EXPORT_SYMBOL(rpc_exit_task);
+
+void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata)
+{
+       if (ops->rpc_release != NULL) {
+               lock_kernel();
+               ops->rpc_release(calldata);
+               unlock_kernel();
+       }
 }
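
The old tk_exit/tk_release callbacks give way to an ops table plus opaque calldata. A hypothetical user of the new interface (all example_* names are invented):

struct example_ctx {
        int     status;
};

static void example_done(struct rpc_task *task, void *calldata)
{
        struct example_ctx *ctx = calldata;

        ctx->status = task->tk_status;  /* record the RPC result */
}

static void example_release(void *calldata)
{
        kfree(calldata);                /* ctx was kmalloc()ed by the caller */
}

static const struct rpc_call_ops example_ops = {
        .rpc_call_done  = example_done,
        .rpc_release    = example_release,
};

rpc_call_prepare is optional: rpc_init_task() below only installs it as the initial tk_action when it is non-NULL.
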
 
 /*
  * This is the RPC `scheduler' (or rather, the finite state machine).
  */
-static int
-__rpc_execute(struct rpc_task *task)
+static int __rpc_execute(struct rpc_task *task)
 {
        int             status = 0;
 
        dprintk("RPC: %4d rpc_execute flgs %x\n",
                                task->tk_pid, task->tk_flags);
 
-       if (!RPC_IS_RUNNING(task)) {
-               printk(KERN_WARNING "RPC: rpc_execute called for sleeping task!!\n");
-               return 0;
-       }
+       BUG_ON(RPC_IS_QUEUED(task));
+
+       for (;;) {
+               /*
+                * Garbage collection of pending timers...
+                */
+               rpc_delete_timer(task);
 
- restarted:
-       while (1) {
                /*
                 * Execute any pending callback.
                 */
@@ -665,80 +664,52 @@ __rpc_execute(struct rpc_task *task)
                 * tk_action may be NULL when the task has been killed
                 * by someone else.
                 */
-               if (RPC_IS_RUNNING(task)) {
-                       /*
-                        * Garbage collection of pending timers...
-                        */
-                       rpc_delete_timer(task);
-                       if (!task->tk_action)
+               if (!RPC_IS_QUEUED(task)) {
+                       if (task->tk_action == NULL)
                                break;
                        task->tk_action(task);
-                       /* micro-optimization to avoid spinlock */
-                       if (RPC_IS_RUNNING(task))
-                               continue;
                }
 
                /*
-                * Check whether task is sleeping.
+                * Lockless check for whether task is sleeping or not.
                 */
-               spin_lock_bh(&rpc_queue_lock);
-               if (!RPC_IS_RUNNING(task)) {
-                       rpc_set_sleeping(task);
-                       if (RPC_IS_ASYNC(task)) {
-                               spin_unlock_bh(&rpc_queue_lock);
+               if (!RPC_IS_QUEUED(task))
+                       continue;
+               rpc_clear_running(task);
+               if (RPC_IS_ASYNC(task)) {
+                       /* Careful! we may have raced... */
+                       if (RPC_IS_QUEUED(task))
+                               return 0;
+                       if (rpc_test_and_set_running(task))
                                return 0;
-                       }
+                       continue;
                }
-               spin_unlock_bh(&rpc_queue_lock);
 
-               if (!RPC_IS_SLEEPING(task))
-                       continue;
                /* sync task: sleep here */
                dprintk("RPC: %4d sync task going to sleep\n", task->tk_pid);
-               if (current->pid == rpciod_pid)
-                       printk(KERN_ERR "RPC: rpciod waiting on sync task!\n");
-
-               if (!task->tk_client->cl_intr) {
-                       __wait_event(task->tk_wait, !RPC_IS_SLEEPING(task));
-               } else {
-                       __wait_event_interruptible(task->tk_wait, !RPC_IS_SLEEPING(task), status);
+               /* Note: Caller should be using rpc_clnt_sigmask() */
+               status = out_of_line_wait_on_bit(&task->tk_runstate,
+                               RPC_TASK_QUEUED, rpc_wait_bit_interruptible,
+                               TASK_INTERRUPTIBLE);
+               if (status == -ERESTARTSYS) {
                        /*
                         * When a sync task receives a signal, it exits with
                         * -ERESTARTSYS. In order to catch any callbacks that
                         * clean up after sleeping on some queue, we don't
                         * break the loop here, but go around once more.
                         */
-                       if (status == -ERESTARTSYS) {
-                               dprintk("RPC: %4d got signal\n", task->tk_pid);
-                               task->tk_flags |= RPC_TASK_KILLED;
-                               rpc_exit(task, -ERESTARTSYS);
-                               rpc_wake_up_task(task);
-                       }
+                       dprintk("RPC: %4d got signal\n", task->tk_pid);
+                       task->tk_flags |= RPC_TASK_KILLED;
+                       rpc_exit(task, -ERESTARTSYS);
+                       rpc_wake_up_task(task);
                }
+               rpc_set_running(task);
                dprintk("RPC: %4d sync task resuming\n", task->tk_pid);
        }
 
-       if (task->tk_exit) {
-               task->tk_exit(task);
-               /* If tk_action is non-null, the user wants us to restart */
-               if (task->tk_action) {
-                       if (!RPC_ASSASSINATED(task)) {
-                               /* Release RPC slot and buffer memory */
-                               if (task->tk_rqstp)
-                                       xprt_release(task);
-                               rpc_free(task);
-                               goto restarted;
-                       }
-                       printk(KERN_ERR "RPC: dead task tries to walk away.\n");
-               }
-       }
-
-       dprintk("RPC: %4d exit() = %d\n", task->tk_pid, task->tk_status);
-       status = task->tk_status;
-
+       dprintk("RPC: %4d, return %d, status %d\n", task->tk_pid, status, task->tk_status);
        /* Release all resources associated with the task */
        rpc_release_task(task);
-
        return status;
 }
 
@@ -754,71 +725,30 @@ __rpc_execute(struct rpc_task *task)
 int
 rpc_execute(struct rpc_task *task)
 {
-       int status = -EIO;
-       if (rpc_inhibit) {
-               printk(KERN_INFO "RPC: execution inhibited!\n");
-               goto out_release;
-       }
-
-       status = -EWOULDBLOCK;
-       if (task->tk_active) {
-               printk(KERN_ERR "RPC: active task was run twice!\n");
-               goto out_err;
-       }
-
-       task->tk_active = 1;
+       rpc_set_active(task);
        rpc_set_running(task);
        return __rpc_execute(task);
- out_release:
-       rpc_release_task(task);
- out_err:
-       return status;
 }
 
-/*
- * This is our own little scheduler for async RPC tasks.
- */
-static void
-__rpc_schedule(void)
+static void rpc_async_schedule(struct work_struct *work)
 {
-       struct rpc_task *task;
-       int             count = 0;
-
-       dprintk("RPC:      rpc_schedule enter\n");
-       while (1) {
-
-               task_for_first(task, &schedq.tasks[0]) {
-                       __rpc_remove_wait_queue(task);
-                       spin_unlock_bh(&rpc_queue_lock);
-
-                       __rpc_execute(task);
-                       spin_lock_bh(&rpc_queue_lock);
-               } else {
-                       break;
-               }
-
-               if (++count >= 200 || need_resched()) {
-                       count = 0;
-                       spin_unlock_bh(&rpc_queue_lock);
-                       schedule();
-                       spin_lock_bh(&rpc_queue_lock);
-               }
-       }
-       dprintk("RPC:      rpc_schedule leave\n");
+       __rpc_execute(container_of(work, struct rpc_task, u.tk_work));
 }
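
rpc_async_schedule() recovers the task from its embedded work_struct; the container_of() idiom in isolation (struct example is invented):

struct example {
        struct work_struct      work;
        int                     payload;
};

static void example_worker(struct work_struct *work)
{
        struct example *ex = container_of(work, struct example, work);

        printk(KERN_DEBUG "payload=%d\n", ex->payload);
}

This is the same shape used above: INIT_WORK() binds the handler, and queue_work() later hands the embedded work_struct back to it.
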
 
-/*
- * Allocate memory for RPC purposes.
+/**
+ * rpc_malloc - allocate an RPC buffer
+ * @task: RPC task that will use this buffer
+ * @size: requested byte size
  *
  * We try to ensure that some NFS reads and writes can always proceed
  * by using a mempool when allocating 'small' buffers.
  * In order to avoid memory starvation triggering more writebacks of
  * NFS requests, we use GFP_NOFS rather than GFP_KERNEL.
  */
-void *
-rpc_malloc(struct rpc_task *task, size_t size)
+void * rpc_malloc(struct rpc_task *task, size_t size)
 {
-       int     gfp;
+       struct rpc_rqst *req = task->tk_rqstp;
+       gfp_t   gfp;
 
        if (task->tk_flags & RPC_TASK_SWAPPER)
                gfp = GFP_ATOMIC;
@@ -826,70 +756,76 @@ rpc_malloc(struct rpc_task *task, size_t size)
                gfp = GFP_NOFS;
 
        if (size > RPC_BUFFER_MAXSIZE) {
-               task->tk_buffer =  kmalloc(size, gfp);
-               if (task->tk_buffer)
-                       task->tk_bufsize = size;
+               req->rq_buffer = kmalloc(size, gfp);
+               if (req->rq_buffer)
+                       req->rq_bufsize = size;
        } else {
-               task->tk_buffer =  mempool_alloc(rpc_buffer_mempool, gfp);
-               if (task->tk_buffer)
-                       task->tk_bufsize = RPC_BUFFER_MAXSIZE;
+               req->rq_buffer = mempool_alloc(rpc_buffer_mempool, gfp);
+               if (req->rq_buffer)
+                       req->rq_bufsize = RPC_BUFFER_MAXSIZE;
        }
-       return task->tk_buffer;
+       return req->rq_buffer;
 }
 
-void
-rpc_free(struct rpc_task *task)
+/**
+ * rpc_free - free buffer allocated via rpc_malloc
+ * @task: RPC task with a buffer to be freed
+ *
+ */
+void rpc_free(struct rpc_task *task)
 {
-       if (task->tk_buffer) {
-               if (task->tk_bufsize == RPC_BUFFER_MAXSIZE)
-                       mempool_free(task->tk_buffer, rpc_buffer_mempool);
+       struct rpc_rqst *req = task->tk_rqstp;
+
+       if (req->rq_buffer) {
+               if (req->rq_bufsize == RPC_BUFFER_MAXSIZE)
+                       mempool_free(req->rq_buffer, rpc_buffer_mempool);
                else
-                       kfree(task->tk_buffer);
-               task->tk_buffer = NULL;
-               task->tk_bufsize = 0;
+                       kfree(req->rq_buffer);
+               req->rq_buffer = NULL;
+               req->rq_bufsize = 0;
        }
 }
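
From the caller's side (bufsize is invented), requests at or below RPC_BUFFER_MAXSIZE come out of the guaranteed mempool so NFS writeback can always make progress, while larger ones fall back to plain kmalloc():

static int example_setup_buffer(struct rpc_task *task, size_t bufsize)
{
        if (rpc_malloc(task, bufsize) == NULL)
                return -ENOMEM;
        /* ... use task->tk_rqstp->rq_buffer for the call ... */
        rpc_free(task);         /* return rq_buffer to the pool or slab */
        return 0;
}
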
 
 /*
  * Creation and deletion of RPC task structures
  */
-void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, rpc_action callback, int flags)
+void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata)
 {
        memset(task, 0, sizeof(*task));
        init_timer(&task->tk_timer);
        task->tk_timer.data     = (unsigned long) task;
        task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer;
+       atomic_set(&task->tk_count, 1);
        task->tk_client = clnt;
        task->tk_flags  = flags;
-       task->tk_exit   = callback;
-       init_waitqueue_head(&task->tk_wait);
-       if (current->uid != current->fsuid || current->gid != current->fsgid)
-               task->tk_flags |= RPC_TASK_SETUID;
+       task->tk_ops = tk_ops;
+       if (tk_ops->rpc_call_prepare != NULL)
+               task->tk_action = rpc_prepare_task;
+       task->tk_calldata = calldata;
 
        /* Initialize retry counters */
        task->tk_garb_retry = 2;
        task->tk_cred_retry = 2;
-       task->tk_suid_retry = 1;
 
        task->tk_priority = RPC_PRIORITY_NORMAL;
        task->tk_cookie = (unsigned long)current;
-       INIT_LIST_HEAD(&task->tk_links);
 
-       /* Add to global list of all tasks */
-       spin_lock(&rpc_sched_lock);
-       list_add(&task->tk_task, &all_tasks);
-       spin_unlock(&rpc_sched_lock);
+       /* Initialize workqueue for async tasks */
+       task->tk_workqueue = rpciod_workqueue;
 
        if (clnt) {
                atomic_inc(&clnt->cl_users);
                if (clnt->cl_softrtry)
                        task->tk_flags |= RPC_TASK_SOFT;
+               if (!clnt->cl_intr)
+                       task->tk_flags |= RPC_TASK_NOINTR;
        }
 
-#ifdef RPC_DEBUG
-       task->tk_magic = 0xf00baa;
-       task->tk_pid = rpc_task_id++;
-#endif
+       BUG_ON(task->tk_ops == NULL);
+
+       /* starting timestamp */
+       task->tk_start = jiffies;
+
        dprintk("RPC: %4d new task procpid %d\n", task->tk_pid,
                                current->pid);
 }
@@ -900,9 +836,9 @@ rpc_alloc_task(void)
        return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
 }
 
-static void
-rpc_default_free_task(struct rpc_task *task)
+static void rpc_free_task(struct rcu_head *rcu)
 {
+       struct rpc_task *task = container_of(rcu, struct rpc_task, u.tk_rcu);
        dprintk("RPC: %4d freeing task\n", task->tk_pid);
        mempool_free(task, rpc_task_mempool);
 }
@@ -912,8 +848,7 @@ rpc_default_free_task(struct rpc_task *task)
  * clean up after an allocation failure, as the client may
  * have specified "oneshot".
  */
-struct rpc_task *
-rpc_new_task(struct rpc_clnt *clnt, rpc_action callback, int flags)
+struct rpc_task *rpc_new_task(struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata)
 {
        struct rpc_task *task;
 
@@ -921,10 +856,7 @@ rpc_new_task(struct rpc_clnt *clnt, rpc_action callback, int flags)
        if (!task)
                goto cleanup;
 
-       rpc_init_task(task, clnt, callback, flags);
-
-       /* Replace tk_release */
-       task->tk_release = rpc_default_free_task;
+       rpc_init_task(task, clnt, flags, tk_ops, calldata);
 
        dprintk("RPC: %4d allocated task\n", task->tk_pid);
        task->tk_flags |= RPC_TASK_DYNAMIC;
@@ -942,132 +874,83 @@ cleanup:
        goto out;
 }
 
-void
-rpc_release_task(struct rpc_task *task)
+
+void rpc_put_task(struct rpc_task *task)
 {
-       dprintk("RPC: %4d release task\n", task->tk_pid);
+       const struct rpc_call_ops *tk_ops = task->tk_ops;
+       void *calldata = task->tk_calldata;
 
-#ifdef RPC_DEBUG
-       if (task->tk_magic != 0xf00baa) {
-               printk(KERN_ERR "RPC: attempt to release a non-existing task!\n");
-               rpc_debug = ~0;
-               rpc_show_tasks();
+       if (!atomic_dec_and_test(&task->tk_count))
                return;
-       }
-#endif
-
-       /* Remove from global task list */
-       spin_lock(&rpc_sched_lock);
-       list_del(&task->tk_task);
-       spin_unlock(&rpc_sched_lock);
-
-       /* Protect the execution below. */
-       spin_lock_bh(&rpc_queue_lock);
-
-       /* Disable timer to prevent zombie wakeup */
-       __rpc_disable_timer(task);
-
-       /* Remove from any wait queue we're still on */
-       __rpc_remove_wait_queue(task);
-
-       task->tk_active = 0;
-
-       spin_unlock_bh(&rpc_queue_lock);
-
-       /* Synchronously delete any running timer */
-       rpc_delete_timer(task);
-
        /* Release resources */
        if (task->tk_rqstp)
                xprt_release(task);
        if (task->tk_msg.rpc_cred)
                rpcauth_unbindcred(task);
-       rpc_free(task);
        if (task->tk_client) {
                rpc_release_client(task->tk_client);
                task->tk_client = NULL;
        }
+       if (task->tk_flags & RPC_TASK_DYNAMIC)
+               call_rcu_bh(&task->u.tk_rcu, rpc_free_task);
+       rpc_release_calldata(tk_ops, calldata);
+}
+EXPORT_SYMBOL(rpc_put_task);
 
+static void rpc_release_task(struct rpc_task *task)
+{
 #ifdef RPC_DEBUG
-       task->tk_magic = 0;
+       BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
 #endif
-       if (task->tk_release)
-               task->tk_release(task);
-}
+       dprintk("RPC: %4d release task\n", task->tk_pid);
 
-/**
- * rpc_find_parent - find the parent of a child task.
- * @child: child task
- *
- * Checks that the parent task is still sleeping on the
- * queue 'childq'. If so returns a pointer to the parent.
- * Upon failure returns NULL.
- *
- * Caller must hold rpc_queue_lock
- */
-static inline struct rpc_task *
-rpc_find_parent(struct rpc_task *child)
-{
-       struct rpc_task *task, *parent;
-       struct list_head *le;
+       /* Remove from global task list */
+       spin_lock(&rpc_sched_lock);
+       list_del(&task->tk_task);
+       spin_unlock(&rpc_sched_lock);
 
-       parent = (struct rpc_task *) child->tk_calldata;
-       task_for_each(task, le, &childq.tasks[0])
-               if (task == parent)
-                       return parent;
+       BUG_ON (RPC_IS_QUEUED(task));
 
-       return NULL;
-}
+       /* Synchronously delete any running timer */
+       rpc_delete_timer(task);
 
-static void
-rpc_child_exit(struct rpc_task *child)
-{
-       struct rpc_task *parent;
+#ifdef RPC_DEBUG
+       task->tk_magic = 0;
+#endif
+       /* Wake up anyone who is waiting for task completion */
+       rpc_mark_complete_task(task);
 
-       spin_lock_bh(&rpc_queue_lock);
-       if ((parent = rpc_find_parent(child)) != NULL) {
-               parent->tk_status = child->tk_status;
-               __rpc_wake_up_task(parent);
-       }
-       spin_unlock_bh(&rpc_queue_lock);
+       rpc_put_task(task);
 }
 
-/*
- * Note: rpc_new_task releases the client after a failure.
+/**
+ * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
+ * @clnt: pointer to RPC client
+ * @flags: RPC flags
+ * @ops: RPC call ops
+ * @data: user call data
  */
-struct rpc_task *
-rpc_new_child(struct rpc_clnt *clnt, struct rpc_task *parent)
+struct rpc_task *rpc_run_task(struct rpc_clnt *clnt, int flags,
+                                       const struct rpc_call_ops *ops,
+                                       void *data)
 {
-       struct rpc_task *task;
-
-       task = rpc_new_task(clnt, NULL, RPC_TASK_ASYNC | RPC_TASK_CHILD);
-       if (!task)
-               goto fail;
-       task->tk_exit = rpc_child_exit;
-       task->tk_calldata = parent;
+       struct rpc_task *task;
+       task = rpc_new_task(clnt, flags, ops, data);
+       if (task == NULL) {
+               rpc_release_calldata(ops, data);
+               return ERR_PTR(-ENOMEM);
+       }
+       atomic_inc(&task->tk_count);
+       rpc_execute(task);
        return task;
-
-fail:
-       parent->tk_status = -ENOMEM;
-       return NULL;
-}
-
-void
-rpc_run_child(struct rpc_task *task, struct rpc_task *child, rpc_action func)
-{
-       spin_lock_bh(&rpc_queue_lock);
-       /* N.B. Is it possible for the child to have already finished? */
-       __rpc_sleep_on(&childq, task, func, NULL);
-       rpc_schedule_run(child);
-       spin_unlock_bh(&rpc_queue_lock);
 }
+EXPORT_SYMBOL(rpc_run_task);
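
A minimal asynchronous caller on top of the new helper, reusing the hypothetical example_ops sketched earlier:

static int example_async_call(struct rpc_clnt *clnt, struct example_ctx *ctx)
{
        struct rpc_task *task;

        task = rpc_run_task(clnt, RPC_TASK_ASYNC, &example_ops, ctx);
        if (IS_ERR(task))
                return PTR_ERR(task);
        rpc_put_task(task);     /* drop the extra reference taken above */
        return 0;
}

Note that on allocation failure rpc_run_task() calls rpc_release_calldata() itself, so ctx must not be freed again on the error path.
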
 
 /*
  * Kill all tasks for the given client.
  * XXX: kill their descendants as well?
  */
-void
-rpc_killall_tasks(struct rpc_clnt *clnt)
+void rpc_killall_tasks(struct rpc_clnt *clnt)
 {
        struct rpc_task *rovr;
        struct list_head *le;
@@ -1078,104 +961,28 @@ rpc_killall_tasks(struct rpc_clnt *clnt)
         * Spin lock all_tasks to prevent changes...
         */
        spin_lock(&rpc_sched_lock);
-       alltask_for_each(rovr, le, &all_tasks)
+       alltask_for_each(rovr, le, &all_tasks) {
+               if (! RPC_IS_ACTIVATED(rovr))
+                       continue;
                if (!clnt || rovr->tk_client == clnt) {
                        rovr->tk_flags |= RPC_TASK_KILLED;
                        rpc_exit(rovr, -EIO);
                        rpc_wake_up_task(rovr);
                }
+       }
        spin_unlock(&rpc_sched_lock);
 }
 
 static DECLARE_MUTEX_LOCKED(rpciod_running);
 
-static inline int
-rpciod_task_pending(void)
-{
-       return !list_empty(&schedq.tasks[0]);
-}
-
-
-/*
- * This is the rpciod kernel thread
- */
-static int
-rpciod(void *ptr)
-{
-       int             rounds = 0;
-
-       lock_kernel();
-       /*
-        * Let our maker know we're running ...
-        */
-       rpciod_pid = current->pid;
-       up(&rpciod_running);
-
-       daemonize("rpciod");
-       allow_signal(SIGKILL);
-
-       dprintk("RPC: rpciod starting (pid %d)\n", rpciod_pid);
-       spin_lock_bh(&rpc_queue_lock);
-       while (rpciod_users) {
-               DEFINE_WAIT(wait);
-               if (signalled()) {
-                       spin_unlock_bh(&rpc_queue_lock);
-                       rpciod_killall();
-                       flush_signals(current);
-                       spin_lock_bh(&rpc_queue_lock);
-               }
-               __rpc_schedule();
-               if (current->flags & PF_FREEZE) {
-                       spin_unlock_bh(&rpc_queue_lock);
-                       refrigerator(PF_FREEZE);
-                       spin_lock_bh(&rpc_queue_lock);
-               }
-
-               if (++rounds >= 64) {   /* safeguard */
-                       spin_unlock_bh(&rpc_queue_lock);
-                       schedule();
-                       rounds = 0;
-                       spin_lock_bh(&rpc_queue_lock);
-               }
-
-               dprintk("RPC: rpciod back to sleep\n");
-               prepare_to_wait(&rpciod_idle, &wait, TASK_INTERRUPTIBLE);
-               if (!rpciod_task_pending() && !signalled()) {
-                       spin_unlock_bh(&rpc_queue_lock);
-                       schedule();
-                       rounds = 0;
-                       spin_lock_bh(&rpc_queue_lock);
-               }
-               finish_wait(&rpciod_idle, &wait);
-               dprintk("RPC: switch to rpciod\n");
-       }
-       spin_unlock_bh(&rpc_queue_lock);
-
-       dprintk("RPC: rpciod shutdown commences\n");
-       if (!list_empty(&all_tasks)) {
-               printk(KERN_ERR "rpciod: active tasks at shutdown?!\n");
-               rpciod_killall();
-       }
-
-       dprintk("RPC: rpciod exiting\n");
-       unlock_kernel();
-
-       rpciod_pid = 0;
-       complete_and_exit(&rpciod_killer, 0);
-       return 0;
-}
-
-static void
-rpciod_killall(void)
+static void rpciod_killall(void)
 {
        unsigned long flags;
 
        while (!list_empty(&all_tasks)) {
                clear_thread_flag(TIF_SIGPENDING);
                rpc_killall_tasks(NULL);
-               spin_lock_bh(&rpc_queue_lock);
-               __rpc_schedule();
-               spin_unlock_bh(&rpc_queue_lock);
+               flush_workqueue(rpciod_workqueue);
                if (!list_empty(&all_tasks)) {
                        dprintk("rpciod_killall: waiting for tasks to exit\n");
                        yield();
@@ -1193,54 +1000,57 @@ rpciod_killall(void)
 int
 rpciod_up(void)
 {
+       struct workqueue_struct *wq;
        int error = 0;
 
-       down(&rpciod_sema);
-       dprintk("rpciod_up: pid %d, users %d\n", rpciod_pid, rpciod_users);
+       mutex_lock(&rpciod_mutex);
+       dprintk("rpciod_up: users %d\n", rpciod_users);
        rpciod_users++;
-       if (rpciod_pid)
+       if (rpciod_workqueue)
                goto out;
        /*
         * If there's no pid, we should be the first user.
         */
        if (rpciod_users > 1)
-               printk(KERN_WARNING "rpciod_up: no pid, %d users??\n", rpciod_users);
+               printk(KERN_WARNING "rpciod_up: no workqueue, %d users??\n", rpciod_users);
        /*
         * Create the rpciod thread and wait for it to start.
         */
-       error = kernel_thread(rpciod, NULL, 0);
-       if (error < 0) {
-               printk(KERN_WARNING "rpciod_up: create thread failed, error=%d\n", error);
+       error = -ENOMEM;
+       wq = create_workqueue("rpciod");
+       if (wq == NULL) {
+               printk(KERN_WARNING "rpciod_up: create workqueue failed, error=%d\n", error);
                rpciod_users--;
                goto out;
        }
-       down(&rpciod_running);
+       rpciod_workqueue = wq;
        error = 0;
 out:
-       up(&rpciod_sema);
+       mutex_unlock(&rpciod_mutex);
        return error;
 }
 
 void
 rpciod_down(void)
 {
-       down(&rpciod_sema);
-       dprintk("rpciod_down pid %d sema %d\n", rpciod_pid, rpciod_users);
+       mutex_lock(&rpciod_mutex);
+       dprintk("rpciod_down sema %d\n", rpciod_users);
        if (rpciod_users) {
                if (--rpciod_users)
                        goto out;
        } else
-               printk(KERN_WARNING "rpciod_down: pid=%d, no users??\n", rpciod_pid);
+               printk(KERN_WARNING "rpciod_down: no users??\n");
 
-       if (!rpciod_pid) {
+       if (!rpciod_workqueue) {
                dprintk("rpciod_down: Nothing to do!\n");
                goto out;
        }
+       rpciod_killall();
 
-       kill_proc(rpciod_pid, SIGKILL, 1);
-       wait_for_completion(&rpciod_killer);
+       destroy_workqueue(rpciod_workqueue);
+       rpciod_workqueue = NULL;
  out:
-       up(&rpciod_sema);
+       mutex_unlock(&rpciod_mutex);
 }
 
 #ifdef RPC_DEBUG
@@ -1255,8 +1065,13 @@ void rpc_show_tasks(void)
                return;
        }
        printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
-               "-rpcwait -action- --exit--\n");
-       alltask_for_each(t, le, &all_tasks)
+               "-rpcwait -action- ---ops--\n");
+       alltask_for_each(t, le, &all_tasks) {
+               const char *rpc_waitq = "none";
+
+               if (RPC_IS_QUEUED(t))
+                       rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq);
+
                printk("%05d %04d %04x %06d %8p %6d %8p %08ld %8s %8p %8p\n",
                        t->tk_pid,
                        (t->tk_msg.rpc_proc ? t->tk_msg.rpc_proc->p_proc : -1),
@@ -1264,8 +1079,9 @@ void rpc_show_tasks(void)
                        t->tk_client,
                        (t->tk_client ? t->tk_client->cl_prog : 0),
                        t->tk_rqstp, t->tk_timeout,
-                       rpc_qname(t->tk_rpcwait),
-                       t->tk_action, t->tk_exit);
+                       rpc_waitq,
+                       t->tk_action, t->tk_ops);
+       }
        spin_unlock(&rpc_sched_lock);
 }
 #endif
@@ -1277,10 +1093,10 @@ rpc_destroy_mempool(void)
                mempool_destroy(rpc_buffer_mempool);
        if (rpc_task_mempool)
                mempool_destroy(rpc_task_mempool);
-       if (rpc_task_slabp && kmem_cache_destroy(rpc_task_slabp))
-               printk(KERN_INFO "rpc_task: not all structures were freed\n");
-       if (rpc_buffer_slabp && kmem_cache_destroy(rpc_buffer_slabp))
-               printk(KERN_INFO "rpc_buffers: not all structures were freed\n");
+       if (rpc_task_slabp)
+               kmem_cache_destroy(rpc_task_slabp);
+       if (rpc_buffer_slabp)
+               kmem_cache_destroy(rpc_buffer_slabp);
 }
 
 int
@@ -1298,16 +1114,12 @@ rpc_init_mempool(void)
                                             NULL, NULL);
        if (!rpc_buffer_slabp)
                goto err_nomem;
-       rpc_task_mempool = mempool_create(RPC_TASK_POOLSIZE,
-                                           mempool_alloc_slab,
-                                           mempool_free_slab,
-                                           rpc_task_slabp);
+       rpc_task_mempool = mempool_create_slab_pool(RPC_TASK_POOLSIZE,
+                                                   rpc_task_slabp);
        if (!rpc_task_mempool)
                goto err_nomem;
-       rpc_buffer_mempool = mempool_create(RPC_BUFFER_POOLSIZE,
-                                           mempool_alloc_slab,
-                                           mempool_free_slab,
-                                           rpc_buffer_slabp);
+       rpc_buffer_mempool = mempool_create_slab_pool(RPC_BUFFER_POOLSIZE,
+                                                     rpc_buffer_slabp);
        if (!rpc_buffer_mempool)
                goto err_nomem;
        return 0;
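
For reference, mempool_create_slab_pool(min_nr, cachep) is shorthand for the open-coded form it replaces here: mempool_create(min_nr, mempool_alloc_slab, mempool_free_slab, cachep).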