X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=net%2Fsunrpc%2Fsched.c;h=fc083f0b354434940b98aa27e882f9d4bef8bfe6;hb=refs%2Fheads%2Fvserver;hp=d1c8b47f0d0946bc9a3c7e1bd3e38cb065e7a13f;hpb=76828883507a47dae78837ab5dec5a5b4513c667;p=linux-2.6.git diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index d1c8b47f0..fc083f0b3 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -18,9 +18,9 @@ #include #include #include +#include #include -#include #ifdef RPC_DEBUG #define RPCDBG_FACILITY RPCDBG_SCHED @@ -34,20 +34,15 @@ static int rpc_task_id; #define RPC_BUFFER_MAXSIZE (2048) #define RPC_BUFFER_POOLSIZE (8) #define RPC_TASK_POOLSIZE (8) -static kmem_cache_t *rpc_task_slabp __read_mostly; -static kmem_cache_t *rpc_buffer_slabp __read_mostly; +static struct kmem_cache *rpc_task_slabp __read_mostly; +static struct kmem_cache *rpc_buffer_slabp __read_mostly; static mempool_t *rpc_task_mempool __read_mostly; static mempool_t *rpc_buffer_mempool __read_mostly; static void __rpc_default_timer(struct rpc_task *task); static void rpciod_killall(void); -static void rpc_async_schedule(void *); - -/* - * RPC tasks that create another task (e.g. for contacting the portmapper) - * will wait on this queue for their child's completion - */ -static RPC_WAITQ(childq, "childq"); +static void rpc_async_schedule(struct work_struct *); +static void rpc_release_task(struct rpc_task *task); /* * RPC tasks sit here while waiting for conditions to improve. @@ -62,9 +57,9 @@ static LIST_HEAD(all_tasks); /* * rpciod-related stuff */ -static DECLARE_MUTEX(rpciod_sema); +static DEFINE_MUTEX(rpciod_mutex); static unsigned int rpciod_users; -static struct workqueue_struct *rpciod_workqueue; +struct workqueue_struct *rpciod_workqueue; /* * Spinlock for other critical sections of code. @@ -181,6 +176,7 @@ static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task * else list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]); task->u.tk_wait.rpc_waitq = queue; + queue->qlen++; rpc_set_queued(task); dprintk("RPC: %4d added to queue %p \"%s\"\n", @@ -215,6 +211,7 @@ static void __rpc_remove_wait_queue(struct rpc_task *task) __rpc_remove_wait_queue_priority(task); else list_del(&task->u.tk_wait.list); + queue->qlen--; dprintk("RPC: %4d removed from queue %p \"%s\"\n", task->tk_pid, queue, rpc_qname(queue)); } @@ -270,12 +267,28 @@ static int rpc_wait_bit_interruptible(void *word) return 0; } +static void rpc_set_active(struct rpc_task *task) +{ + if (test_and_set_bit(RPC_TASK_ACTIVE, &task->tk_runstate) != 0) + return; + spin_lock(&rpc_sched_lock); +#ifdef RPC_DEBUG + task->tk_magic = RPC_TASK_MAGIC_ID; + task->tk_pid = rpc_task_id++; +#endif + /* Add to global list of all tasks */ + list_add_tail(&task->tk_task, &all_tasks); + spin_unlock(&rpc_sched_lock); +} + /* * Mark an RPC call as having completed by clearing the 'active' bit */ -static inline void rpc_mark_complete_task(struct rpc_task *task) +static void rpc_mark_complete_task(struct rpc_task *task) { - rpc_clear_active(task); + smp_mb__before_clear_bit(); + clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate); + smp_mb__after_clear_bit(); wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE); } @@ -311,7 +324,7 @@ static void rpc_make_runnable(struct rpc_task *task) if (RPC_IS_ASYNC(task)) { int status; - INIT_WORK(&task->u.tk_work, rpc_async_schedule, (void *)task); + INIT_WORK(&task->u.tk_work, rpc_async_schedule); status = queue_work(task->tk_workqueue, &task->u.tk_work); if (status < 0) { printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status); @@ -322,16 +335,6 @@ static void rpc_make_runnable(struct rpc_task *task) wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED); } -/* - * Place a newly initialized task on the workqueue. - */ -static inline void -rpc_schedule_run(struct rpc_task *task) -{ - rpc_set_active(task); - rpc_make_runnable(task); -} - /* * Prepare for sleeping on a wait queue. * By always appending tasks to the list we ensure FIFO behavior. @@ -349,9 +352,6 @@ static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task, return; } - /* Mark the task as being activated if so needed */ - rpc_set_active(task); - __rpc_add_wait_queue(q, task); BUG_ON(task->tk_callback != NULL); @@ -362,6 +362,9 @@ static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task, void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task, rpc_action action, rpc_action timer) { + /* Mark the task as being activated if so needed */ + rpc_set_active(task); + /* * Protect the queue operations. */ @@ -425,16 +428,19 @@ __rpc_default_timer(struct rpc_task *task) */ void rpc_wake_up_task(struct rpc_task *task) { + rcu_read_lock_bh(); if (rpc_start_wakeup(task)) { if (RPC_IS_QUEUED(task)) { struct rpc_wait_queue *queue = task->u.tk_wait.rpc_waitq; - spin_lock_bh(&queue->lock); + /* Note: we're already in a bh-safe context */ + spin_lock(&queue->lock); __rpc_do_wake_up_task(task); - spin_unlock_bh(&queue->lock); + spin_unlock(&queue->lock); } rpc_finish_wakeup(task); } + rcu_read_unlock_bh(); } /* @@ -497,14 +503,16 @@ struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue) struct rpc_task *task = NULL; dprintk("RPC: wake_up_next(%p \"%s\")\n", queue, rpc_qname(queue)); - spin_lock_bh(&queue->lock); + rcu_read_lock_bh(); + spin_lock(&queue->lock); if (RPC_IS_PRIORITY(queue)) task = __rpc_wake_up_next_priority(queue); else { task_for_first(task, &queue->tasks[0]) __rpc_wake_up_task(task); } - spin_unlock_bh(&queue->lock); + spin_unlock(&queue->lock); + rcu_read_unlock_bh(); return task; } @@ -520,7 +528,8 @@ void rpc_wake_up(struct rpc_wait_queue *queue) struct rpc_task *task, *next; struct list_head *head; - spin_lock_bh(&queue->lock); + rcu_read_lock_bh(); + spin_lock(&queue->lock); head = &queue->tasks[queue->maxpriority]; for (;;) { list_for_each_entry_safe(task, next, head, u.tk_wait.list) @@ -529,7 +538,8 @@ void rpc_wake_up(struct rpc_wait_queue *queue) break; head--; } - spin_unlock_bh(&queue->lock); + spin_unlock(&queue->lock); + rcu_read_unlock_bh(); } /** @@ -544,7 +554,8 @@ void rpc_wake_up_status(struct rpc_wait_queue *queue, int status) struct rpc_task *task, *next; struct list_head *head; - spin_lock_bh(&queue->lock); + rcu_read_lock_bh(); + spin_lock(&queue->lock); head = &queue->tasks[queue->maxpriority]; for (;;) { list_for_each_entry_safe(task, next, head, u.tk_wait.list) { @@ -555,33 +566,32 @@ void rpc_wake_up_status(struct rpc_wait_queue *queue, int status) break; head--; } - spin_unlock_bh(&queue->lock); + spin_unlock(&queue->lock); + rcu_read_unlock_bh(); +} + +static void __rpc_atrun(struct rpc_task *task) +{ + rpc_wake_up_task(task); } /* * Run a task at a later time */ -static void __rpc_atrun(struct rpc_task *); -void -rpc_delay(struct rpc_task *task, unsigned long delay) +void rpc_delay(struct rpc_task *task, unsigned long delay) { task->tk_timeout = delay; rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun); } -static void -__rpc_atrun(struct rpc_task *task) -{ - task->tk_status = 0; - rpc_wake_up_task(task); -} - /* * Helper to call task->tk_ops->rpc_call_prepare */ static void rpc_prepare_task(struct rpc_task *task) { + lock_kernel(); task->tk_ops->rpc_call_prepare(task, task->tk_calldata); + unlock_kernel(); } /* @@ -591,7 +601,9 @@ void rpc_exit_task(struct rpc_task *task) { task->tk_action = NULL; if (task->tk_ops->rpc_call_done != NULL) { + lock_kernel(); task->tk_ops->rpc_call_done(task, task->tk_calldata); + unlock_kernel(); if (task->tk_action != NULL) { WARN_ON(RPC_ASSASSINATED(task)); /* Always release the RPC slot and buffer memory */ @@ -601,6 +613,15 @@ void rpc_exit_task(struct rpc_task *task) } EXPORT_SYMBOL(rpc_exit_task); +void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata) +{ + if (ops->rpc_release != NULL) { + lock_kernel(); + ops->rpc_release(calldata); + unlock_kernel(); + } +} + /* * This is the RPC `scheduler' (or rather, the finite state machine). */ @@ -635,9 +656,7 @@ static int __rpc_execute(struct rpc_task *task) */ save_callback=task->tk_callback; task->tk_callback=NULL; - lock_kernel(); save_callback(task); - unlock_kernel(); } /* @@ -648,9 +667,7 @@ static int __rpc_execute(struct rpc_task *task) if (!RPC_IS_QUEUED(task)) { if (task->tk_action == NULL) break; - lock_kernel(); task->tk_action(task); - unlock_kernel(); } /* @@ -691,8 +708,6 @@ static int __rpc_execute(struct rpc_task *task) } dprintk("RPC: %4d, return %d, status %d\n", task->tk_pid, status, task->tk_status); - /* Wake up anyone who is waiting for task completion */ - rpc_mark_complete_task(task); /* Release all resources associated with the task */ rpc_release_task(task); return status; @@ -715,9 +730,9 @@ rpc_execute(struct rpc_task *task) return __rpc_execute(task); } -static void rpc_async_schedule(void *arg) +static void rpc_async_schedule(struct work_struct *work) { - __rpc_execute((struct rpc_task *)arg); + __rpc_execute(container_of(work, struct rpc_task, u.tk_work)); } /** @@ -806,17 +821,11 @@ void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, cons task->tk_flags |= RPC_TASK_NOINTR; } -#ifdef RPC_DEBUG - task->tk_magic = RPC_TASK_MAGIC_ID; - task->tk_pid = rpc_task_id++; -#endif - /* Add to global list of all tasks */ - spin_lock(&rpc_sched_lock); - list_add_tail(&task->tk_task, &all_tasks); - spin_unlock(&rpc_sched_lock); - BUG_ON(task->tk_ops == NULL); + /* starting timestamp */ + task->tk_start = jiffies; + dprintk("RPC: %4d new task procpid %d\n", task->tk_pid, current->pid); } @@ -827,8 +836,9 @@ rpc_alloc_task(void) return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS); } -static void rpc_free_task(struct rpc_task *task) +static void rpc_free_task(struct rcu_head *rcu) { + struct rpc_task *task = container_of(rcu, struct rpc_task, u.tk_rcu); dprintk("RPC: %4d freeing task\n", task->tk_pid); mempool_free(task, rpc_task_mempool); } @@ -864,16 +874,34 @@ cleanup: goto out; } -void rpc_release_task(struct rpc_task *task) + +void rpc_put_task(struct rpc_task *task) { const struct rpc_call_ops *tk_ops = task->tk_ops; void *calldata = task->tk_calldata; + if (!atomic_dec_and_test(&task->tk_count)) + return; + /* Release resources */ + if (task->tk_rqstp) + xprt_release(task); + if (task->tk_msg.rpc_cred) + rpcauth_unbindcred(task); + if (task->tk_client) { + rpc_release_client(task->tk_client); + task->tk_client = NULL; + } + if (task->tk_flags & RPC_TASK_DYNAMIC) + call_rcu_bh(&task->u.tk_rcu, rpc_free_task); + rpc_release_calldata(tk_ops, calldata); +} +EXPORT_SYMBOL(rpc_put_task); + +static void rpc_release_task(struct rpc_task *task) +{ #ifdef RPC_DEBUG BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID); #endif - if (!atomic_dec_and_test(&task->tk_count)) - return; dprintk("RPC: %4d release task\n", task->tk_pid); /* Remove from global task list */ @@ -886,23 +914,13 @@ void rpc_release_task(struct rpc_task *task) /* Synchronously delete any running timer */ rpc_delete_timer(task); - /* Release resources */ - if (task->tk_rqstp) - xprt_release(task); - if (task->tk_msg.rpc_cred) - rpcauth_unbindcred(task); - if (task->tk_client) { - rpc_release_client(task->tk_client); - task->tk_client = NULL; - } - #ifdef RPC_DEBUG task->tk_magic = 0; #endif - if (task->tk_flags & RPC_TASK_DYNAMIC) - rpc_free_task(task); - if (tk_ops->rpc_release) - tk_ops->rpc_release(calldata); + /* Wake up anyone who is waiting for task completion */ + rpc_mark_complete_task(task); + + rpc_put_task(task); } /** @@ -918,80 +936,16 @@ struct rpc_task *rpc_run_task(struct rpc_clnt *clnt, int flags, { struct rpc_task *task; task = rpc_new_task(clnt, flags, ops, data); - if (task == NULL) + if (task == NULL) { + rpc_release_calldata(ops, data); return ERR_PTR(-ENOMEM); + } atomic_inc(&task->tk_count); rpc_execute(task); return task; } EXPORT_SYMBOL(rpc_run_task); -/** - * rpc_find_parent - find the parent of a child task. - * @child: child task - * @parent: parent task - * - * Checks that the parent task is still sleeping on the - * queue 'childq'. If so returns a pointer to the parent. - * Upon failure returns NULL. - * - * Caller must hold childq.lock - */ -static inline struct rpc_task *rpc_find_parent(struct rpc_task *child, struct rpc_task *parent) -{ - struct rpc_task *task; - struct list_head *le; - - task_for_each(task, le, &childq.tasks[0]) - if (task == parent) - return parent; - - return NULL; -} - -static void rpc_child_exit(struct rpc_task *child, void *calldata) -{ - struct rpc_task *parent; - - spin_lock_bh(&childq.lock); - if ((parent = rpc_find_parent(child, calldata)) != NULL) { - parent->tk_status = child->tk_status; - __rpc_wake_up_task(parent); - } - spin_unlock_bh(&childq.lock); -} - -static const struct rpc_call_ops rpc_child_ops = { - .rpc_call_done = rpc_child_exit, -}; - -/* - * Note: rpc_new_task releases the client after a failure. - */ -struct rpc_task * -rpc_new_child(struct rpc_clnt *clnt, struct rpc_task *parent) -{ - struct rpc_task *task; - - task = rpc_new_task(clnt, RPC_TASK_ASYNC | RPC_TASK_CHILD, &rpc_child_ops, parent); - if (!task) - goto fail; - return task; - -fail: - parent->tk_status = -ENOMEM; - return NULL; -} - -void rpc_run_child(struct rpc_task *task, struct rpc_task *child, rpc_action func) -{ - spin_lock_bh(&childq.lock); - /* N.B. Is it possible for the child to have already finished? */ - __rpc_sleep_on(&childq, task, func, NULL); - rpc_schedule_run(child); - spin_unlock_bh(&childq.lock); -} - /* * Kill all tasks for the given client. * XXX: kill their descendants as well? @@ -1049,7 +1003,7 @@ rpciod_up(void) struct workqueue_struct *wq; int error = 0; - down(&rpciod_sema); + mutex_lock(&rpciod_mutex); dprintk("rpciod_up: users %d\n", rpciod_users); rpciod_users++; if (rpciod_workqueue) @@ -1072,14 +1026,14 @@ rpciod_up(void) rpciod_workqueue = wq; error = 0; out: - up(&rpciod_sema); + mutex_unlock(&rpciod_mutex); return error; } void rpciod_down(void) { - down(&rpciod_sema); + mutex_lock(&rpciod_mutex); dprintk("rpciod_down sema %d\n", rpciod_users); if (rpciod_users) { if (--rpciod_users) @@ -1096,7 +1050,7 @@ rpciod_down(void) destroy_workqueue(rpciod_workqueue); rpciod_workqueue = NULL; out: - up(&rpciod_sema); + mutex_unlock(&rpciod_mutex); } #ifdef RPC_DEBUG @@ -1139,10 +1093,10 @@ rpc_destroy_mempool(void) mempool_destroy(rpc_buffer_mempool); if (rpc_task_mempool) mempool_destroy(rpc_task_mempool); - if (rpc_task_slabp && kmem_cache_destroy(rpc_task_slabp)) - printk(KERN_INFO "rpc_task: not all structures were freed\n"); - if (rpc_buffer_slabp && kmem_cache_destroy(rpc_buffer_slabp)) - printk(KERN_INFO "rpc_buffers: not all structures were freed\n"); + if (rpc_task_slabp) + kmem_cache_destroy(rpc_task_slabp); + if (rpc_buffer_slabp) + kmem_cache_destroy(rpc_buffer_slabp); } int @@ -1160,16 +1114,12 @@ rpc_init_mempool(void) NULL, NULL); if (!rpc_buffer_slabp) goto err_nomem; - rpc_task_mempool = mempool_create(RPC_TASK_POOLSIZE, - mempool_alloc_slab, - mempool_free_slab, - rpc_task_slabp); + rpc_task_mempool = mempool_create_slab_pool(RPC_TASK_POOLSIZE, + rpc_task_slabp); if (!rpc_task_mempool) goto err_nomem; - rpc_buffer_mempool = mempool_create(RPC_BUFFER_POOLSIZE, - mempool_alloc_slab, - mempool_free_slab, - rpc_buffer_slabp); + rpc_buffer_mempool = mempool_create_slab_pool(RPC_BUFFER_POOLSIZE, + rpc_buffer_slabp); if (!rpc_buffer_mempool) goto err_nomem; return 0;