Fedora kernel-2.6.17-1.2142_FC4 patched with stable patch-2.6.17.4-vs2.0.2-rc26.diff
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index c06614d..5c3eee7 100644
@@ -18,6 +18,7 @@
 #include <linux/smp.h>
 #include <linux/smp_lock.h>
 #include <linux/spinlock.h>
+#include <linux/mutex.h>
 
 #include <linux/sunrpc/clnt.h>
 #include <linux/sunrpc/xprt.h>
@@ -34,15 +35,13 @@ static int                  rpc_task_id;
 #define RPC_BUFFER_MAXSIZE     (2048)
 #define RPC_BUFFER_POOLSIZE    (8)
 #define RPC_TASK_POOLSIZE      (8)
-static kmem_cache_t    *rpc_task_slabp;
-static kmem_cache_t    *rpc_buffer_slabp;
-static mempool_t       *rpc_task_mempool;
-static mempool_t       *rpc_buffer_mempool;
+static kmem_cache_t    *rpc_task_slabp __read_mostly;
+static kmem_cache_t    *rpc_buffer_slabp __read_mostly;
+static mempool_t       *rpc_task_mempool __read_mostly;
+static mempool_t       *rpc_buffer_mempool __read_mostly;
 
 static void                    __rpc_default_timer(struct rpc_task *task);
 static void                    rpciod_killall(void);
-static void                    rpc_free(struct rpc_task *task);
-
 static void                    rpc_async_schedule(void *);
 
 /*
@@ -64,9 +63,9 @@ static LIST_HEAD(all_tasks);
 /*
  * rpciod-related stuff
  */
-static DECLARE_MUTEX(rpciod_sema);
+static DEFINE_MUTEX(rpciod_mutex);
 static unsigned int            rpciod_users;
-static struct workqueue_struct *rpciod_workqueue;
+struct workqueue_struct *rpciod_workqueue;
 
 /*
  * Spinlock for other critical sections of code.
@@ -183,6 +182,7 @@ static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *
        else
                list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
        task->u.tk_wait.rpc_waitq = queue;
+       queue->qlen++;
        rpc_set_queued(task);
 
        dprintk("RPC: %4d added to queue %p \"%s\"\n",
@@ -217,6 +217,7 @@ static void __rpc_remove_wait_queue(struct rpc_task *task)
                __rpc_remove_wait_queue_priority(task);
        else
                list_del(&task->u.tk_wait.list);
+       queue->qlen--;
        dprintk("RPC: %4d removed from queue %p \"%s\"\n",
                                task->tk_pid, queue, rpc_qname(queue));
 }
@@ -264,6 +265,35 @@ void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
 }
 EXPORT_SYMBOL(rpc_init_wait_queue);
 
+static int rpc_wait_bit_interruptible(void *word)
+{
+       if (signal_pending(current))
+               return -ERESTARTSYS;
+       schedule();
+       return 0;
+}
+
+/*
+ * Mark an RPC call as having completed by clearing the 'active' bit
+ */
+static inline void rpc_mark_complete_task(struct rpc_task *task)
+{
+       rpc_clear_active(task);
+       wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);
+}
+
+/*
+ * Allow callers to wait for completion of an RPC call
+ */
+int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
+{
+       if (action == NULL)
+               action = rpc_wait_bit_interruptible;
+       return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
+                       action, TASK_INTERRUPTIBLE);
+}
+EXPORT_SYMBOL(__rpc_wait_for_completion_task);
+
 /*
  * Make an RPC task runnable.
  *
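The completion-wait API added above pairs wake_up_bit() with wait_on_bit() on the RPC_TASK_ACTIVE flag: rpc_mark_complete_task() clears the bit and wakes any waiter, while __rpc_wait_for_completion_task() sleeps until the bit clears, defaulting to rpc_wait_bit_interruptible so a signal aborts the wait with -ERESTARTSYS. A minimal caller-side sketch follows; it assumes the rpc_wait_for_completion_task() wrapper from include/linux/sunrpc/sched.h (which passes a NULL action) plus the rpc_run_task() helper added further down in this patch, and my_ops/my_data are placeholders:

        /* Sketch: start an async RPC task and wait for it to complete. */
        struct rpc_task *task;
        int status;

        task = rpc_run_task(clnt, RPC_TASK_ASYNC, &my_ops, my_data);
        if (IS_ERR(task))
                return PTR_ERR(task);
        status = rpc_wait_for_completion_task(task); /* -ERESTARTSYS on signal */
        if (status == 0)
                status = task->tk_status;    /* safe: we still hold a reference */
        rpc_release_task(task); /* drop the extra reference rpc_run_task() took */
        return status;
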
@@ -290,7 +320,7 @@ static void rpc_make_runnable(struct rpc_task *task)
                        return;
                }
        } else
-               wake_up(&task->u.tk_wait.waitq);
+               wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
 }
 
 /*
@@ -299,10 +329,7 @@ static void rpc_make_runnable(struct rpc_task *task)
 static inline void
 rpc_schedule_run(struct rpc_task *task)
 {
-       /* Don't run a child twice! */
-       if (RPC_IS_ACTIVATED(task))
-               return;
-       task->tk_active = 1;
+       rpc_set_active(task);
        rpc_make_runnable(task);
 }
 
@@ -324,8 +351,7 @@ static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
        }
 
        /* Mark the task as being activated if so needed */
-       if (!RPC_IS_ACTIVATED(task))
-               task->tk_active = 1;
+       rpc_set_active(task);
 
        __rpc_add_wait_queue(q, task);
 
@@ -492,16 +518,14 @@ struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
  */
 void rpc_wake_up(struct rpc_wait_queue *queue)
 {
-       struct rpc_task *task;
-
+       struct rpc_task *task, *next;
        struct list_head *head;
+
        spin_lock_bh(&queue->lock);
        head = &queue->tasks[queue->maxpriority];
        for (;;) {
-               while (!list_empty(head)) {
-                       task = list_entry(head->next, struct rpc_task, u.tk_wait.list);
+               list_for_each_entry_safe(task, next, head, u.tk_wait.list)
                        __rpc_wake_up_task(task);
-               }
                if (head == &queue->tasks[0])
                        break;
                head--;
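Both wake-everything loops now use list_for_each_entry_safe() instead of repeatedly peeling entries off the list head, because __rpc_wake_up_task() unlinks each task from the very list being walked; the _safe variant caches the next element before the loop body runs. The shape of the pattern, as a sketch with pos/next as the two required iterator variables:

        struct rpc_task *pos, *next;

        /* 'next' is loaded before the body executes, so the body may
         * safely unlink 'pos' (here via __rpc_wake_up_task()). */
        list_for_each_entry_safe(pos, next, head, u.tk_wait.list)
                __rpc_wake_up_task(pos);
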
@@ -518,14 +542,13 @@ void rpc_wake_up(struct rpc_wait_queue *queue)
  */
 void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
 {
+       struct rpc_task *task, *next;
        struct list_head *head;
-       struct rpc_task *task;
 
        spin_lock_bh(&queue->lock);
        head = &queue->tasks[queue->maxpriority];
        for (;;) {
-               while (!list_empty(head)) {
-                       task = list_entry(head->next, struct rpc_task, u.tk_wait.list);
+               list_for_each_entry_safe(task, next, head, u.tk_wait.list) {
                        task->tk_status = status;
                        __rpc_wake_up_task(task);
                }
@@ -554,6 +577,31 @@ __rpc_atrun(struct rpc_task *task)
        rpc_wake_up_task(task);
 }
 
+/*
+ * Helper to call task->tk_ops->rpc_call_prepare
+ */
+static void rpc_prepare_task(struct rpc_task *task)
+{
+       task->tk_ops->rpc_call_prepare(task, task->tk_calldata);
+}
+
+/*
+ * Helper that calls task->tk_ops->rpc_call_done if it exists
+ */
+void rpc_exit_task(struct rpc_task *task)
+{
+       task->tk_action = NULL;
+       if (task->tk_ops->rpc_call_done != NULL) {
+               task->tk_ops->rpc_call_done(task, task->tk_calldata);
+               if (task->tk_action != NULL) {
+                       WARN_ON(RPC_ASSASSINATED(task));
+                       /* Always release the RPC slot and buffer memory */
+                       xprt_release(task);
+               }
+       }
+}
+EXPORT_SYMBOL(rpc_exit_task);
+
 /*
  * This is the RPC `scheduler' (or rather, the finite state machine).
  */
@@ -566,8 +614,7 @@ static int __rpc_execute(struct rpc_task *task)
 
        BUG_ON(RPC_IS_QUEUED(task));
 
- restarted:
-       while (1) {
+       for (;;) {
                /*
                 * Garbage collection of pending timers...
                 */
@@ -600,7 +647,7 @@ static int __rpc_execute(struct rpc_task *task)
                 * by someone else.
                 */
                if (!RPC_IS_QUEUED(task)) {
-                       if (!task->tk_action)
+                       if (task->tk_action == NULL)
                                break;
                        lock_kernel();
                        task->tk_action(task);
@@ -624,47 +671,29 @@ static int __rpc_execute(struct rpc_task *task)
 
                /* sync task: sleep here */
                dprintk("RPC: %4d sync task going to sleep\n", task->tk_pid);
-               if (RPC_TASK_UNINTERRUPTIBLE(task)) {
-                       __wait_event(task->u.tk_wait.waitq, !RPC_IS_QUEUED(task));
-               } else {
-                       __wait_event_interruptible(task->u.tk_wait.waitq, !RPC_IS_QUEUED(task), status);
+               /* Note: Caller should be using rpc_clnt_sigmask() */
+               status = out_of_line_wait_on_bit(&task->tk_runstate,
+                               RPC_TASK_QUEUED, rpc_wait_bit_interruptible,
+                               TASK_INTERRUPTIBLE);
+               if (status == -ERESTARTSYS) {
                        /*
                         * When a sync task receives a signal, it exits with
                         * -ERESTARTSYS. In order to catch any callbacks that
                         * clean up after sleeping on some queue, we don't
                         * break the loop here, but go around once more.
                         */
-                       if (status == -ERESTARTSYS) {
-                               dprintk("RPC: %4d got signal\n", task->tk_pid);
-                               task->tk_flags |= RPC_TASK_KILLED;
-                               rpc_exit(task, -ERESTARTSYS);
-                               rpc_wake_up_task(task);
-                       }
+                       dprintk("RPC: %4d got signal\n", task->tk_pid);
+                       task->tk_flags |= RPC_TASK_KILLED;
+                       rpc_exit(task, -ERESTARTSYS);
+                       rpc_wake_up_task(task);
                }
                rpc_set_running(task);
                dprintk("RPC: %4d sync task resuming\n", task->tk_pid);
        }
 
-       if (task->tk_exit) {
-               lock_kernel();
-               task->tk_exit(task);
-               unlock_kernel();
-               /* If tk_action is non-null, the user wants us to restart */
-               if (task->tk_action) {
-                       if (!RPC_ASSASSINATED(task)) {
-                               /* Release RPC slot and buffer memory */
-                               if (task->tk_rqstp)
-                                       xprt_release(task);
-                               rpc_free(task);
-                               goto restarted;
-                       }
-                       printk(KERN_ERR "RPC: dead task tries to walk away.\n");
-               }
-       }
-
-       dprintk("RPC: %4d exit() = %d\n", task->tk_pid, task->tk_status);
-       status = task->tk_status;
-
+       dprintk("RPC: %4d, return %d, status %d\n", task->tk_pid, status, task->tk_status);
+       /* Wake up anyone who is waiting for task completion */
+       rpc_mark_complete_task(task);
        /* Release all resources associated with the task */
        rpc_release_task(task);
        return status;
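Synchronous tasks no longer sleep on a private waitqueue (task->u.tk_wait.waitq is removed later in this patch); __rpc_execute() now parks them in out_of_line_wait_on_bit() on the RPC_TASK_QUEUED flag, and the matching wake_up_bit() in rpc_make_runnable() (earlier hunk) releases them. The action callback sets the sleep policy: it runs each time the bit is still set, returning 0 after schedule() to keep waiting or an error to abort, which is how rpc_wait_bit_interruptible maps a pending signal to -ERESTARTSYS. For contrast, a hypothetical uninterruptible action (not part of this patch) would simply be:

        /* Paired with TASK_UNINTERRUPTIBLE rather than TASK_INTERRUPTIBLE;
         * never aborts the wait. */
        static int rpc_wait_bit_uninterruptible(void *word)
        {
                schedule();
                return 0;
        }
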
@@ -682,9 +711,7 @@ static int __rpc_execute(struct rpc_task *task)
 int
 rpc_execute(struct rpc_task *task)
 {
-       BUG_ON(task->tk_active);
-
-       task->tk_active = 1;
+       rpc_set_active(task);
        rpc_set_running(task);
        return __rpc_execute(task);
 }
@@ -694,18 +721,20 @@ static void rpc_async_schedule(void *arg)
        __rpc_execute((struct rpc_task *)arg);
 }
 
-/*
- * Allocate memory for RPC purposes.
+/**
+ * rpc_malloc - allocate an RPC buffer
+ * @task: RPC task that will use this buffer
+ * @size: requested byte size
  *
  * We try to ensure that some NFS reads and writes can always proceed
  * by using a mempool when allocating 'small' buffers.
  * In order to avoid memory starvation triggering more writebacks of
  * NFS requests, we use GFP_NOFS rather than GFP_KERNEL.
  */
-void *
-rpc_malloc(struct rpc_task *task, size_t size)
+void * rpc_malloc(struct rpc_task *task, size_t size)
 {
-       int     gfp;
+       struct rpc_rqst *req = task->tk_rqstp;
+       gfp_t   gfp;
 
        if (task->tk_flags & RPC_TASK_SWAPPER)
                gfp = GFP_ATOMIC;
@@ -713,42 +742,52 @@ rpc_malloc(struct rpc_task *task, size_t size)
                gfp = GFP_NOFS;
 
        if (size > RPC_BUFFER_MAXSIZE) {
-               task->tk_buffer =  kmalloc(size, gfp);
-               if (task->tk_buffer)
-                       task->tk_bufsize = size;
+               req->rq_buffer = kmalloc(size, gfp);
+               if (req->rq_buffer)
+                       req->rq_bufsize = size;
        } else {
-               task->tk_buffer =  mempool_alloc(rpc_buffer_mempool, gfp);
-               if (task->tk_buffer)
-                       task->tk_bufsize = RPC_BUFFER_MAXSIZE;
+               req->rq_buffer = mempool_alloc(rpc_buffer_mempool, gfp);
+               if (req->rq_buffer)
+                       req->rq_bufsize = RPC_BUFFER_MAXSIZE;
        }
-       return task->tk_buffer;
+       return req->rq_buffer;
 }
 
-static void
-rpc_free(struct rpc_task *task)
+/**
+ * rpc_free - free buffer allocated via rpc_malloc
+ * @task: RPC task with a buffer to be freed
+ *
+ */
+void rpc_free(struct rpc_task *task)
 {
-       if (task->tk_buffer) {
-               if (task->tk_bufsize == RPC_BUFFER_MAXSIZE)
-                       mempool_free(task->tk_buffer, rpc_buffer_mempool);
+       struct rpc_rqst *req = task->tk_rqstp;
+
+       if (req->rq_buffer) {
+               if (req->rq_bufsize == RPC_BUFFER_MAXSIZE)
+                       mempool_free(req->rq_buffer, rpc_buffer_mempool);
                else
-                       kfree(task->tk_buffer);
-               task->tk_buffer = NULL;
-               task->tk_bufsize = 0;
+                       kfree(req->rq_buffer);
+               req->rq_buffer = NULL;
+               req->rq_bufsize = 0;
        }
 }
 
 /*
  * Creation and deletion of RPC task structures
  */
-void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, rpc_action callback, int flags)
+void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata)
 {
        memset(task, 0, sizeof(*task));
        init_timer(&task->tk_timer);
        task->tk_timer.data     = (unsigned long) task;
        task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer;
+       atomic_set(&task->tk_count, 1);
        task->tk_client = clnt;
        task->tk_flags  = flags;
-       task->tk_exit   = callback;
+       task->tk_ops = tk_ops;
+       if (tk_ops->rpc_call_prepare != NULL)
+               task->tk_action = rpc_prepare_task;
+       task->tk_calldata = calldata;
 
        /* Initialize retry counters */
        task->tk_garb_retry = 2;
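rpc_init_task() now takes a const struct rpc_call_ops table plus an opaque calldata pointer in place of the old single tk_exit callback: rpc_call_prepare, when present, is installed as the first tk_action via rpc_prepare_task(); rpc_call_done runs from rpc_exit_task(), which releases the transport slot and lets __rpc_execute() keep running if the callback re-arms tk_action (e.g. via rpc_restart_call()); rpc_release is invoked last, from rpc_release_task(). A hedged caller-side sketch, where my_ops, my_data and the -EAGAIN retry are purely illustrative:

        static void my_prepare(struct rpc_task *task, void *calldata)
        {
                /* Build the rpc_message here; typically ends by calling
                 * rpc_call_setup() so tk_action advances to call_start. */
        }

        static void my_done(struct rpc_task *task, void *calldata)
        {
                if (task->tk_status == -EAGAIN) {
                        /* Re-arms tk_action, so rpc_exit_task() loops again */
                        rpc_restart_call(task);
                        return;
                }
                /* consume task->tk_status / results here */
        }

        static void my_release(void *calldata)
        {
                /* Runs once when the task is released; rpc_run_task() also
                 * calls it if task allocation fails. */
                kfree(calldata);
        }

        static const struct rpc_call_ops my_ops = {
                .rpc_call_prepare = my_prepare,
                .rpc_call_done    = my_done,
                .rpc_release      = my_release,
        };
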
@@ -759,8 +798,6 @@ void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, rpc_action call
 
        /* Initialize workqueue for async tasks */
        task->tk_workqueue = rpciod_workqueue;
-       if (!RPC_IS_ASYNC(task))
-               init_waitqueue_head(&task->u.tk_wait.waitq);
 
        if (clnt) {
                atomic_inc(&clnt->cl_users);
@@ -779,6 +816,11 @@ void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, rpc_action call
        list_add_tail(&task->tk_task, &all_tasks);
        spin_unlock(&rpc_sched_lock);
 
+       BUG_ON(task->tk_ops == NULL);
+
+       /* starting timestamp */
+       task->tk_start = jiffies;
+
        dprintk("RPC: %4d new task procpid %d\n", task->tk_pid,
                                current->pid);
 }
@@ -789,8 +831,7 @@ rpc_alloc_task(void)
        return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
 }
 
-static void
-rpc_default_free_task(struct rpc_task *task)
+static void rpc_free_task(struct rpc_task *task)
 {
        dprintk("RPC: %4d freeing task\n", task->tk_pid);
        mempool_free(task, rpc_task_mempool);
@@ -801,8 +842,7 @@ rpc_default_free_task(struct rpc_task *task)
  * clean up after an allocation failure, as the client may
  * have specified "oneshot".
  */
-struct rpc_task *
-rpc_new_task(struct rpc_clnt *clnt, rpc_action callback, int flags)
+struct rpc_task *rpc_new_task(struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata)
 {
        struct rpc_task *task;
 
@@ -810,10 +850,7 @@ rpc_new_task(struct rpc_clnt *clnt, rpc_action callback, int flags)
        if (!task)
                goto cleanup;
 
-       rpc_init_task(task, clnt, callback, flags);
-
-       /* Replace tk_release */
-       task->tk_release = rpc_default_free_task;
+       rpc_init_task(task, clnt, flags, tk_ops, calldata);
 
        dprintk("RPC: %4d allocated task\n", task->tk_pid);
        task->tk_flags |= RPC_TASK_DYNAMIC;
@@ -833,11 +870,15 @@ cleanup:
 
 void rpc_release_task(struct rpc_task *task)
 {
-       dprintk("RPC: %4d release task\n", task->tk_pid);
+       const struct rpc_call_ops *tk_ops = task->tk_ops;
+       void *calldata = task->tk_calldata;
 
 #ifdef RPC_DEBUG
        BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
 #endif
+       if (!atomic_dec_and_test(&task->tk_count))
+               return;
+       dprintk("RPC: %4d release task\n", task->tk_pid);
 
        /* Remove from global task list */
        spin_lock(&rpc_sched_lock);
@@ -845,7 +886,6 @@ void rpc_release_task(struct rpc_task *task)
        spin_unlock(&rpc_sched_lock);
 
        BUG_ON (RPC_IS_QUEUED(task));
-       task->tk_active = 0;
 
        /* Synchronously delete any running timer */
        rpc_delete_timer(task);
@@ -855,7 +895,6 @@ void rpc_release_task(struct rpc_task *task)
                xprt_release(task);
        if (task->tk_msg.rpc_cred)
                rpcauth_unbindcred(task);
-       rpc_free(task);
        if (task->tk_client) {
                rpc_release_client(task->tk_client);
                task->tk_client = NULL;
@@ -864,13 +903,40 @@ void rpc_release_task(struct rpc_task *task)
 #ifdef RPC_DEBUG
        task->tk_magic = 0;
 #endif
-       if (task->tk_release)
-               task->tk_release(task);
+       if (task->tk_flags & RPC_TASK_DYNAMIC)
+               rpc_free_task(task);
+       if (tk_ops->rpc_release)
+               tk_ops->rpc_release(calldata);
+}
+
+/**
+ * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
+ * @clnt: pointer to RPC client
+ * @flags: RPC flags
+ * @ops: RPC call ops
+ * @data: user call data
+ */
+struct rpc_task *rpc_run_task(struct rpc_clnt *clnt, int flags,
+                                       const struct rpc_call_ops *ops,
+                                       void *data)
+{
+       struct rpc_task *task;
+       task = rpc_new_task(clnt, flags, ops, data);
+       if (task == NULL) {
+               if (ops->rpc_release != NULL)
+                       ops->rpc_release(data);
+               return ERR_PTR(-ENOMEM);
+       }
+       atomic_inc(&task->tk_count);
+       rpc_execute(task);
+       return task;
 }
+EXPORT_SYMBOL(rpc_run_task);
 
 /**
  * rpc_find_parent - find the parent of a child task.
  * @child: child task
+ * @parent: parent task
  *
  * Checks that the parent task is still sleeping on the
  * queue 'childq'. If so returns a pointer to the parent.
@@ -878,12 +944,11 @@ void rpc_release_task(struct rpc_task *task)
  *
  * Caller must hold childq.lock
  */
-static inline struct rpc_task *rpc_find_parent(struct rpc_task *child)
+static inline struct rpc_task *rpc_find_parent(struct rpc_task *child, struct rpc_task *parent)
 {
-       struct rpc_task *task, *parent;
+       struct rpc_task *task;
        struct list_head *le;
 
-       parent = (struct rpc_task *) child->tk_calldata;
        task_for_each(task, le, &childq.tasks[0])
                if (task == parent)
                        return parent;
@@ -891,18 +956,22 @@ static inline struct rpc_task *rpc_find_parent(struct rpc_task *child)
        return NULL;
 }
 
-static void rpc_child_exit(struct rpc_task *child)
+static void rpc_child_exit(struct rpc_task *child, void *calldata)
 {
        struct rpc_task *parent;
 
        spin_lock_bh(&childq.lock);
-       if ((parent = rpc_find_parent(child)) != NULL) {
+       if ((parent = rpc_find_parent(child, calldata)) != NULL) {
                parent->tk_status = child->tk_status;
                __rpc_wake_up_task(parent);
        }
        spin_unlock_bh(&childq.lock);
 }
 
+static const struct rpc_call_ops rpc_child_ops = {
+       .rpc_call_done = rpc_child_exit,
+};
+
 /*
  * Note: rpc_new_task releases the client after a failure.
  */
@@ -911,11 +980,9 @@ rpc_new_child(struct rpc_clnt *clnt, struct rpc_task *parent)
 {
        struct rpc_task *task;
 
-       task = rpc_new_task(clnt, NULL, RPC_TASK_ASYNC | RPC_TASK_CHILD);
+       task = rpc_new_task(clnt, RPC_TASK_ASYNC | RPC_TASK_CHILD, &rpc_child_ops, parent);
        if (!task)
                goto fail;
-       task->tk_exit = rpc_child_exit;
-       task->tk_calldata = parent;
        return task;
 
 fail:
@@ -989,7 +1056,7 @@ rpciod_up(void)
        struct workqueue_struct *wq;
        int error = 0;
 
-       down(&rpciod_sema);
+       mutex_lock(&rpciod_mutex);
        dprintk("rpciod_up: users %d\n", rpciod_users);
        rpciod_users++;
        if (rpciod_workqueue)
@@ -1012,14 +1079,14 @@ rpciod_up(void)
        rpciod_workqueue = wq;
        error = 0;
 out:
-       up(&rpciod_sema);
+       mutex_unlock(&rpciod_mutex);
        return error;
 }
 
 void
 rpciod_down(void)
 {
-       down(&rpciod_sema);
+       mutex_lock(&rpciod_mutex);
        dprintk("rpciod_down sema %d\n", rpciod_users);
        if (rpciod_users) {
                if (--rpciod_users)
@@ -1036,7 +1103,7 @@ rpciod_down(void)
        destroy_workqueue(rpciod_workqueue);
        rpciod_workqueue = NULL;
  out:
-       up(&rpciod_sema);
+       mutex_unlock(&rpciod_mutex);
 }
 
 #ifdef RPC_DEBUG
@@ -1051,7 +1118,7 @@ void rpc_show_tasks(void)
                return;
        }
        printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
-               "-rpcwait -action- --exit--\n");
+               "-rpcwait -action- ---ops--\n");
        alltask_for_each(t, le, &all_tasks) {
                const char *rpc_waitq = "none";
 
@@ -1066,7 +1133,7 @@ void rpc_show_tasks(void)
                        (t->tk_client ? t->tk_client->cl_prog : 0),
                        t->tk_rqstp, t->tk_timeout,
                        rpc_waitq,
-                       t->tk_action, t->tk_exit);
+                       t->tk_action, t->tk_ops);
        }
        spin_unlock(&rpc_sched_lock);
 }
@@ -1100,16 +1167,12 @@ rpc_init_mempool(void)
                                             NULL, NULL);
        if (!rpc_buffer_slabp)
                goto err_nomem;
-       rpc_task_mempool = mempool_create(RPC_TASK_POOLSIZE,
-                                           mempool_alloc_slab,
-                                           mempool_free_slab,
-                                           rpc_task_slabp);
+       rpc_task_mempool = mempool_create_slab_pool(RPC_TASK_POOLSIZE,
+                                                   rpc_task_slabp);
        if (!rpc_task_mempool)
                goto err_nomem;
-       rpc_buffer_mempool = mempool_create(RPC_BUFFER_POOLSIZE,
-                                           mempool_alloc_slab,
-                                           mempool_free_slab,
-                                           rpc_buffer_slabp);
+       rpc_buffer_mempool = mempool_create_slab_pool(RPC_BUFFER_POOLSIZE,
+                                                     rpc_buffer_slabp);
        if (!rpc_buffer_mempool)
                goto err_nomem;
        return 0;
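
The open-coded mempool_create() calls with mempool_alloc_slab/mempool_free_slab collapse into mempool_create_slab_pool(). To the best of my knowledge this wrapper, added to include/linux/mempool.h in the 2.6.16 timeframe, is just the following inline (paraphrased), so the change is purely cosmetic:

        static inline mempool_t *mempool_create_slab_pool(int min_nr,
                                                          kmem_cache_t *kc)
        {
                return mempool_create(min_nr, mempool_alloc_slab,
                                      mempool_free_slab, (void *) kc);
        }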