X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=kernel%2Fworkqueue.c;h=b052e2c4c71053720e87606e20924cfec5f3f27c;hb=64ba3f394c830ec48a1c31b53dcae312c56f1604;hp=162aca1f48222d846d1e47cc1171faa732eb61a1;hpb=5273a3df6485dc2ad6aa7ddd441b9a21970f003b;p=linux-2.6.git diff --git a/kernel/workqueue.c b/kernel/workqueue.c index 162aca1f4..b052e2c4c 100644 --- a/kernel/workqueue.c +++ b/kernel/workqueue.c @@ -8,10 +8,12 @@ * * Derived from the taskqueue/keventd code by: * - * David Woodhouse + * David Woodhouse * Andrew Morton * Kai Petzke * Theodore Ts'o + * + * Made to use alloc_percpu by Christoph Lameter . */ #include @@ -27,7 +29,8 @@ #include /* - * The per-CPU workqueue (if single thread, we always use cpu 0's). + * The per-CPU workqueue (if single thread, we always use the first + * possible cpu). * * The sequence counters are for flush_scheduled_work(). It wants to wait * until until all currently-scheduled works are completed, but it doesn't @@ -57,16 +60,18 @@ struct cpu_workqueue_struct { * per-CPU workqueues: */ struct workqueue_struct { - struct cpu_workqueue_struct cpu_wq[NR_CPUS]; + struct cpu_workqueue_struct *cpu_wq; const char *name; struct list_head list; /* Empty if single thread */ }; /* All the per-cpu workqueues on the system, for hotplug cpu to add/remove threads to each one as cpus come/go. */ -static spinlock_t workqueue_lock = SPIN_LOCK_UNLOCKED; +static DEFINE_SPINLOCK(workqueue_lock); static LIST_HEAD(workqueues); +static int singlethread_cpu; + /* If it's single threaded, it isn't in the list of workqueues. */ static inline int is_single_threaded(struct workqueue_struct *wq) { @@ -100,9 +105,9 @@ int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work) if (!test_and_set_bit(0, &work->pending)) { if (unlikely(is_single_threaded(wq))) - cpu = 0; + cpu = singlethread_cpu; BUG_ON(!list_empty(&work->entry)); - __queue_work(wq->cpu_wq + cpu, work); + __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work); ret = 1; } put_cpu(); @@ -116,9 +121,9 @@ static void delayed_work_timer_fn(unsigned long __data) int cpu = smp_processor_id(); if (unlikely(is_single_threaded(wq))) - cpu = 0; + cpu = singlethread_cpu; - __queue_work(wq->cpu_wq + cpu, work); + __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work); } int fastcall queue_delayed_work(struct workqueue_struct *wq, @@ -142,7 +147,7 @@ int fastcall queue_delayed_work(struct workqueue_struct *wq, return ret; } -static inline void run_workqueue(struct cpu_workqueue_struct *cwq) +static void run_workqueue(struct cpu_workqueue_struct *cwq) { unsigned long flags; @@ -188,7 +193,7 @@ static int worker_thread(void *__cwq) current->flags |= PF_NOFREEZE; - set_user_nice(current, -10); + set_user_nice(current, -5); /* Block and flush all signals */ sigfillset(&blocked); @@ -201,22 +206,50 @@ static int worker_thread(void *__cwq) siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD)); do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0); + set_current_state(TASK_INTERRUPTIBLE); while (!kthread_should_stop()) { - set_task_state(current, TASK_INTERRUPTIBLE); - add_wait_queue(&cwq->more_work, &wait); if (list_empty(&cwq->worklist)) schedule(); else - set_task_state(current, TASK_RUNNING); + __set_current_state(TASK_RUNNING); remove_wait_queue(&cwq->more_work, &wait); if (!list_empty(&cwq->worklist)) run_workqueue(cwq); + set_current_state(TASK_INTERRUPTIBLE); } + __set_current_state(TASK_RUNNING); return 0; } +static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq) +{ + if (cwq->thread == current) { + /* + * Probably keventd 
trying to flush its own queue. So simply run + * it by hand rather than deadlocking. + */ + run_workqueue(cwq); + } else { + DEFINE_WAIT(wait); + long sequence_needed; + + spin_lock_irq(&cwq->lock); + sequence_needed = cwq->insert_sequence; + + while (sequence_needed - cwq->remove_sequence > 0) { + prepare_to_wait(&cwq->work_done, &wait, + TASK_UNINTERRUPTIBLE); + spin_unlock_irq(&cwq->lock); + schedule(); + spin_lock_irq(&cwq->lock); + } + finish_wait(&cwq->work_done, &wait); + spin_unlock_irq(&cwq->lock); + } +} + /* * flush_workqueue - ensure that any scheduled work has run to completion. * @@ -233,49 +266,25 @@ static int worker_thread(void *__cwq) */ void fastcall flush_workqueue(struct workqueue_struct *wq) { - struct cpu_workqueue_struct *cwq; - int cpu; - might_sleep(); - lock_cpu_hotplug(); - for_each_online_cpu(cpu) { - DEFINE_WAIT(wait); - long sequence_needed; - - if (is_single_threaded(wq)) - cwq = wq->cpu_wq + 0; /* Always use cpu 0's area. */ - else - cwq = wq->cpu_wq + cpu; - - if (cwq->thread == current) { - /* - * Probably keventd trying to flush its own queue. - * So simply run it by hand rather than deadlocking. - */ - run_workqueue(cwq); - continue; - } - spin_lock_irq(&cwq->lock); - sequence_needed = cwq->insert_sequence; + if (is_single_threaded(wq)) { + /* Always use first cpu's area. */ + flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu)); + } else { + int cpu; - while (sequence_needed - cwq->remove_sequence > 0) { - prepare_to_wait(&cwq->work_done, &wait, - TASK_UNINTERRUPTIBLE); - spin_unlock_irq(&cwq->lock); - schedule(); - spin_lock_irq(&cwq->lock); - } - finish_wait(&cwq->work_done, &wait); - spin_unlock_irq(&cwq->lock); + lock_cpu_hotplug(); + for_each_online_cpu(cpu) + flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu)); + unlock_cpu_hotplug(); } - unlock_cpu_hotplug(); } static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq, int cpu) { - struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu; + struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu); struct task_struct *p; spin_lock_init(&cwq->lock); @@ -304,19 +313,22 @@ struct workqueue_struct *__create_workqueue(const char *name, struct workqueue_struct *wq; struct task_struct *p; - BUG_ON(strlen(name) > 10); - - wq = kmalloc(sizeof(*wq), GFP_KERNEL); + wq = kzalloc(sizeof(*wq), GFP_KERNEL); if (!wq) return NULL; - memset(wq, 0, sizeof(*wq)); + + wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct); + if (!wq->cpu_wq) { + kfree(wq); + return NULL; + } wq->name = name; /* We don't need the distraction of CPUs appearing and vanishing. */ lock_cpu_hotplug(); if (singlethread) { INIT_LIST_HEAD(&wq->list); - p = create_workqueue_thread(wq, 0); + p = create_workqueue_thread(wq, singlethread_cpu); if (!p) destroy = 1; else @@ -324,7 +336,7 @@ struct workqueue_struct *__create_workqueue(const char *name, } else { spin_lock(&workqueue_lock); list_add(&wq->list, &workqueues); - spin_unlock_irq(&workqueue_lock); + spin_unlock(&workqueue_lock); for_each_online_cpu(cpu) { p = create_workqueue_thread(wq, cpu); if (p) { @@ -334,6 +346,7 @@ struct workqueue_struct *__create_workqueue(const char *name, destroy = 1; } } + unlock_cpu_hotplug(); /* * Was there any error during startup? 
If yes then clean up: @@ -342,7 +355,6 @@ struct workqueue_struct *__create_workqueue(const char *name, destroy_workqueue(wq); wq = NULL; } - unlock_cpu_hotplug(); return wq; } @@ -352,7 +364,7 @@ static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu) unsigned long flags; struct task_struct *p; - cwq = wq->cpu_wq + cpu; + cwq = per_cpu_ptr(wq->cpu_wq, cpu); spin_lock_irqsave(&cwq->lock, flags); p = cwq->thread; cwq->thread = NULL; @@ -370,15 +382,16 @@ void destroy_workqueue(struct workqueue_struct *wq) /* We don't need the distraction of CPUs appearing and vanishing. */ lock_cpu_hotplug(); if (is_single_threaded(wq)) - cleanup_workqueue_thread(wq, 0); + cleanup_workqueue_thread(wq, singlethread_cpu); else { for_each_online_cpu(cpu) cleanup_workqueue_thread(wq, cpu); spin_lock(&workqueue_lock); list_del(&wq->list); - spin_unlock_irq(&workqueue_lock); + spin_unlock(&workqueue_lock); } unlock_cpu_hotplug(); + free_percpu(wq->cpu_wq); kfree(wq); } @@ -394,11 +407,75 @@ int fastcall schedule_delayed_work(struct work_struct *work, unsigned long delay return queue_delayed_work(keventd_wq, work, delay); } +int schedule_delayed_work_on(int cpu, + struct work_struct *work, unsigned long delay) +{ + int ret = 0; + struct timer_list *timer = &work->timer; + + if (!test_and_set_bit(0, &work->pending)) { + BUG_ON(timer_pending(timer)); + BUG_ON(!list_empty(&work->entry)); + /* This stores keventd_wq for the moment, for the timer_fn */ + work->wq_data = keventd_wq; + timer->expires = jiffies + delay; + timer->data = (unsigned long)work; + timer->function = delayed_work_timer_fn; + add_timer_on(timer, cpu); + ret = 1; + } + return ret; +} + +int schedule_on_each_cpu(void (*func) (void *info), void *info) +{ + int cpu; + struct work_struct *work; + + work = kmalloc(NR_CPUS * sizeof(struct work_struct), GFP_KERNEL); + + if (!work) + return -ENOMEM; + for_each_online_cpu(cpu) { + INIT_WORK(work + cpu, func, info); + __queue_work(per_cpu_ptr(keventd_wq->cpu_wq, cpu), + work + cpu); + } + flush_workqueue(keventd_wq); + kfree(work); + return 0; +} + void flush_scheduled_work(void) { flush_workqueue(keventd_wq); } +/** + * cancel_rearming_delayed_workqueue - reliably kill off a delayed + * work whose handler rearms the delayed work. + * @wq: the controlling workqueue structure + * @work: the delayed work struct + */ +void cancel_rearming_delayed_workqueue(struct workqueue_struct *wq, + struct work_struct *work) +{ + while (!cancel_delayed_work(work)) + flush_workqueue(wq); +} +EXPORT_SYMBOL(cancel_rearming_delayed_workqueue); + +/** + * cancel_rearming_delayed_work - reliably kill off a delayed keventd + * work whose handler rearms the delayed work. + * @work: the delayed work struct + */ +void cancel_rearming_delayed_work(struct work_struct *work) +{ + cancel_rearming_delayed_workqueue(keventd_wq, work); +} +EXPORT_SYMBOL(cancel_rearming_delayed_work); + int keventd_up(void) { return keventd_wq != NULL; @@ -412,7 +489,7 @@ int current_is_keventd(void) BUG_ON(!keventd_wq); - cwq = keventd_wq->cpu_wq + cpu; + cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu); if (current == cwq->thread) ret = 1; @@ -424,7 +501,7 @@ int current_is_keventd(void) /* Take the work from this (downed) CPU. 
*/ static void take_over_work(struct workqueue_struct *wq, unsigned int cpu) { - struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu; + struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu); LIST_HEAD(list); struct work_struct *work; @@ -435,7 +512,7 @@ static void take_over_work(struct workqueue_struct *wq, unsigned int cpu) printk("Taking work for %s\n", wq->name); work = list_entry(list.next,struct work_struct,entry); list_del(&work->entry); - __queue_work(wq->cpu_wq + smp_processor_id(), work); + __queue_work(per_cpu_ptr(wq->cpu_wq, smp_processor_id()), work); } spin_unlock_irq(&cwq->lock); } @@ -452,7 +529,7 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb, case CPU_UP_PREPARE: /* Create a new workqueue thread for it. */ list_for_each_entry(wq, &workqueues, list) { - if (create_workqueue_thread(wq, hotcpu) < 0) { + if (!create_workqueue_thread(wq, hotcpu)) { printk("workqueue for %i failed\n", hotcpu); return NOTIFY_BAD; } @@ -461,15 +538,20 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb, case CPU_ONLINE: /* Kick off worker threads. */ - list_for_each_entry(wq, &workqueues, list) - wake_up_process(wq->cpu_wq[hotcpu].thread); + list_for_each_entry(wq, &workqueues, list) { + struct cpu_workqueue_struct *cwq; + + cwq = per_cpu_ptr(wq->cpu_wq, hotcpu); + kthread_bind(cwq->thread, hotcpu); + wake_up_process(cwq->thread); + } break; case CPU_UP_CANCELED: list_for_each_entry(wq, &workqueues, list) { /* Unbind so it can run. */ - kthread_bind(wq->cpu_wq[hotcpu].thread, - smp_processor_id()); + kthread_bind(per_cpu_ptr(wq->cpu_wq, hotcpu)->thread, + any_online_cpu(cpu_online_map)); cleanup_workqueue_thread(wq, hotcpu); } break; @@ -488,6 +570,7 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb, void init_workqueues(void) { + singlethread_cpu = first_cpu(cpu_possible_map); hotcpu_notifier(workqueue_cpu_callback, 0); keventd_wq = create_workqueue("events"); BUG_ON(!keventd_wq); @@ -501,5 +584,5 @@ EXPORT_SYMBOL_GPL(destroy_workqueue); EXPORT_SYMBOL(schedule_work); EXPORT_SYMBOL(schedule_delayed_work); +EXPORT_SYMBOL(schedule_delayed_work_on); EXPORT_SYMBOL(flush_scheduled_work); -
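
The headline change in this patch (credited to Christoph Lameter in the file header) replaces the static cpu_wq[NR_CPUS] array inside struct workqueue_struct with a dynamically allocated per-CPU object. For readers unfamiliar with that interface, below is a minimal sketch of the alloc_percpu()/per_cpu_ptr()/free_percpu() idiom the patch relies on. It is not code from the patch: the example_stats structure and the example_* function names are invented for illustration.

#include <linux/percpu.h>
#include <linux/smp.h>

/* Illustrative only: a made-up per-CPU counter, not from the patch. */
struct example_stats {
	unsigned long hits;
};

static struct example_stats *stats;	/* one copy per possible CPU */

static int example_init(void)
{
	/* Allocates one zero-filled instance for every possible CPU. */
	stats = alloc_percpu(struct example_stats);
	if (!stats)
		return -ENOMEM;
	return 0;
}

static void example_hit(void)
{
	/* per_cpu_ptr() selects the copy belonging to a given CPU. */
	per_cpu_ptr(stats, get_cpu())->hits++;
	put_cpu();
}

static void example_exit(void)
{
	free_percpu(stats);
}

This is the same pattern the patch applies to wq->cpu_wq: allocate in __create_workqueue(), dereference with per_cpu_ptr(wq->cpu_wq, cpu) at every former cpu_wq + cpu site, and release with free_percpu() in destroy_workqueue().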
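
The patch also adds cancel_rearming_delayed_work() and cancel_rearming_delayed_workqueue() for delayed work whose handler re-queues itself. A usage sketch follows, written against the three-argument INIT_WORK() API used throughout this file; the example_work name, the poll_hw() handler, and the one-second period are illustrative assumptions, not part of the patch.

#include <linux/workqueue.h>
#include <linux/jiffies.h>

static void poll_hw(void *data);

/* Illustrative self-rearming delayed work driven by keventd. */
static struct work_struct example_work;

static void poll_hw(void *data)
{
	/* ... periodic housekeeping ... */

	/* Re-arm: run again in roughly one second. */
	schedule_delayed_work(&example_work, HZ);
}

static void example_start(void)
{
	INIT_WORK(&example_work, poll_hw, NULL);
	schedule_delayed_work(&example_work, HZ);
}

static void example_stop(void)
{
	/*
	 * A bare cancel_delayed_work() can lose the race against a
	 * handler that has already run and re-armed the timer; the
	 * helper added by this patch keeps cancelling and flushing
	 * keventd until the work is really gone.
	 */
	cancel_rearming_delayed_work(&example_work);
}

For a private workqueue the same shutdown is done with cancel_rearming_delayed_workqueue(wq, &work), which flushes that workqueue instead of keventd.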
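
Finally, schedule_on_each_cpu() queues one copy of the callback on every online CPU's keventd thread and then waits for all of them through flush_workqueue(). A short usage sketch, with drain_local_cache() as an invented callback name:

#include <linux/workqueue.h>

/* Illustrative callback; runs once on each online CPU in keventd context. */
static void drain_local_cache(void *info)
{
	/* info is the pointer passed to schedule_on_each_cpu() (NULL here). */
}

static int example_drain_all(void)
{
	/*
	 * Blocks until every per-CPU invocation has completed; returns
	 * -ENOMEM if the temporary work_struct array cannot be allocated.
	 */
	return schedule_on_each_cpu(drain_local_cache, NULL);
}

The related schedule_delayed_work_on(cpu, work, delay), also added and exported here, arms the work's timer with add_timer_on() so the delayed work fires on one specific CPU rather than wherever the timer happens to expire.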