diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index c06614d0e..5c3eee768 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -18,6 +18,7 @@
 #include <linux/smp.h>
 #include <linux/smp_lock.h>
 #include <linux/spinlock.h>
+#include <linux/mutex.h>
 
 #include <linux/sunrpc/clnt.h>
 #include <linux/sunrpc/xprt.h>
@@ -34,15 +35,13 @@ static int rpc_task_id;
 #define RPC_BUFFER_MAXSIZE      (2048)
 #define RPC_BUFFER_POOLSIZE     (8)
 #define RPC_TASK_POOLSIZE       (8)
-static kmem_cache_t     *rpc_task_slabp;
-static kmem_cache_t     *rpc_buffer_slabp;
-static mempool_t        *rpc_task_mempool;
-static mempool_t        *rpc_buffer_mempool;
+static kmem_cache_t     *rpc_task_slabp __read_mostly;
+static kmem_cache_t     *rpc_buffer_slabp __read_mostly;
+static mempool_t        *rpc_task_mempool __read_mostly;
+static mempool_t        *rpc_buffer_mempool __read_mostly;
 
 static void             __rpc_default_timer(struct rpc_task *task);
 static void             rpciod_killall(void);
-static void             rpc_free(struct rpc_task *task);
-
 static void             rpc_async_schedule(void *);
 
 /*
@@ -64,9 +63,9 @@ static LIST_HEAD(all_tasks);
 /*
  * rpciod-related stuff
  */
-static DECLARE_MUTEX(rpciod_sema);
+static DEFINE_MUTEX(rpciod_mutex);
 static unsigned int     rpciod_users;
-static struct workqueue_struct *rpciod_workqueue;
+struct workqueue_struct *rpciod_workqueue;
 
 /*
  * Spinlock for other critical sections of code.
@@ -183,6 +182,7 @@ static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *
        else
                list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
        task->u.tk_wait.rpc_waitq = queue;
+       queue->qlen++;
        rpc_set_queued(task);
 
        dprintk("RPC: %4d added to queue %p \"%s\"\n",
@@ -217,6 +217,7 @@ static void __rpc_remove_wait_queue(struct rpc_task *task)
                __rpc_remove_wait_queue_priority(task);
        else
                list_del(&task->u.tk_wait.list);
+       queue->qlen--;
        dprintk("RPC: %4d removed from queue %p \"%s\"\n",
                                task->tk_pid, queue, rpc_qname(queue));
 }
@@ -264,6 +265,35 @@ void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
 }
 EXPORT_SYMBOL(rpc_init_wait_queue);
 
+static int rpc_wait_bit_interruptible(void *word)
+{
+       if (signal_pending(current))
+               return -ERESTARTSYS;
+       schedule();
+       return 0;
+}
+
+/*
+ * Mark an RPC call as having completed by clearing the 'active' bit
+ */
+static inline void rpc_mark_complete_task(struct rpc_task *task)
+{
+       rpc_clear_active(task);
+       wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);
+}
+
+/*
+ * Allow callers to wait for completion of an RPC call
+ */
+int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
+{
+       if (action == NULL)
+               action = rpc_wait_bit_interruptible;
+       return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
+                       action, TASK_INTERRUPTIBLE);
+}
+EXPORT_SYMBOL(__rpc_wait_for_completion_task);
+
 /*
  * Make an RPC task runnable.
  *
@@ -290,7 +320,7 @@ static void rpc_make_runnable(struct rpc_task *task)
                        return;
                }
        } else
-               wake_up(&task->u.tk_wait.waitq);
+               wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
 }
 
 /*
@@ -299,10 +329,7 @@ static void rpc_make_runnable(struct rpc_task *task)
 static inline void
 rpc_schedule_run(struct rpc_task *task)
 {
-       /* Don't run a child twice! */
-       if (RPC_IS_ACTIVATED(task))
-               return;
-       task->tk_active = 1;
+       rpc_set_active(task);
        rpc_make_runnable(task);
 }
 
@@ -324,8 +351,7 @@ static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
        }
 
        /* Mark the task as being activated if so needed */
-       if (!RPC_IS_ACTIVATED(task))
-               task->tk_active = 1;
+       rpc_set_active(task);
 
        __rpc_add_wait_queue(q, task);
 
@@ -492,16 +518,14 @@ struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
  */
 void rpc_wake_up(struct rpc_wait_queue *queue)
 {
-       struct rpc_task *task;
-
+       struct rpc_task *task, *next;
        struct list_head *head;
+
        spin_lock_bh(&queue->lock);
        head = &queue->tasks[queue->maxpriority];
        for (;;) {
-               while (!list_empty(head)) {
-                       task = list_entry(head->next, struct rpc_task, u.tk_wait.list);
+               list_for_each_entry_safe(task, next, head, u.tk_wait.list)
                        __rpc_wake_up_task(task);
-               }
                if (head == &queue->tasks[0])
                        break;
                head--;
@@ -518,14 +542,13 @@ void rpc_wake_up(struct rpc_wait_queue *queue)
  */
 void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
 {
+       struct rpc_task *task, *next;
        struct list_head *head;
-       struct rpc_task *task;
 
        spin_lock_bh(&queue->lock);
        head = &queue->tasks[queue->maxpriority];
        for (;;) {
-               while (!list_empty(head)) {
-                       task = list_entry(head->next, struct rpc_task, u.tk_wait.list);
+               list_for_each_entry_safe(task, next, head, u.tk_wait.list) {
                        task->tk_status = status;
                        __rpc_wake_up_task(task);
                }
@@ -554,6 +577,31 @@ __rpc_atrun(struct rpc_task *task)
        rpc_wake_up_task(task);
 }
 
+/*
+ * Helper to call task->tk_ops->rpc_call_prepare
+ */
+static void rpc_prepare_task(struct rpc_task *task)
+{
+       task->tk_ops->rpc_call_prepare(task, task->tk_calldata);
+}
+
+/*
+ * Helper that calls task->tk_ops->rpc_call_done if it exists
+ */
+void rpc_exit_task(struct rpc_task *task)
+{
+       task->tk_action = NULL;
+       if (task->tk_ops->rpc_call_done != NULL) {
+               task->tk_ops->rpc_call_done(task, task->tk_calldata);
+               if (task->tk_action != NULL) {
+                       WARN_ON(RPC_ASSASSINATED(task));
+                       /* Always release the RPC slot and buffer memory */
+                       xprt_release(task);
+               }
+       }
+}
+EXPORT_SYMBOL(rpc_exit_task);
+
 /*
  * This is the RPC `scheduler' (or rather, the finite state machine).
  */
@@ -566,8 +614,7 @@ static int __rpc_execute(struct rpc_task *task)
 
        BUG_ON(RPC_IS_QUEUED(task));
 
- restarted:
-       while (1) {
+       for (;;) {
                /*
                 * Garbage collection of pending timers...
                 */
@@ -600,7 +647,7 @@ static int __rpc_execute(struct rpc_task *task)
                 * by someone else.
                 */
                if (!RPC_IS_QUEUED(task)) {
-                       if (!task->tk_action)
+                       if (task->tk_action == NULL)
                                break;
                        lock_kernel();
                        task->tk_action(task);
@@ -624,47 +671,29 @@ static int __rpc_execute(struct rpc_task *task)
 
                /* sync task: sleep here */
                dprintk("RPC: %4d sync task going to sleep\n", task->tk_pid);
-               if (RPC_TASK_UNINTERRUPTIBLE(task)) {
-                       __wait_event(task->u.tk_wait.waitq, !RPC_IS_QUEUED(task));
-               } else {
-                       __wait_event_interruptible(task->u.tk_wait.waitq, !RPC_IS_QUEUED(task), status);
+               /* Note: Caller should be using rpc_clnt_sigmask() */
+               status = out_of_line_wait_on_bit(&task->tk_runstate,
+                               RPC_TASK_QUEUED, rpc_wait_bit_interruptible,
+                               TASK_INTERRUPTIBLE);
+               if (status == -ERESTARTSYS) {
                        /*
                         * When a sync task receives a signal, it exits with
                         * -ERESTARTSYS. In order to catch any callbacks that
                         * clean up after sleeping on some queue, we don't
                         * break the loop here, but go around once more.
                         */
-                       if (status == -ERESTARTSYS) {
-                               dprintk("RPC: %4d got signal\n", task->tk_pid);
-                               task->tk_flags |= RPC_TASK_KILLED;
-                               rpc_exit(task, -ERESTARTSYS);
-                               rpc_wake_up_task(task);
-                       }
+                       dprintk("RPC: %4d got signal\n", task->tk_pid);
+                       task->tk_flags |= RPC_TASK_KILLED;
+                       rpc_exit(task, -ERESTARTSYS);
+                       rpc_wake_up_task(task);
                }
                rpc_set_running(task);
                dprintk("RPC: %4d sync task resuming\n", task->tk_pid);
        }
 
-       if (task->tk_exit) {
-               lock_kernel();
-               task->tk_exit(task);
-               unlock_kernel();
-               /* If tk_action is non-null, the user wants us to restart */
-               if (task->tk_action) {
-                       if (!RPC_ASSASSINATED(task)) {
-                               /* Release RPC slot and buffer memory */
-                               if (task->tk_rqstp)
-                                       xprt_release(task);
-                               rpc_free(task);
-                               goto restarted;
-                       }
-                       printk(KERN_ERR "RPC: dead task tries to walk away.\n");
-               }
-       }
-
-       dprintk("RPC: %4d exit() = %d\n", task->tk_pid, task->tk_status);
-       status = task->tk_status;
-
+       dprintk("RPC: %4d, return %d, status %d\n", task->tk_pid, status, task->tk_status);
+       /* Wake up anyone who is waiting for task completion */
+       rpc_mark_complete_task(task);
        /* Release all resources associated with the task */
        rpc_release_task(task);
        return status;
@@ -682,9 +711,7 @@
 int
 rpc_execute(struct rpc_task *task)
 {
-       BUG_ON(task->tk_active);
-
-       task->tk_active = 1;
+       rpc_set_active(task);
        rpc_set_running(task);
        return __rpc_execute(task);
 }
@@ -694,18 +721,20 @@ static void rpc_async_schedule(void *arg)
 {
        __rpc_execute((struct rpc_task *)arg);
 }
 
-/*
- * Allocate memory for RPC purposes.
+/**
+ * rpc_malloc - allocate an RPC buffer
+ * @task: RPC task that will use this buffer
+ * @size: requested byte size
  *
  * We try to ensure that some NFS reads and writes can always proceed
  * by using a mempool when allocating 'small' buffers.
  * In order to avoid memory starvation triggering more writebacks of
  * NFS requests, we use GFP_NOFS rather than GFP_KERNEL.
  */
-void *
-rpc_malloc(struct rpc_task *task, size_t size)
+void * rpc_malloc(struct rpc_task *task, size_t size)
 {
-       int     gfp;
+       struct rpc_rqst *req = task->tk_rqstp;
+       gfp_t   gfp;
 
        if (task->tk_flags & RPC_TASK_SWAPPER)
                gfp = GFP_ATOMIC;
@@ -713,42 +742,52 @@ rpc_malloc(struct rpc_task *task, size_t size)
                gfp = GFP_NOFS;
 
        if (size > RPC_BUFFER_MAXSIZE) {
-               task->tk_buffer = kmalloc(size, gfp);
-               if (task->tk_buffer)
-                       task->tk_bufsize = size;
+               req->rq_buffer = kmalloc(size, gfp);
+               if (req->rq_buffer)
+                       req->rq_bufsize = size;
        } else {
-               task->tk_buffer = mempool_alloc(rpc_buffer_mempool, gfp);
-               if (task->tk_buffer)
-                       task->tk_bufsize = RPC_BUFFER_MAXSIZE;
+               req->rq_buffer = mempool_alloc(rpc_buffer_mempool, gfp);
+               if (req->rq_buffer)
+                       req->rq_bufsize = RPC_BUFFER_MAXSIZE;
        }
-       return task->tk_buffer;
+       return req->rq_buffer;
 }
 
-static void
-rpc_free(struct rpc_task *task)
+/**
+ * rpc_free - free buffer allocated via rpc_malloc
+ * @task: RPC task with a buffer to be freed
+ *
+ */
+void rpc_free(struct rpc_task *task)
 {
-       if (task->tk_buffer) {
-               if (task->tk_bufsize == RPC_BUFFER_MAXSIZE)
-                       mempool_free(task->tk_buffer, rpc_buffer_mempool);
+       struct rpc_rqst *req = task->tk_rqstp;
+
+       if (req->rq_buffer) {
+               if (req->rq_bufsize == RPC_BUFFER_MAXSIZE)
+                       mempool_free(req->rq_buffer, rpc_buffer_mempool);
                else
-                       kfree(task->tk_buffer);
-               task->tk_buffer = NULL;
-               task->tk_bufsize = 0;
+                       kfree(req->rq_buffer);
+               req->rq_buffer = NULL;
+               req->rq_bufsize = 0;
        }
 }
 
 /*
  * Creation and deletion of RPC task structures
  */
-void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, rpc_action callback, int flags)
+void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata)
 {
        memset(task, 0, sizeof(*task));
        init_timer(&task->tk_timer);
        task->tk_timer.data     = (unsigned long) task;
        task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer;
+       atomic_set(&task->tk_count, 1);
        task->tk_client = clnt;
        task->tk_flags  = flags;
-       task->tk_exit   = callback;
+       task->tk_ops = tk_ops;
+       if (tk_ops->rpc_call_prepare != NULL)
+               task->tk_action = rpc_prepare_task;
+       task->tk_calldata = calldata;
 
        /* Initialize retry counters */
        task->tk_garb_retry = 2;
@@ -759,8 +798,6 @@ void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, rpc_action call
 
        /* Initialize workqueue for async tasks */
        task->tk_workqueue = rpciod_workqueue;
-       if (!RPC_IS_ASYNC(task))
-               init_waitqueue_head(&task->u.tk_wait.waitq);
 
        if (clnt) {
                atomic_inc(&clnt->cl_users);
@@ -779,6 +816,11 @@ void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, rpc_action call
        list_add_tail(&task->tk_task, &all_tasks);
        spin_unlock(&rpc_sched_lock);
 
+       BUG_ON(task->tk_ops == NULL);
+
+       /* starting timestamp */
+       task->tk_start = jiffies;
+
        dprintk("RPC: %4d new task procpid %d\n", task->tk_pid,
                                current->pid);
 }
@@ -789,8 +831,7 @@ rpc_alloc_task(void)
        return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
 }
 
-static void
-rpc_default_free_task(struct rpc_task *task)
+static void rpc_free_task(struct rpc_task *task)
 {
        dprintk("RPC: %4d freeing task\n", task->tk_pid);
        mempool_free(task, rpc_task_mempool);
@@ -801,8 +842,7 @@
  * clean up after an allocation failure, as the client may
  * have specified "oneshot".
  */
-struct rpc_task *
-rpc_new_task(struct rpc_clnt *clnt, rpc_action callback, int flags)
+struct rpc_task *rpc_new_task(struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *calldata)
 {
        struct rpc_task *task;
 
@@ -810,10 +850,7 @@ rpc_new_task(struct rpc_clnt *clnt, rpc_action callback, int flags)
        if (!task)
                goto cleanup;
 
-       rpc_init_task(task, clnt, callback, flags);
-
-       /* Replace tk_release */
-       task->tk_release = rpc_default_free_task;
+       rpc_init_task(task, clnt, flags, tk_ops, calldata);
 
        dprintk("RPC: %4d allocated task\n", task->tk_pid);
        task->tk_flags |= RPC_TASK_DYNAMIC;
@@ -833,11 +870,15 @@ cleanup:
 void rpc_release_task(struct rpc_task *task)
 {
-       dprintk("RPC: %4d release task\n", task->tk_pid);
+       const struct rpc_call_ops *tk_ops = task->tk_ops;
+       void *calldata = task->tk_calldata;
 
 #ifdef RPC_DEBUG
        BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
 #endif
+       if (!atomic_dec_and_test(&task->tk_count))
+               return;
+       dprintk("RPC: %4d release task\n", task->tk_pid);
 
        /* Remove from global task list */
        spin_lock(&rpc_sched_lock);
@@ -845,7 +886,6 @@ void rpc_release_task(struct rpc_task *task)
        spin_unlock(&rpc_sched_lock);
 
        BUG_ON (RPC_IS_QUEUED(task));
-       task->tk_active = 0;
 
        /* Synchronously delete any running timer */
        rpc_delete_timer(task);
@@ -855,7 +895,6 @@ void rpc_release_task(struct rpc_task *task)
                xprt_release(task);
        if (task->tk_msg.rpc_cred)
                rpcauth_unbindcred(task);
-       rpc_free(task);
        if (task->tk_client) {
                rpc_release_client(task->tk_client);
                task->tk_client = NULL;
@@ -864,13 +903,40 @@ void rpc_release_task(struct rpc_task *task)
 #ifdef RPC_DEBUG
        task->tk_magic = 0;
 #endif
-       if (task->tk_release)
-               task->tk_release(task);
+       if (task->tk_flags & RPC_TASK_DYNAMIC)
+               rpc_free_task(task);
+       if (tk_ops->rpc_release)
+               tk_ops->rpc_release(calldata);
+}
+
+/**
+ * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
+ * @clnt: pointer to RPC client
+ * @flags: RPC flags
+ * @ops: RPC call ops
+ * @data: user call data
+ */
+struct rpc_task *rpc_run_task(struct rpc_clnt *clnt, int flags,
+                                       const struct rpc_call_ops *ops,
+                                       void *data)
+{
+       struct rpc_task *task;
+       task = rpc_new_task(clnt, flags, ops, data);
+       if (task == NULL) {
+               if (ops->rpc_release != NULL)
+                       ops->rpc_release(data);
+               return ERR_PTR(-ENOMEM);
+       }
+       atomic_inc(&task->tk_count);
+       rpc_execute(task);
+       return task;
 }
+EXPORT_SYMBOL(rpc_run_task);
 
 /**
  * rpc_find_parent - find the parent of a child task.
  * @child: child task
+ * @parent: parent task
  *
  * Checks that the parent task is still sleeping on the
  * queue 'childq'. If so returns a pointer to the parent.
@@ -878,12 +944,11 @@ void rpc_release_task(struct rpc_task *task)
  *
  * Caller must hold childq.lock
  */
-static inline struct rpc_task *rpc_find_parent(struct rpc_task *child)
+static inline struct rpc_task *rpc_find_parent(struct rpc_task *child, struct rpc_task *parent)
 {
-       struct rpc_task *task, *parent;
+       struct rpc_task *task;
        struct list_head *le;
 
-       parent = (struct rpc_task *) child->tk_calldata;
        task_for_each(task, le, &childq.tasks[0])
                if (task == parent)
                        return parent;
@@ -891,18 +956,22 @@
        return NULL;
 }
 
-static void rpc_child_exit(struct rpc_task *child)
+static void rpc_child_exit(struct rpc_task *child, void *calldata)
 {
        struct rpc_task *parent;
 
        spin_lock_bh(&childq.lock);
-       if ((parent = rpc_find_parent(child)) != NULL) {
+       if ((parent = rpc_find_parent(child, calldata)) != NULL) {
                parent->tk_status = child->tk_status;
                __rpc_wake_up_task(parent);
        }
        spin_unlock_bh(&childq.lock);
 }
 
+static const struct rpc_call_ops rpc_child_ops = {
+       .rpc_call_done = rpc_child_exit,
+};
+
 /*
  * Note: rpc_new_task releases the client after a failure.
  */
@@ -911,11 +980,9 @@ rpc_new_child(struct rpc_clnt *clnt, struct rpc_task *parent)
 {
        struct rpc_task *task;
 
-       task = rpc_new_task(clnt, NULL, RPC_TASK_ASYNC | RPC_TASK_CHILD);
+       task = rpc_new_task(clnt, RPC_TASK_ASYNC | RPC_TASK_CHILD, &rpc_child_ops, parent);
        if (!task)
                goto fail;
-       task->tk_exit = rpc_child_exit;
-       task->tk_calldata = parent;
        return task;
 
 fail:
@@ -989,7 +1056,7 @@ rpciod_up(void)
        struct workqueue_struct *wq;
        int error = 0;
 
-       down(&rpciod_sema);
+       mutex_lock(&rpciod_mutex);
        dprintk("rpciod_up: users %d\n", rpciod_users);
        rpciod_users++;
        if (rpciod_workqueue)
@@ -1012,14 +1079,14 @@
        rpciod_workqueue = wq;
        error = 0;
 out:
-       up(&rpciod_sema);
+       mutex_unlock(&rpciod_mutex);
        return error;
 }
 
 void
 rpciod_down(void)
 {
-       down(&rpciod_sema);
+       mutex_lock(&rpciod_mutex);
        dprintk("rpciod_down sema %d\n", rpciod_users);
        if (rpciod_users) {
                if (--rpciod_users)
@@ -1036,7 +1103,7 @@ rpciod_down(void)
                destroy_workqueue(rpciod_workqueue);
                rpciod_workqueue = NULL;
 out:
-       up(&rpciod_sema);
+       mutex_unlock(&rpciod_mutex);
 }
 
 #ifdef RPC_DEBUG
@@ -1051,7 +1118,7 @@ void rpc_show_tasks(void)
                return;
        }
        printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
-               "-rpcwait -action- --exit--\n");
+               "-rpcwait -action- ---ops--\n");
        alltask_for_each(t, le, &all_tasks) {
                const char *rpc_waitq = "none";
 
@@ -1066,7 +1133,7 @@ void rpc_show_tasks(void)
                        (t->tk_client ? t->tk_client->cl_prog : 0),
                        t->tk_rqstp, t->tk_timeout,
                        rpc_waitq,
-                       t->tk_action, t->tk_exit);
+                       t->tk_action, t->tk_ops);
        }
        spin_unlock(&rpc_sched_lock);
 }
@@ -1100,16 +1167,12 @@ rpc_init_mempool(void)
                                             NULL, NULL);
        if (!rpc_buffer_slabp)
                goto err_nomem;
-       rpc_task_mempool = mempool_create(RPC_TASK_POOLSIZE,
-                                         mempool_alloc_slab,
-                                         mempool_free_slab,
-                                         rpc_task_slabp);
+       rpc_task_mempool = mempool_create_slab_pool(RPC_TASK_POOLSIZE,
+                                                   rpc_task_slabp);
        if (!rpc_task_mempool)
                goto err_nomem;
-       rpc_buffer_mempool = mempool_create(RPC_BUFFER_POOLSIZE,
-                                           mempool_alloc_slab,
-                                           mempool_free_slab,
-                                           rpc_buffer_slabp);
+       rpc_buffer_mempool = mempool_create_slab_pool(RPC_BUFFER_POOLSIZE,
+                                                     rpc_buffer_slabp);
        if (!rpc_buffer_mempool)
                goto err_nomem;
        return 0;
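
Usage sketch (illustrative addition, not part of the patch). The heart of this diff is the replacement of the single tk_exit callback with a const struct rpc_call_ops vtable (rpc_call_prepare / rpc_call_done / rpc_release) plus an opaque calldata pointer, a refcounted rpc_task (tk_count), and the new rpc_run_task() allocate-and-execute helper. The fragment below shows how a caller of this era might drive the new interface. All demo_* names and the calldata layout are invented for illustration; rpc_run_task(), rpc_release_task(), RPC_TASK_ASYNC, rpc_call_setup() and struct rpc_procinfo are real interfaces of this kernel generation, but treat the whole thing as a hedged sketch rather than in-tree code.

/*
 * Illustrative sketch only.  The demo_* identifiers are hypothetical;
 * everything else comes from the patch above or this era's SUNRPC headers.
 */
#include <linux/kernel.h>
#include <linux/slab.h>
#include <linux/err.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/sched.h>

/*
 * An all-zero procinfo behaves like the NULL procedure (no encode or
 * decode routines), the same trick clnt.c uses for rpc_ping().
 */
static struct rpc_procinfo demo_proc_null;

struct demo_calldata {
        int cookie;                     /* hypothetical per-call state */
};

/*
 * rpc_init_task() points tk_action here because rpc_call_prepare is set;
 * this is where the task binds its procedure and arguments.
 */
static void demo_call_prepare(struct rpc_task *task, void *calldata)
{
        struct rpc_message msg = {
                .rpc_proc = &demo_proc_null,
        };

        rpc_call_setup(task, &msg, 0);
}

/* Runs from rpc_exit_task(); this replaces the old tk_exit hook. */
static void demo_call_done(struct rpc_task *task, void *calldata)
{
        struct demo_calldata *demo = calldata;

        if (task->tk_status < 0)
                printk(KERN_INFO "demo: call %d failed: %d\n",
                                demo->cookie, task->tk_status);
}

/*
 * Invoked exactly once per task, even when rpc_new_task() fails inside
 * rpc_run_task(), so the calldata can be freed here unconditionally.
 */
static void demo_release(void *calldata)
{
        kfree(calldata);
}

static const struct rpc_call_ops demo_ops = {
        .rpc_call_prepare       = demo_call_prepare,
        .rpc_call_done          = demo_call_done,
        .rpc_release            = demo_release,
};

static int demo_start(struct rpc_clnt *clnt)
{
        struct demo_calldata *demo;
        struct rpc_task *task;

        demo = kmalloc(sizeof(*demo), GFP_NOFS);
        if (demo == NULL)
                return -ENOMEM;
        demo->cookie = 1;

        /*
         * rpc_run_task() takes an extra reference on tk_count, so the
         * task stays pinned for us while it executes asynchronously.
         * On failure it has already called ->rpc_release(), so 'demo'
         * must not be freed again here.
         */
        task = rpc_run_task(clnt, RPC_TASK_ASYNC, &demo_ops, demo);
        if (IS_ERR(task))
                return PTR_ERR(task);

        /* Drop our reference; the final put frees a dynamic task. */
        rpc_release_task(task);
        return 0;
}

A synchronous caller would instead keep its reference and block on the newly exported __rpc_wait_for_completion_task(), which waits for the RPC_TASK_ACTIVE bit cleared by rpc_mark_complete_task() above, before calling rpc_release_task().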