*
* Derived from the taskqueue/keventd code by:
*
- * David Woodhouse <dwmw2@redhat.com>
+ * David Woodhouse <dwmw2@infradead.org>
* Andrew Morton <andrewm@uow.edu.au>
* Kai Petzke <wpp@marie.physik.tu-berlin.de>
* Theodore Ts'o <tytso@mit.edu>
+ *
+ * Made to use alloc_percpu by Christoph Lameter <clameter@sgi.com>.
*/
#include <linux/module.h>
#include <linux/kthread.h>
/*
- * The per-CPU workqueue (if single thread, we always use cpu 0's).
+ * The per-CPU workqueue (if single thread, we always use the first
+ * possible cpu).
*
* The sequence counters are for flush_scheduled_work(). It wants to wait
* until until all currently-scheduled works are completed, but it doesn't
* per-CPU workqueues:
*/
struct workqueue_struct {
- struct cpu_workqueue_struct cpu_wq[NR_CPUS];
+ struct cpu_workqueue_struct *cpu_wq;
const char *name;
struct list_head list; /* Empty if single thread */
};
/* All the per-cpu workqueues on the system, for hotplug cpu to add/remove
threads to each one as cpus come/go. */
-static spinlock_t workqueue_lock = SPIN_LOCK_UNLOCKED;
+static DEFINE_SPINLOCK(workqueue_lock);
static LIST_HEAD(workqueues);
+static int singlethread_cpu;
+
/* If it's single threaded, it isn't in the list of workqueues. */
static inline int is_single_threaded(struct workqueue_struct *wq)
{
if (!test_and_set_bit(0, &work->pending)) {
if (unlikely(is_single_threaded(wq)))
- cpu = 0;
+ cpu = singlethread_cpu;
BUG_ON(!list_empty(&work->entry));
- __queue_work(wq->cpu_wq + cpu, work);
+ __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
ret = 1;
}
put_cpu();
int cpu = smp_processor_id();
if (unlikely(is_single_threaded(wq)))
- cpu = 0;
+ cpu = singlethread_cpu;
- __queue_work(wq->cpu_wq + cpu, work);
+ __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
}
int fastcall queue_delayed_work(struct workqueue_struct *wq,
return ret;
}
-static inline void run_workqueue(struct cpu_workqueue_struct *cwq)
+static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
unsigned long flags;
current->flags |= PF_NOFREEZE;
- set_user_nice(current, -10);
+ set_user_nice(current, -5);
/* Block and flush all signals */
sigfillset(&blocked);
siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);
+ set_current_state(TASK_INTERRUPTIBLE);
while (!kthread_should_stop()) {
- set_task_state(current, TASK_INTERRUPTIBLE);
-
add_wait_queue(&cwq->more_work, &wait);
if (list_empty(&cwq->worklist))
schedule();
else
- set_task_state(current, TASK_RUNNING);
+ __set_current_state(TASK_RUNNING);
remove_wait_queue(&cwq->more_work, &wait);
if (!list_empty(&cwq->worklist))
run_workqueue(cwq);
+ set_current_state(TASK_INTERRUPTIBLE);
}
+ __set_current_state(TASK_RUNNING);
return 0;
}
+static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
+{
+ if (cwq->thread == current) {
+ /*
+ * Probably keventd trying to flush its own queue. So simply run
+ * it by hand rather than deadlocking.
+ */
+ run_workqueue(cwq);
+ } else {
+ DEFINE_WAIT(wait);
+ long sequence_needed;
+
+ spin_lock_irq(&cwq->lock);
+ sequence_needed = cwq->insert_sequence;
+
+ while (sequence_needed - cwq->remove_sequence > 0) {
+ prepare_to_wait(&cwq->work_done, &wait,
+ TASK_UNINTERRUPTIBLE);
+ spin_unlock_irq(&cwq->lock);
+ schedule();
+ spin_lock_irq(&cwq->lock);
+ }
+ finish_wait(&cwq->work_done, &wait);
+ spin_unlock_irq(&cwq->lock);
+ }
+}
+
/*
* flush_workqueue - ensure that any scheduled work has run to completion.
*
*/
void fastcall flush_workqueue(struct workqueue_struct *wq)
{
- struct cpu_workqueue_struct *cwq;
- int cpu;
-
might_sleep();
- lock_cpu_hotplug();
- for_each_online_cpu(cpu) {
- DEFINE_WAIT(wait);
- long sequence_needed;
-
- if (is_single_threaded(wq))
- cwq = wq->cpu_wq + 0; /* Always use cpu 0's area. */
- else
- cwq = wq->cpu_wq + cpu;
-
- if (cwq->thread == current) {
- /*
- * Probably keventd trying to flush its own queue.
- * So simply run it by hand rather than deadlocking.
- */
- run_workqueue(cwq);
- continue;
- }
- spin_lock_irq(&cwq->lock);
- sequence_needed = cwq->insert_sequence;
+ if (is_single_threaded(wq)) {
+ /* Always use first cpu's area. */
+ flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));
+ } else {
+ int cpu;
- while (sequence_needed - cwq->remove_sequence > 0) {
- prepare_to_wait(&cwq->work_done, &wait,
- TASK_UNINTERRUPTIBLE);
- spin_unlock_irq(&cwq->lock);
- schedule();
- spin_lock_irq(&cwq->lock);
- }
- finish_wait(&cwq->work_done, &wait);
- spin_unlock_irq(&cwq->lock);
+ lock_cpu_hotplug();
+ for_each_online_cpu(cpu)
+ flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
+ unlock_cpu_hotplug();
}
- unlock_cpu_hotplug();
}
static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,
int cpu)
{
- struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu;
+ struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
struct task_struct *p;
spin_lock_init(&cwq->lock);
struct workqueue_struct *wq;
struct task_struct *p;
- BUG_ON(strlen(name) > 10);
-
- wq = kmalloc(sizeof(*wq), GFP_KERNEL);
+ wq = kzalloc(sizeof(*wq), GFP_KERNEL);
if (!wq)
return NULL;
- memset(wq, 0, sizeof(*wq));
+
+ wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
+ if (!wq->cpu_wq) {
+ kfree(wq);
+ return NULL;
+ }
wq->name = name;
/* We don't need the distraction of CPUs appearing and vanishing. */
lock_cpu_hotplug();
if (singlethread) {
INIT_LIST_HEAD(&wq->list);
- p = create_workqueue_thread(wq, 0);
+ p = create_workqueue_thread(wq, singlethread_cpu);
if (!p)
destroy = 1;
else
} else {
spin_lock(&workqueue_lock);
list_add(&wq->list, &workqueues);
- spin_unlock_irq(&workqueue_lock);
+ spin_unlock(&workqueue_lock);
for_each_online_cpu(cpu) {
p = create_workqueue_thread(wq, cpu);
if (p) {
destroy = 1;
}
}
+ unlock_cpu_hotplug();
/*
* Was there any error during startup? If yes then clean up:
destroy_workqueue(wq);
wq = NULL;
}
- unlock_cpu_hotplug();
return wq;
}
unsigned long flags;
struct task_struct *p;
- cwq = wq->cpu_wq + cpu;
+ cwq = per_cpu_ptr(wq->cpu_wq, cpu);
spin_lock_irqsave(&cwq->lock, flags);
p = cwq->thread;
cwq->thread = NULL;
/* We don't need the distraction of CPUs appearing and vanishing. */
lock_cpu_hotplug();
if (is_single_threaded(wq))
- cleanup_workqueue_thread(wq, 0);
+ cleanup_workqueue_thread(wq, singlethread_cpu);
else {
for_each_online_cpu(cpu)
cleanup_workqueue_thread(wq, cpu);
spin_lock(&workqueue_lock);
list_del(&wq->list);
- spin_unlock_irq(&workqueue_lock);
+ spin_unlock(&workqueue_lock);
}
unlock_cpu_hotplug();
+ free_percpu(wq->cpu_wq);
kfree(wq);
}
return queue_delayed_work(keventd_wq, work, delay);
}
+int schedule_delayed_work_on(int cpu,
+ struct work_struct *work, unsigned long delay)
+{
+ int ret = 0;
+ struct timer_list *timer = &work->timer;
+
+ if (!test_and_set_bit(0, &work->pending)) {
+ BUG_ON(timer_pending(timer));
+ BUG_ON(!list_empty(&work->entry));
+ /* This stores keventd_wq for the moment, for the timer_fn */
+ work->wq_data = keventd_wq;
+ timer->expires = jiffies + delay;
+ timer->data = (unsigned long)work;
+ timer->function = delayed_work_timer_fn;
+ add_timer_on(timer, cpu);
+ ret = 1;
+ }
+ return ret;
+}
+
+int schedule_on_each_cpu(void (*func) (void *info), void *info)
+{
+ int cpu;
+ struct work_struct *work;
+
+ work = kmalloc(NR_CPUS * sizeof(struct work_struct), GFP_KERNEL);
+
+ if (!work)
+ return -ENOMEM;
+ for_each_online_cpu(cpu) {
+ INIT_WORK(work + cpu, func, info);
+ __queue_work(per_cpu_ptr(keventd_wq->cpu_wq, cpu),
+ work + cpu);
+ }
+ flush_workqueue(keventd_wq);
+ kfree(work);
+ return 0;
+}
+
void flush_scheduled_work(void)
{
flush_workqueue(keventd_wq);
}
+/**
+ * cancel_rearming_delayed_workqueue - reliably kill off a delayed
+ * work whose handler rearms the delayed work.
+ * @wq: the controlling workqueue structure
+ * @work: the delayed work struct
+ */
+void cancel_rearming_delayed_workqueue(struct workqueue_struct *wq,
+ struct work_struct *work)
+{
+ while (!cancel_delayed_work(work))
+ flush_workqueue(wq);
+}
+EXPORT_SYMBOL(cancel_rearming_delayed_workqueue);
+
+/**
+ * cancel_rearming_delayed_work - reliably kill off a delayed keventd
+ * work whose handler rearms the delayed work.
+ * @work: the delayed work struct
+ */
+void cancel_rearming_delayed_work(struct work_struct *work)
+{
+ cancel_rearming_delayed_workqueue(keventd_wq, work);
+}
+EXPORT_SYMBOL(cancel_rearming_delayed_work);
+
int keventd_up(void)
{
return keventd_wq != NULL;
BUG_ON(!keventd_wq);
- cwq = keventd_wq->cpu_wq + cpu;
+ cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu);
if (current == cwq->thread)
ret = 1;
/* Take the work from this (downed) CPU. */
static void take_over_work(struct workqueue_struct *wq, unsigned int cpu)
{
- struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu;
+ struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
LIST_HEAD(list);
struct work_struct *work;
printk("Taking work for %s\n", wq->name);
work = list_entry(list.next,struct work_struct,entry);
list_del(&work->entry);
- __queue_work(wq->cpu_wq + smp_processor_id(), work);
+ __queue_work(per_cpu_ptr(wq->cpu_wq, smp_processor_id()), work);
}
spin_unlock_irq(&cwq->lock);
}
case CPU_UP_PREPARE:
/* Create a new workqueue thread for it. */
list_for_each_entry(wq, &workqueues, list) {
- if (create_workqueue_thread(wq, hotcpu) < 0) {
+ if (!create_workqueue_thread(wq, hotcpu)) {
printk("workqueue for %i failed\n", hotcpu);
return NOTIFY_BAD;
}
case CPU_ONLINE:
/* Kick off worker threads. */
- list_for_each_entry(wq, &workqueues, list)
- wake_up_process(wq->cpu_wq[hotcpu].thread);
+ list_for_each_entry(wq, &workqueues, list) {
+ struct cpu_workqueue_struct *cwq;
+
+ cwq = per_cpu_ptr(wq->cpu_wq, hotcpu);
+ kthread_bind(cwq->thread, hotcpu);
+ wake_up_process(cwq->thread);
+ }
break;
case CPU_UP_CANCELED:
list_for_each_entry(wq, &workqueues, list) {
/* Unbind so it can run. */
- kthread_bind(wq->cpu_wq[hotcpu].thread,
- smp_processor_id());
+ kthread_bind(per_cpu_ptr(wq->cpu_wq, hotcpu)->thread,
+ any_online_cpu(cpu_online_map));
cleanup_workqueue_thread(wq, hotcpu);
}
break;
void init_workqueues(void)
{
+ singlethread_cpu = first_cpu(cpu_possible_map);
hotcpu_notifier(workqueue_cpu_callback, 0);
keventd_wq = create_workqueue("events");
BUG_ON(!keventd_wq);
EXPORT_SYMBOL(schedule_work);
EXPORT_SYMBOL(schedule_delayed_work);
+EXPORT_SYMBOL(schedule_delayed_work_on);
EXPORT_SYMBOL(flush_scheduled_work);
-