fedora core 6 1.2949 + vserver 2.2.0 (linux-2.6.git)

diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index c06614d..fc083f0 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -18,9 +18,9 @@
 #include <linux/smp.h>
 #include <linux/smp_lock.h>
 #include <linux/spinlock.h>
+#include <linux/mutex.h>
 
 #include <linux/sunrpc/clnt.h>
-#include <linux/sunrpc/xprt.h>
 
 #ifdef RPC_DEBUG
 #define RPCDBG_FACILITY                RPCDBG_SCHED
@@ -34,22 +34,15 @@ static int                  rpc_task_id;
 #define RPC_BUFFER_MAXSIZE     (2048)
 #define RPC_BUFFER_POOLSIZE    (8)
 #define RPC_TASK_POOLSIZE      (8)
-static kmem_cache_t    *rpc_task_slabp;
-static kmem_cache_t    *rpc_buffer_slabp;
-static mempool_t       *rpc_task_mempool;
-static mempool_t       *rpc_buffer_mempool;
+static struct kmem_cache       *rpc_task_slabp __read_mostly;
+static struct kmem_cache       *rpc_buffer_slabp __read_mostly;
+static mempool_t       *rpc_task_mempool __read_mostly;
+static mempool_t       *rpc_buffer_mempool __read_mostly;
 
 static void                    __rpc_default_timer(struct rpc_task *task);
 static void                    rpciod_killall(void);
-static void                    rpc_free(struct rpc_task *task);
-
-static void                    rpc_async_schedule(void *);
-
-/*
- * RPC tasks that create another task (e.g. for contacting the portmapper)
- * will wait on this queue for their child's completion
- */
-static RPC_WAITQ(childq, "childq");
+static void                    rpc_async_schedule(struct work_struct *);
+static void                     rpc_release_task(struct rpc_task *task);
 
 /*
  * RPC tasks sit here while waiting for conditions to improve.
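
The pool pointers are retyped from the deprecated kmem_cache_t typedef to
struct kmem_cache, and gain __read_mostly, which groups rarely-written
globals into a separate cache section so hot read paths do not share cache
lines with frequently-written data. A minimal sketch of the annotation,
with hypothetical names:

    #include <linux/cache.h>        /* __read_mostly */
    #include <linux/mempool.h>
    #include <linux/slab.h>

    /*
     * Hypothetical module-local pools, shown only to illustrate the
     * annotation: these are written once at init time and read on
     * every allocation thereafter.
     */
    static struct kmem_cache *example_slab __read_mostly;
    static mempool_t *example_pool __read_mostly;
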
@@ -64,9 +57,9 @@ static LIST_HEAD(all_tasks);
 /*
  * rpciod-related stuff
  */
-static DECLARE_MUTEX(rpciod_sema);
+static DEFINE_MUTEX(rpciod_mutex);
 static unsigned int            rpciod_users;
-static struct workqueue_struct *rpciod_workqueue;
+struct workqueue_struct *rpciod_workqueue;
 
 /*
  * Spinlock for other critical sections of code.
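
rpciod_sema was a semaphore used as a mutex; it becomes a real struct
mutex, which is cheaper and visible to lock debugging. A minimal sketch of
the conversion idiom, with hypothetical names:

    #include <linux/mutex.h>

    static DEFINE_MUTEX(example_mutex);     /* replaces DECLARE_MUTEX(example_sema) */

    static void example_critical_section(void)
    {
            mutex_lock(&example_mutex);     /* was down(&example_sema) */
            /* ... serialized start/stop work ... */
            mutex_unlock(&example_mutex);   /* was up(&example_sema) */
    }
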
@@ -183,6 +176,7 @@ static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *
        else
                list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
        task->u.tk_wait.rpc_waitq = queue;
+       queue->qlen++;
        rpc_set_queued(task);
 
        dprintk("RPC: %4d added to queue %p \"%s\"\n",
@@ -217,6 +211,7 @@ static void __rpc_remove_wait_queue(struct rpc_task *task)
                __rpc_remove_wait_queue_priority(task);
        else
                list_del(&task->u.tk_wait.list);
+       queue->qlen--;
        dprintk("RPC: %4d removed from queue %p \"%s\"\n",
                                task->tk_pid, queue, rpc_qname(queue));
 }
@@ -264,6 +259,51 @@ void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
 }
 EXPORT_SYMBOL(rpc_init_wait_queue);
 
+static int rpc_wait_bit_interruptible(void *word)
+{
+       if (signal_pending(current))
+               return -ERESTARTSYS;
+       schedule();
+       return 0;
+}
+
+static void rpc_set_active(struct rpc_task *task)
+{
+       if (test_and_set_bit(RPC_TASK_ACTIVE, &task->tk_runstate) != 0)
+               return;
+       spin_lock(&rpc_sched_lock);
+#ifdef RPC_DEBUG
+       task->tk_magic = RPC_TASK_MAGIC_ID;
+       task->tk_pid = rpc_task_id++;
+#endif
+       /* Add to global list of all tasks */
+       list_add_tail(&task->tk_task, &all_tasks);
+       spin_unlock(&rpc_sched_lock);
+}
+
+/*
+ * Mark an RPC call as having completed by clearing the 'active' bit
+ */
+static void rpc_mark_complete_task(struct rpc_task *task)
+{
+       smp_mb__before_clear_bit();
+       clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
+       smp_mb__after_clear_bit();
+       wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);
+}
+
+/*
+ * Allow callers to wait for completion of an RPC call
+ */
+int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
+{
+       if (action == NULL)
+               action = rpc_wait_bit_interruptible;
+       return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
+                       action, TASK_INTERRUPTIBLE);
+}
+EXPORT_SYMBOL(__rpc_wait_for_completion_task);
+
 /*
  * Make an RPC task runnable.
  *
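
The new RPC_TASK_ACTIVE lifecycle replaces per-task wait queues with the
generic bit-waitqueue machinery: waiters block in wait_on_bit() until the
completer clears the bit and calls wake_up_bit(). The explicit barriers
around clear_bit() are needed because neither clear_bit() nor
wake_up_bit() implies a memory barrier. A minimal sketch of the pattern,
with hypothetical names:

    #include <linux/sched.h>
    #include <linux/wait.h>
    #include <linux/bitops.h>

    #define EXAMPLE_BUSY 0          /* hypothetical flag bit */

    static unsigned long example_state;

    static int example_wait_action(void *word)
    {
            if (signal_pending(current))
                    return -ERESTARTSYS;    /* abort the wait on a signal */
            schedule();
            return 0;
    }

    /* Waiter: sleep until EXAMPLE_BUSY is cleared. */
    static int example_wait(void)
    {
            return wait_on_bit(&example_state, EXAMPLE_BUSY,
                               example_wait_action, TASK_INTERRUPTIBLE);
    }

    /* Completer: clear the bit, then wake anyone inside wait_on_bit(). */
    static void example_complete(void)
    {
            smp_mb__before_clear_bit();
            clear_bit(EXAMPLE_BUSY, &example_state);
            smp_mb__after_clear_bit();
            wake_up_bit(&example_state, EXAMPLE_BUSY);
    }
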
@@ -272,17 +312,19 @@ EXPORT_SYMBOL(rpc_init_wait_queue);
  */
 static void rpc_make_runnable(struct rpc_task *task)
 {
-       int do_ret;
-
        BUG_ON(task->tk_timeout_fn);
-       do_ret = rpc_test_and_set_running(task);
        rpc_clear_queued(task);
-       if (do_ret)
+       if (rpc_test_and_set_running(task))
+               return;
+       /* We might have raced */
+       if (RPC_IS_QUEUED(task)) {
+               rpc_clear_running(task);
                return;
+       }
        if (RPC_IS_ASYNC(task)) {
                int status;
 
-               INIT_WORK(&task->u.tk_work, rpc_async_schedule, (void *)task);
+               INIT_WORK(&task->u.tk_work, rpc_async_schedule);
                status = queue_work(task->tk_workqueue, &task->u.tk_work);
                if (status < 0) {
                        printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
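
This hunk adapts to the 2.6.20 workqueue API change: INIT_WORK() no longer
takes a data pointer, and the handler receives the work_struct itself,
recovering the enclosing object with container_of() (as rpc_async_schedule()
does later in this patch). A minimal sketch with hypothetical names:

    #include <linux/workqueue.h>
    #include <linux/kernel.h>

    struct example_task {
            int cookie;
            struct work_struct work;        /* embedded, not a void * payload */
    };

    static void example_work_fn(struct work_struct *work)
    {
            /* Recover the enclosing object from the embedded member. */
            struct example_task *t = container_of(work, struct example_task, work);

            pr_debug("running task %d\n", t->cookie);
    }

    static void example_queue(struct example_task *t, struct workqueue_struct *wq)
    {
            INIT_WORK(&t->work, example_work_fn);   /* no third argument any more */
            queue_work(wq, &t->work);
    }
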
@@ -290,20 +332,7 @@ static void rpc_make_runnable(struct rpc_task *task)
                        return;
                }
        } else
-               wake_up(&task->u.tk_wait.waitq);
-}
-
-/*
- * Place a newly initialized task on the workqueue.
- */
-static inline void
-rpc_schedule_run(struct rpc_task *task)
-{
-       /* Don't run a child twice! */
-       if (RPC_IS_ACTIVATED(task))
-               return;
-       task->tk_active = 1;
-       rpc_make_runnable(task);
+               wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
 }
 
 /*
@@ -323,10 +352,6 @@ static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
                return;
        }
 
-       /* Mark the task as being activated if so needed */
-       if (!RPC_IS_ACTIVATED(task))
-               task->tk_active = 1;
-
        __rpc_add_wait_queue(q, task);
 
        BUG_ON(task->tk_callback != NULL);
@@ -337,6 +362,9 @@ static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
 void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
                                rpc_action action, rpc_action timer)
 {
+       /* Mark the task as being activated if so needed */
+       rpc_set_active(task);
+
        /*
         * Protect the queue operations.
         */
@@ -400,16 +428,19 @@ __rpc_default_timer(struct rpc_task *task)
  */
 void rpc_wake_up_task(struct rpc_task *task)
 {
+       rcu_read_lock_bh();
        if (rpc_start_wakeup(task)) {
                if (RPC_IS_QUEUED(task)) {
                        struct rpc_wait_queue *queue = task->u.tk_wait.rpc_waitq;
 
-                       spin_lock_bh(&queue->lock);
+                       /* Note: we're already in a bh-safe context */
+                       spin_lock(&queue->lock);
                        __rpc_do_wake_up_task(task);
-                       spin_unlock_bh(&queue->lock);
+                       spin_unlock(&queue->lock);
                }
                rpc_finish_wakeup(task);
        }
+       rcu_read_unlock_bh();
 }
 
 /*
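
rpc_wake_up_task() can now run without the caller holding any lock:
rcu_read_lock_bh() both pins the task against the RCU-deferred free added
later in this patch and disables bottom halves on the local CPU, which is
why the queue lock degrades from spin_lock_bh() to plain spin_lock(). A
minimal sketch of the locking shape, with hypothetical names:

    #include <linux/rcupdate.h>
    #include <linux/spinlock.h>

    static DEFINE_SPINLOCK(example_lock);

    static void example_wake(void)
    {
            rcu_read_lock_bh();             /* disables BHs locally... */
            spin_lock(&example_lock);       /* ...so _bh locking is redundant here */
            /* ... wake-up work protected by the lock ... */
            spin_unlock(&example_lock);
            rcu_read_unlock_bh();
    }
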
@@ -472,14 +503,16 @@ struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
        struct rpc_task *task = NULL;
 
        dprintk("RPC:      wake_up_next(%p \"%s\")\n", queue, rpc_qname(queue));
-       spin_lock_bh(&queue->lock);
+       rcu_read_lock_bh();
+       spin_lock(&queue->lock);
        if (RPC_IS_PRIORITY(queue))
                task = __rpc_wake_up_next_priority(queue);
        else {
                task_for_first(task, &queue->tasks[0])
                        __rpc_wake_up_task(task);
        }
-       spin_unlock_bh(&queue->lock);
+       spin_unlock(&queue->lock);
+       rcu_read_unlock_bh();
 
        return task;
 }
@@ -492,21 +525,21 @@ struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
  */
 void rpc_wake_up(struct rpc_wait_queue *queue)
 {
-       struct rpc_task *task;
-
+       struct rpc_task *task, *next;
        struct list_head *head;
-       spin_lock_bh(&queue->lock);
+
+       rcu_read_lock_bh();
+       spin_lock(&queue->lock);
        head = &queue->tasks[queue->maxpriority];
        for (;;) {
-               while (!list_empty(head)) {
-                       task = list_entry(head->next, struct rpc_task, u.tk_wait.list);
+               list_for_each_entry_safe(task, next, head, u.tk_wait.list)
                        __rpc_wake_up_task(task);
-               }
                if (head == &queue->tasks[0])
                        break;
                head--;
        }
-       spin_unlock_bh(&queue->lock);
+       spin_unlock(&queue->lock);
+       rcu_read_unlock_bh();
 }
 
 /**
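
__rpc_wake_up_task() unlinks the task from the list being walked, so the
open-coded while (!list_empty()) loop is replaced by
list_for_each_entry_safe(), which caches the next element before the body
can unlink the current one. A minimal sketch with hypothetical types:

    #include <linux/list.h>
    #include <linux/slab.h>

    struct example_item {
            struct list_head link;
    };

    static void example_drain(struct list_head *head)
    {
            struct example_item *item, *next;

            /* 'next' is fetched before the body unlinks 'item'. */
            list_for_each_entry_safe(item, next, head, link) {
                    list_del(&item->link);
                    kfree(item);
            }
    }
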
@@ -518,14 +551,14 @@ void rpc_wake_up(struct rpc_wait_queue *queue)
  */
 void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
 {
+       struct rpc_task *task, *next;
        struct list_head *head;
-       struct rpc_task *task;
 
-       spin_lock_bh(&queue->lock);
+       rcu_read_lock_bh();
+       spin_lock(&queue->lock);
        head = &queue->tasks[queue->maxpriority];
        for (;;) {
-               while (!list_empty(head)) {
-                       task = list_entry(head->next, struct rpc_task, u.tk_wait.list);
+               list_for_each_entry_safe(task, next, head, u.tk_wait.list) {
                        task->tk_status = status;
                        __rpc_wake_up_task(task);
                }
@@ -533,25 +566,60 @@ void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
                        break;
                head--;
        }
-       spin_unlock_bh(&queue->lock);
+       spin_unlock(&queue->lock);
+       rcu_read_unlock_bh();
+}
+
+static void __rpc_atrun(struct rpc_task *task)
+{
+       rpc_wake_up_task(task);
 }
 
 /*
  * Run a task at a later time
  */
-static void    __rpc_atrun(struct rpc_task *);
-void
-rpc_delay(struct rpc_task *task, unsigned long delay)
+void rpc_delay(struct rpc_task *task, unsigned long delay)
 {
        task->tk_timeout = delay;
        rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun);
 }
 
-static void
-__rpc_atrun(struct rpc_task *task)
+/*
+ * Helper to call task->tk_ops->rpc_call_prepare
+ */
+static void rpc_prepare_task(struct rpc_task *task)
 {
-       task->tk_status = 0;
-       rpc_wake_up_task(task);
+       lock_kernel();
+       task->tk_ops->rpc_call_prepare(task, task->tk_calldata);
+       unlock_kernel();
+}
+
+/*
+ * Helper that calls task->tk_ops->rpc_call_done if it exists
+ */
+void rpc_exit_task(struct rpc_task *task)
+{
+       task->tk_action = NULL;
+       if (task->tk_ops->rpc_call_done != NULL) {
+               lock_kernel();
+               task->tk_ops->rpc_call_done(task, task->tk_calldata);
+               unlock_kernel();
+               if (task->tk_action != NULL) {
+                       WARN_ON(RPC_ASSASSINATED(task));
+                       /* Always release the RPC slot and buffer memory */
+                       xprt_release(task);
+               }
+       }
+}
+EXPORT_SYMBOL(rpc_exit_task);
+
+void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata)
+{
+       if (ops->rpc_release != NULL) {
+               lock_kernel();
+               ops->rpc_release(calldata);
+               unlock_kernel();
+       }
 }
 
 /*
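
The single tk_exit callback gives way to the struct rpc_call_ops table
consumed by rpc_prepare_task(), rpc_exit_task() and rpc_release_calldata()
above. A hedged sketch of how a caller might fill one in; the handler
names are hypothetical:

    #include <linux/sunrpc/clnt.h>
    #include <linux/sunrpc/sched.h>
    #include <linux/slab.h>

    static void example_prepare(struct rpc_task *task, void *calldata)
    {
            /* runs first, via rpc_prepare_task(); set up the call here */
    }

    static void example_done(struct rpc_task *task, void *calldata)
    {
            /* runs on completion, via rpc_exit_task() */
    }

    static void example_release(void *calldata)
    {
            kfree(calldata);        /* final teardown via rpc_release_calldata() */
    }

    static const struct rpc_call_ops example_call_ops = {
            .rpc_call_prepare = example_prepare,
            .rpc_call_done    = example_done,
            .rpc_release      = example_release,
    };
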
@@ -566,8 +634,7 @@ static int __rpc_execute(struct rpc_task *task)
 
        BUG_ON(RPC_IS_QUEUED(task));
 
- restarted:
-       while (1) {
+       for (;;) {
                /*
                 * Garbage collection of pending timers...
                 */
@@ -589,9 +656,7 @@ static int __rpc_execute(struct rpc_task *task)
                         */
                        save_callback=task->tk_callback;
                        task->tk_callback=NULL;
-                       lock_kernel();
                        save_callback(task);
-                       unlock_kernel();
                }
 
                /*
@@ -600,11 +665,9 @@ static int __rpc_execute(struct rpc_task *task)
                 * by someone else.
                 */
                if (!RPC_IS_QUEUED(task)) {
-                       if (!task->tk_action)
+                       if (task->tk_action == NULL)
                                break;
-                       lock_kernel();
                        task->tk_action(task);
-                       unlock_kernel();
                }
 
                /*
@@ -624,47 +687,27 @@ static int __rpc_execute(struct rpc_task *task)
 
                /* sync task: sleep here */
                dprintk("RPC: %4d sync task going to sleep\n", task->tk_pid);
-               if (RPC_TASK_UNINTERRUPTIBLE(task)) {
-                       __wait_event(task->u.tk_wait.waitq, !RPC_IS_QUEUED(task));
-               } else {
-                       __wait_event_interruptible(task->u.tk_wait.waitq, !RPC_IS_QUEUED(task), status);
+               /* Note: Caller should be using rpc_clnt_sigmask() */
+               status = out_of_line_wait_on_bit(&task->tk_runstate,
+                               RPC_TASK_QUEUED, rpc_wait_bit_interruptible,
+                               TASK_INTERRUPTIBLE);
+               if (status == -ERESTARTSYS) {
                        /*
                         * When a sync task receives a signal, it exits with
                         * -ERESTARTSYS. In order to catch any callbacks that
                         * clean up after sleeping on some queue, we don't
                         * break the loop here, but go around once more.
                         */
-                       if (status == -ERESTARTSYS) {
-                               dprintk("RPC: %4d got signal\n", task->tk_pid);
-                               task->tk_flags |= RPC_TASK_KILLED;
-                               rpc_exit(task, -ERESTARTSYS);
-                               rpc_wake_up_task(task);
-                       }
+                       dprintk("RPC: %4d got signal\n", task->tk_pid);
+                       task->tk_flags |= RPC_TASK_KILLED;
+                       rpc_exit(task, -ERESTARTSYS);
+                       rpc_wake_up_task(task);
                }
                rpc_set_running(task);
                dprintk("RPC: %4d sync task resuming\n", task->tk_pid);
        }
 
-       if (task->tk_exit) {
-               lock_kernel();
-               task->tk_exit(task);
-               unlock_kernel();
-               /* If tk_action is non-null, the user wants us to restart */
-               if (task->tk_action) {
-                       if (!RPC_ASSASSINATED(task)) {
-                               /* Release RPC slot and buffer memory */
-                               if (task->tk_rqstp)
-                                       xprt_release(task);
-                               rpc_free(task);
-                               goto restarted;
-                       }
-                       printk(KERN_ERR "RPC: dead task tries to walk away.\n");
-               }
-       }
-
-       dprintk("RPC: %4d exit() = %d\n", task->tk_pid, task->tk_status);
-       status = task->tk_status;
-
+       dprintk("RPC: %4d, return %d, status %d\n", task->tk_pid, status, task->tk_status);
        /* Release all resources associated with the task */
        rpc_release_task(task);
        return status;
@@ -682,30 +725,30 @@ static int __rpc_execute(struct rpc_task *task)
 int
 rpc_execute(struct rpc_task *task)
 {
-       BUG_ON(task->tk_active);
-
-       task->tk_active = 1;
+       rpc_set_active(task);
        rpc_set_running(task);
        return __rpc_execute(task);
 }
 
-static void rpc_async_schedule(void *arg)
+static void rpc_async_schedule(struct work_struct *work)
 {
-       __rpc_execute((struct rpc_task *)arg);
+       __rpc_execute(container_of(work, struct rpc_task, u.tk_work));
 }
 
-/*
- * Allocate memory for RPC purposes.
+/**
+ * rpc_malloc - allocate an RPC buffer
+ * @task: RPC task that will use this buffer
+ * @size: requested byte size
  *
  * We try to ensure that some NFS reads and writes can always proceed
  * by using a mempool when allocating 'small' buffers.
  * In order to avoid memory starvation triggering more writebacks of
  * NFS requests, we use GFP_NOFS rather than GFP_KERNEL.
  */
-void *
-rpc_malloc(struct rpc_task *task, size_t size)
+void * rpc_malloc(struct rpc_task *task, size_t size)
 {
-       int     gfp;
+       struct rpc_rqst *req = task->tk_rqstp;
+       gfp_t   gfp;
 
        if (task->tk_flags & RPC_TASK_SWAPPER)
                gfp = GFP_ATOMIC;
@@ -713,42 +756,52 @@ rpc_malloc(struct rpc_task *task, size_t size)
                gfp = GFP_NOFS;
 
        if (size > RPC_BUFFER_MAXSIZE) {
-               task->tk_buffer =  kmalloc(size, gfp);
-               if (task->tk_buffer)
-                       task->tk_bufsize = size;
+               req->rq_buffer = kmalloc(size, gfp);
+               if (req->rq_buffer)
+                       req->rq_bufsize = size;
        } else {
-               task->tk_buffer =  mempool_alloc(rpc_buffer_mempool, gfp);
-               if (task->tk_buffer)
-                       task->tk_bufsize = RPC_BUFFER_MAXSIZE;
+               req->rq_buffer = mempool_alloc(rpc_buffer_mempool, gfp);
+               if (req->rq_buffer)
+                       req->rq_bufsize = RPC_BUFFER_MAXSIZE;
        }
-       return task->tk_buffer;
+       return req->rq_buffer;
 }
 
-static void
-rpc_free(struct rpc_task *task)
+/**
+ * rpc_free - free buffer allocated via rpc_malloc
+ * @task: RPC task with a buffer to be freed
+ *
+ */
+void rpc_free(struct rpc_task *task)
 {
-       if (task->tk_buffer) {
-               if (task->tk_bufsize == RPC_BUFFER_MAXSIZE)
-                       mempool_free(task->tk_buffer, rpc_buffer_mempool);
+       struct rpc_rqst *req = task->tk_rqstp;
+
+       if (req->rq_buffer) {
+               if (req->rq_bufsize == RPC_BUFFER_MAXSIZE)
+                       mempool_free(req->rq_buffer, rpc_buffer_mempool);
                else
-                       kfree(task->tk_buffer);
-               task->tk_buffer = NULL;
-               task->tk_bufsize = 0;
+                       kfree(req->rq_buffer);
+               req->rq_buffer = NULL;
+               req->rq_bufsize = 0;
        }
 }
 
 /*
  * Creation and deletion of RPC task structures
  */
-void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, rpc_action callback, int flags)
+void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata)
 {
        memset(task, 0, sizeof(*task));
        init_timer(&task->tk_timer);
        task->tk_timer.data     = (unsigned long) task;
        task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer;
+       atomic_set(&task->tk_count, 1);
        task->tk_client = clnt;
        task->tk_flags  = flags;
-       task->tk_exit   = callback;
+       task->tk_ops = tk_ops;
+       if (tk_ops->rpc_call_prepare != NULL)
+               task->tk_action = rpc_prepare_task;
+       task->tk_calldata = calldata;
 
        /* Initialize retry counters */
        task->tk_garb_retry = 2;
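
rpc_malloc() above now hangs the buffer off the request (rq_buffer) rather
than the task, and keeps its mempool fallback: allocations up to
RPC_BUFFER_MAXSIZE can always draw on a preallocated reserve, so NFS
writeback can make progress under memory pressure. A minimal sketch of
setting up such a pool with the mempool_create_slab_pool() shorthand this
patch adopts in rpc_init_mempool(); the names and sizes are hypothetical,
and the 2.6-era kmem_cache_create() still takes ctor/dtor arguments:

    #include <linux/mempool.h>
    #include <linux/slab.h>

    #define EXAMPLE_BUFSIZE  2048   /* hypothetical small-buffer cutoff */
    #define EXAMPLE_POOLSIZE 8      /* guaranteed minimum of preallocated elements */

    static struct kmem_cache *example_slab __read_mostly;
    static mempool_t *example_pool __read_mostly;

    static int example_init(void)
    {
            example_slab = kmem_cache_create("example_buf", EXAMPLE_BUFSIZE,
                                             0, 0, NULL, NULL);
            if (!example_slab)
                    return -ENOMEM;
            /* Shorthand for mempool_create(n, mempool_alloc_slab,
             * mempool_free_slab, slab). */
            example_pool = mempool_create_slab_pool(EXAMPLE_POOLSIZE,
                                                    example_slab);
            if (!example_pool) {
                    kmem_cache_destroy(example_slab);
                    return -ENOMEM;
            }
            return 0;
    }
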
@@ -759,8 +812,6 @@ void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, rpc_action call
 
        /* Initialize workqueue for async tasks */
        task->tk_workqueue = rpciod_workqueue;
-       if (!RPC_IS_ASYNC(task))
-               init_waitqueue_head(&task->u.tk_wait.waitq);
 
        if (clnt) {
                atomic_inc(&clnt->cl_users);
@@ -770,14 +821,10 @@ void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, rpc_action call
                        task->tk_flags |= RPC_TASK_NOINTR;
        }
 
-#ifdef RPC_DEBUG
-       task->tk_magic = RPC_TASK_MAGIC_ID;
-       task->tk_pid = rpc_task_id++;
-#endif
-       /* Add to global list of all tasks */
-       spin_lock(&rpc_sched_lock);
-       list_add_tail(&task->tk_task, &all_tasks);
-       spin_unlock(&rpc_sched_lock);
+       BUG_ON(task->tk_ops == NULL);
+
+       /* starting timestamp */
+       task->tk_start = jiffies;
 
        dprintk("RPC: %4d new task procpid %d\n", task->tk_pid,
                                current->pid);
@@ -789,9 +836,9 @@ rpc_alloc_task(void)
        return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
 }
 
-static void
-rpc_default_free_task(struct rpc_task *task)
+static void rpc_free_task(struct rcu_head *rcu)
 {
+       struct rpc_task *task = container_of(rcu, struct rpc_task, u.tk_rcu);
        dprintk("RPC: %4d freeing task\n", task->tk_pid);
        mempool_free(task, rpc_task_mempool);
 }
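
rpc_free_task() is now an RCU callback: the task is returned to the
mempool only after a grace period elapses, which is what makes the
lockless rcu_read_lock_bh() access in rpc_wake_up_task() safe. A minimal
sketch of RCU-deferred freeing, with hypothetical names:

    #include <linux/rcupdate.h>
    #include <linux/slab.h>

    struct example_obj {
            int payload;
            struct rcu_head rcu;    /* embedded for deferred freeing */
    };

    static void example_free_rcu(struct rcu_head *rcu)
    {
            struct example_obj *obj = container_of(rcu, struct example_obj, rcu);

            kfree(obj);
    }

    static void example_destroy(struct example_obj *obj)
    {
            /* Defer the free until all rcu_read_lock_bh() readers are done. */
            call_rcu_bh(&obj->rcu, example_free_rcu);
    }
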
@@ -801,8 +848,7 @@ rpc_default_free_task(struct rpc_task *task)
  * clean up after an allocation failure, as the client may
  * have specified "oneshot".
  */
-struct rpc_task *
-rpc_new_task(struct rpc_clnt *clnt, rpc_action callback, int flags)
+struct rpc_task *rpc_new_task(struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata)
 {
        struct rpc_task *task;
 
@@ -810,10 +856,7 @@ rpc_new_task(struct rpc_clnt *clnt, rpc_action callback, int flags)
        if (!task)
                goto cleanup;
 
-       rpc_init_task(task, clnt, callback, flags);
-
-       /* Replace tk_release */
-       task->tk_release = rpc_default_free_task;
+       rpc_init_task(task, clnt, flags, tk_ops, calldata);
 
        dprintk("RPC: %4d allocated task\n", task->tk_pid);
        task->tk_flags |= RPC_TASK_DYNAMIC;
@@ -831,13 +874,35 @@ cleanup:
        goto out;
 }
 
-void rpc_release_task(struct rpc_task *task)
+
+void rpc_put_task(struct rpc_task *task)
 {
-       dprintk("RPC: %4d release task\n", task->tk_pid);
+       const struct rpc_call_ops *tk_ops = task->tk_ops;
+       void *calldata = task->tk_calldata;
 
+       if (!atomic_dec_and_test(&task->tk_count))
+               return;
+       /* Release resources */
+       if (task->tk_rqstp)
+               xprt_release(task);
+       if (task->tk_msg.rpc_cred)
+               rpcauth_unbindcred(task);
+       if (task->tk_client) {
+               rpc_release_client(task->tk_client);
+               task->tk_client = NULL;
+       }
+       if (task->tk_flags & RPC_TASK_DYNAMIC)
+               call_rcu_bh(&task->u.tk_rcu, rpc_free_task);
+       rpc_release_calldata(tk_ops, calldata);
+}
+EXPORT_SYMBOL(rpc_put_task);
+
+static void rpc_release_task(struct rpc_task *task)
+{
 #ifdef RPC_DEBUG
        BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
 #endif
+       dprintk("RPC: %4d release task\n", task->tk_pid);
 
        /* Remove from global task list */
        spin_lock(&rpc_sched_lock);
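
Tasks are now reference-counted: rpc_init_task() sets tk_count to 1, and
rpc_put_task() tears the task down only when the last reference drops. A
minimal sketch of the get/put idiom, with hypothetical names:

    #include <asm/atomic.h>
    #include <linux/slab.h>

    struct example_obj {
            atomic_t count;
    };

    static void example_get(struct example_obj *obj)
    {
            atomic_inc(&obj->count);        /* as rpc_run_task() does before rpc_execute() */
    }

    static void example_put(struct example_obj *obj)
    {
            /* True only for the caller that drops the last reference. */
            if (!atomic_dec_and_test(&obj->count))
                    return;
            kfree(obj);
    }
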
@@ -845,92 +910,41 @@ void rpc_release_task(struct rpc_task *task)
        spin_unlock(&rpc_sched_lock);
 
        BUG_ON (RPC_IS_QUEUED(task));
-       task->tk_active = 0;
 
        /* Synchronously delete any running timer */
        rpc_delete_timer(task);
 
-       /* Release resources */
-       if (task->tk_rqstp)
-               xprt_release(task);
-       if (task->tk_msg.rpc_cred)
-               rpcauth_unbindcred(task);
-       rpc_free(task);
-       if (task->tk_client) {
-               rpc_release_client(task->tk_client);
-               task->tk_client = NULL;
-       }
-
 #ifdef RPC_DEBUG
        task->tk_magic = 0;
 #endif
-       if (task->tk_release)
-               task->tk_release(task);
+       /* Wake up anyone who is waiting for task completion */
+       rpc_mark_complete_task(task);
+
+       rpc_put_task(task);
 }
 
 /**
- * rpc_find_parent - find the parent of a child task.
- * @child: child task
- *
- * Checks that the parent task is still sleeping on the
- * queue 'childq'. If so returns a pointer to the parent.
- * Upon failure returns NULL.
- *
- * Caller must hold childq.lock
+ * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
+ * @clnt: pointer to RPC client
+ * @flags: RPC flags
+ * @ops: RPC call ops
+ * @data: user call data
  */
-static inline struct rpc_task *rpc_find_parent(struct rpc_task *child)
-{
-       struct rpc_task *task, *parent;
-       struct list_head *le;
-
-       parent = (struct rpc_task *) child->tk_calldata;
-       task_for_each(task, le, &childq.tasks[0])
-               if (task == parent)
-                       return parent;
-
-       return NULL;
-}
-
-static void rpc_child_exit(struct rpc_task *child)
+struct rpc_task *rpc_run_task(struct rpc_clnt *clnt, int flags,
+                                       const struct rpc_call_ops *ops,
+                                       void *data)
 {
-       struct rpc_task *parent;
-
-       spin_lock_bh(&childq.lock);
-       if ((parent = rpc_find_parent(child)) != NULL) {
-               parent->tk_status = child->tk_status;
-               __rpc_wake_up_task(parent);
+       struct rpc_task *task;
+       task = rpc_new_task(clnt, flags, ops, data);
+       if (task == NULL) {
+               rpc_release_calldata(ops, data);
+               return ERR_PTR(-ENOMEM);
        }
-       spin_unlock_bh(&childq.lock);
-}
-
-/*
- * Note: rpc_new_task releases the client after a failure.
- */
-struct rpc_task *
-rpc_new_child(struct rpc_clnt *clnt, struct rpc_task *parent)
-{
-       struct rpc_task *task;
-
-       task = rpc_new_task(clnt, NULL, RPC_TASK_ASYNC | RPC_TASK_CHILD);
-       if (!task)
-               goto fail;
-       task->tk_exit = rpc_child_exit;
-       task->tk_calldata = parent;
+       atomic_inc(&task->tk_count);
+       rpc_execute(task);
        return task;
-
-fail:
-       parent->tk_status = -ENOMEM;
-       return NULL;
-}
-
-void rpc_run_child(struct rpc_task *task, struct rpc_task *child, rpc_action func)
-{
-       spin_lock_bh(&childq.lock);
-       /* N.B. Is it possible for the child to have already finished? */
-       __rpc_sleep_on(&childq, task, func, NULL);
-       rpc_schedule_run(child);
-       spin_unlock_bh(&childq.lock);
 }
+EXPORT_SYMBOL(rpc_run_task);
 
 /*
  * Kill all tasks for the given client.
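
rpc_run_task() replaces the removed child-task helpers as the standard way
to fire off an RPC: it takes an extra reference before rpc_execute(), so
the caller always receives a task it owns and must release with
rpc_put_task(). A hedged usage sketch, reusing the hypothetical
example_call_ops from the earlier sketch:

    #include <linux/err.h>
    #include <linux/sunrpc/clnt.h>
    #include <linux/sunrpc/sched.h>

    static int example_start_call(struct rpc_clnt *clnt, void *calldata)
    {
            struct rpc_task *task;

            task = rpc_run_task(clnt, RPC_TASK_ASYNC, &example_call_ops, calldata);
            if (IS_ERR(task))
                    return PTR_ERR(task);   /* calldata already freed via ops->rpc_release */
            /* ... optionally wait on or inspect the running task ... */
            rpc_put_task(task);             /* drop the reference rpc_run_task() gave us */
            return 0;
    }
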
@@ -989,7 +1003,7 @@ rpciod_up(void)
        struct workqueue_struct *wq;
        int error = 0;
 
-       down(&rpciod_sema);
+       mutex_lock(&rpciod_mutex);
        dprintk("rpciod_up: users %d\n", rpciod_users);
        rpciod_users++;
        if (rpciod_workqueue)
@@ -1012,14 +1026,14 @@ rpciod_up(void)
        rpciod_workqueue = wq;
        error = 0;
 out:
-       up(&rpciod_sema);
+       mutex_unlock(&rpciod_mutex);
        return error;
 }
 
 void
 rpciod_down(void)
 {
-       down(&rpciod_sema);
+       mutex_lock(&rpciod_mutex);
        dprintk("rpciod_down sema %d\n", rpciod_users);
        if (rpciod_users) {
                if (--rpciod_users)
@@ -1036,7 +1050,7 @@ rpciod_down(void)
        destroy_workqueue(rpciod_workqueue);
        rpciod_workqueue = NULL;
  out:
-       up(&rpciod_sema);
+       mutex_unlock(&rpciod_mutex);
 }
 
 #ifdef RPC_DEBUG
@@ -1051,7 +1065,7 @@ void rpc_show_tasks(void)
                return;
        }
        printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
-               "-rpcwait -action- --exit--\n");
+               "-rpcwait -action- ---ops--\n");
        alltask_for_each(t, le, &all_tasks) {
                const char *rpc_waitq = "none";
 
@@ -1066,7 +1080,7 @@ void rpc_show_tasks(void)
                        (t->tk_client ? t->tk_client->cl_prog : 0),
                        t->tk_rqstp, t->tk_timeout,
                        rpc_waitq,
-                       t->tk_action, t->tk_exit);
+                       t->tk_action, t->tk_ops);
        }
        spin_unlock(&rpc_sched_lock);
 }
@@ -1079,10 +1093,10 @@ rpc_destroy_mempool(void)
                mempool_destroy(rpc_buffer_mempool);
        if (rpc_task_mempool)
                mempool_destroy(rpc_task_mempool);
-       if (rpc_task_slabp && kmem_cache_destroy(rpc_task_slabp))
-               printk(KERN_INFO "rpc_task: not all structures were freed\n");
-       if (rpc_buffer_slabp && kmem_cache_destroy(rpc_buffer_slabp))
-               printk(KERN_INFO "rpc_buffers: not all structures were freed\n");
+       if (rpc_task_slabp)
+               kmem_cache_destroy(rpc_task_slabp);
+       if (rpc_buffer_slabp)
+               kmem_cache_destroy(rpc_buffer_slabp);
 }
 
 int
@@ -1100,16 +1114,12 @@ rpc_init_mempool(void)
                                             NULL, NULL);
        if (!rpc_buffer_slabp)
                goto err_nomem;
-       rpc_task_mempool = mempool_create(RPC_TASK_POOLSIZE,
-                                           mempool_alloc_slab,
-                                           mempool_free_slab,
-                                           rpc_task_slabp);
+       rpc_task_mempool = mempool_create_slab_pool(RPC_TASK_POOLSIZE,
+                                                   rpc_task_slabp);
        if (!rpc_task_mempool)
                goto err_nomem;
-       rpc_buffer_mempool = mempool_create(RPC_BUFFER_POOLSIZE,
-                                           mempool_alloc_slab,
-                                           mempool_free_slab,
-                                           rpc_buffer_slabp);
+       rpc_buffer_mempool = mempool_create_slab_pool(RPC_BUFFER_POOLSIZE,
+                                                     rpc_buffer_slabp);
        if (!rpc_buffer_mempool)
                goto err_nomem;
        return 0;