Merge to Fedora kernel-2.6.18-1.2255_FC5 patched with stable patch-2.6.18.5-vs2.0...
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index ee77ccd..835fe28 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -8,10 +8,12 @@
  *
  * Derived from the taskqueue/keventd code by:
  *
- *   David Woodhouse <dwmw2@redhat.com>
+ *   David Woodhouse <dwmw2@infradead.org>
  *   Andrew Morton <andrewm@uow.edu.au>
  *   Kai Petzke <wpp@marie.physik.tu-berlin.de>
  *   Theodore Ts'o <tytso@mit.edu>
+ *
+ * Made to use alloc_percpu by Christoph Lameter <clameter@sgi.com>.
  */
 
 #include <linux/module.h>
 #include <linux/cpu.h>
 #include <linux/notifier.h>
 #include <linux/kthread.h>
+#include <linux/hardirq.h>
 
 /*
- * The per-CPU workqueue (if single thread, we always use cpu 0's).
+ * The per-CPU workqueue (if single thread, we always use the first
+ * possible cpu).
  *
  * The sequence counters are for flush_scheduled_work().  It wants to wait
  * until all currently-scheduled works are completed, but it doesn't
@@ -47,7 +51,7 @@ struct cpu_workqueue_struct {
        wait_queue_head_t work_done;
 
        struct workqueue_struct *wq;
-       task_t *thread;
+       struct task_struct *thread;
 
        int run_depth;          /* Detect run_workqueue() recursion depth */
 } ____cacheline_aligned;
@@ -57,16 +61,18 @@ struct cpu_workqueue_struct {
  * per-CPU workqueues:
  */
 struct workqueue_struct {
-       struct cpu_workqueue_struct cpu_wq[NR_CPUS];
+       struct cpu_workqueue_struct *cpu_wq;
        const char *name;
        struct list_head list;  /* Empty if single thread */
 };
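
For context, a minimal sketch (not part of the patch; the struct and variable names below are made up) of the alloc_percpu()/per_cpu_ptr()/free_percpu() pattern this hunk switches cpu_wq over to, replacing the static NR_CPUS-sized array:

#include <linux/percpu.h>
#include <linux/smp.h>
#include <linux/errno.h>

struct example_counter {
        unsigned long events;
};

static struct example_counter *counters;        /* one copy per possible CPU */

static int example_alloc(void)
{
        counters = alloc_percpu(struct example_counter);  /* zeroed on allocation */
        if (!counters)
                return -ENOMEM;
        return 0;
}

static void example_bump(void)
{
        int cpu = get_cpu();                    /* stay on this CPU while touching its copy */
        per_cpu_ptr(counters, cpu)->events++;
        put_cpu();
}

static void example_free(void)
{
        free_percpu(counters);
}
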
 
 /* All the per-cpu workqueues on the system, for hotplug cpu to add/remove
    threads to each one as cpus come/go. */
-static spinlock_t workqueue_lock = SPIN_LOCK_UNLOCKED;
+static DEFINE_MUTEX(workqueue_mutex);
 static LIST_HEAD(workqueues);
 
+static int singlethread_cpu;
+
 /* If it's single threaded, it isn't in the list of workqueues. */
 static inline int is_single_threaded(struct workqueue_struct *wq)
 {
@@ -87,9 +93,12 @@ static void __queue_work(struct cpu_workqueue_struct *cwq,
        spin_unlock_irqrestore(&cwq->lock, flags);
 }
 
-/*
- * Queue work on a workqueue. Return non-zero if it was successfully
- * added.
+/**
+ * queue_work - queue work on a workqueue
+ * @wq: workqueue to use
+ * @work: work to queue
+ *
+ * Returns non-zero if it was successfully added.
  *
  * We queue the work to the CPU it was submitted, but there is no
  * guarantee that it will be processed by that CPU.
@@ -100,14 +109,15 @@ int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
 
        if (!test_and_set_bit(0, &work->pending)) {
                if (unlikely(is_single_threaded(wq)))
-                       cpu = 0;
+                       cpu = singlethread_cpu;
                BUG_ON(!list_empty(&work->entry));
-               __queue_work(wq->cpu_wq + cpu, work);
+               __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
                ret = 1;
        }
        put_cpu();
        return ret;
 }
+EXPORT_SYMBOL_GPL(queue_work);
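
As a usage sketch for this era of the API (three-argument INIT_WORK(), handlers taking a void *), with a hypothetical driver-private workqueue; none of the mydrv_* names come from the patch:

#include <linux/workqueue.h>
#include <linux/errno.h>

static struct workqueue_struct *mydrv_wq;       /* private queue, see the create/destroy
                                                 * sketch further down */
static struct work_struct mydrv_work;

static void mydrv_work_fn(void *data)
{
        /* Runs in process context on one of mydrv_wq's worker threads. */
}

static int mydrv_kick(void)
{
        /* INIT_WORK(&mydrv_work, mydrv_work_fn, NULL) was done once at init time.
         * Non-zero return: queued now; zero: it was already pending. */
        return queue_work(mydrv_wq, &mydrv_work);
}
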
 
 static void delayed_work_timer_fn(unsigned long __data)
 {
@@ -116,11 +126,19 @@ static void delayed_work_timer_fn(unsigned long __data)
        int cpu = smp_processor_id();
 
        if (unlikely(is_single_threaded(wq)))
-               cpu = 0;
+               cpu = singlethread_cpu;
 
-       __queue_work(wq->cpu_wq + cpu, work);
+       __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
 }
 
+/**
+ * queue_delayed_work - queue work on a workqueue after delay
+ * @wq: workqueue to use
+ * @work: work to queue
+ * @delay: number of jiffies to wait before queueing
+ *
+ * Returns non-zero if it was successfully added.
+ */
 int fastcall queue_delayed_work(struct workqueue_struct *wq,
                        struct work_struct *work, unsigned long delay)
 {
@@ -141,8 +159,40 @@ int fastcall queue_delayed_work(struct workqueue_struct *wq,
        }
        return ret;
 }
+EXPORT_SYMBOL_GPL(queue_delayed_work);
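
Continuing the hypothetical mydrv sketch above, the delayed variant only adds a jiffies argument; the embedded work->timer fires delayed_work_timer_fn(), which then queues the work as usual:

static int mydrv_kick_later(void)
{
        /* Queue mydrv_work roughly one second from now (HZ jiffies). */
        return queue_delayed_work(mydrv_wq, &mydrv_work, HZ);
}
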
+
+/**
+ * queue_delayed_work_on - queue work on specific CPU after delay
+ * @cpu: CPU number to execute work on
+ * @wq: workqueue to use
+ * @work: work to queue
+ * @delay: number of jiffies to wait before queueing
+ *
+ * Returns non-zero if it was successfully added.
+ */
+int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
+                       struct work_struct *work, unsigned long delay)
+{
+       int ret = 0;
+       struct timer_list *timer = &work->timer;
+
+       if (!test_and_set_bit(0, &work->pending)) {
+               BUG_ON(timer_pending(timer));
+               BUG_ON(!list_empty(&work->entry));
+
+               /* This stores wq for the moment, for the timer_fn */
+               work->wq_data = wq;
+               timer->expires = jiffies + delay;
+               timer->data = (unsigned long)work;
+               timer->function = delayed_work_timer_fn;
+               add_timer_on(timer, cpu);
+               ret = 1;
+       }
+       return ret;
+}
+EXPORT_SYMBOL_GPL(queue_delayed_work_on);
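
The new queue_delayed_work_on() pins both the timer and the work to one CPU; a hedged sketch, still using the hypothetical mydrv names:

static int mydrv_kick_later_on(int cpu)
{
        /* The timer is armed on @cpu via add_timer_on(); when it fires, the
         * work is queued on that CPU's cpu_workqueue_struct. */
        return queue_delayed_work_on(cpu, mydrv_wq, &mydrv_work, HZ);
}
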
 
-static inline void run_workqueue(struct cpu_workqueue_struct *cwq)
+static void run_workqueue(struct cpu_workqueue_struct *cwq)
 {
        unsigned long flags;
 
@@ -188,7 +238,7 @@ static int worker_thread(void *__cwq)
 
        current->flags |= PF_NOFREEZE;
 
-       set_user_nice(current, -10);
+       set_user_nice(current, -5);
 
        /* Block and flush all signals */
        sigfillset(&blocked);
@@ -245,8 +295,9 @@ static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
        }
 }
 
-/*
+/**
  * flush_workqueue - ensure that any scheduled work has run to completion.
+ * @wq: workqueue to flush
  *
  * Forces execution of the workqueue and blocks until its completion.
  * This is typically used in driver shutdown handlers.
@@ -264,22 +315,23 @@ void fastcall flush_workqueue(struct workqueue_struct *wq)
        might_sleep();
 
        if (is_single_threaded(wq)) {
-               /* Always use cpu 0's area. */
-               flush_cpu_workqueue(wq->cpu_wq + 0);
+               /* Always use first cpu's area. */
+               flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));
        } else {
                int cpu;
 
-               lock_cpu_hotplug();
+               mutex_lock(&workqueue_mutex);
                for_each_online_cpu(cpu)
-                       flush_cpu_workqueue(wq->cpu_wq + cpu);
-               unlock_cpu_hotplug();
+                       flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
+               mutex_unlock(&workqueue_mutex);
        }
 }
+EXPORT_SYMBOL_GPL(flush_workqueue);
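
A typical shutdown-path use, as the kerneldoc above suggests (hypothetical driver code, not from the patch):

static void mydrv_shutdown(void)
{
        /* Must be process context (might_sleep()); returns only after every
         * work item queued on mydrv_wq before this call has completed. */
        flush_workqueue(mydrv_wq);
}
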
 
 static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,
                                                   int cpu)
 {
-       struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu;
+       struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
        struct task_struct *p;
 
        spin_lock_init(&cwq->lock);
@@ -308,27 +360,27 @@ struct workqueue_struct *__create_workqueue(const char *name,
        struct workqueue_struct *wq;
        struct task_struct *p;
 
-       BUG_ON(strlen(name) > 10);
-
-       wq = kmalloc(sizeof(*wq), GFP_KERNEL);
+       wq = kzalloc(sizeof(*wq), GFP_KERNEL);
        if (!wq)
                return NULL;
-       memset(wq, 0, sizeof(*wq));
+
+       wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
+       if (!wq->cpu_wq) {
+               kfree(wq);
+               return NULL;
+       }
 
        wq->name = name;
-       /* We don't need the distraction of CPUs appearing and vanishing. */
-       lock_cpu_hotplug();
+       mutex_lock(&workqueue_mutex);
        if (singlethread) {
                INIT_LIST_HEAD(&wq->list);
-               p = create_workqueue_thread(wq, 0);
+               p = create_workqueue_thread(wq, singlethread_cpu);
                if (!p)
                        destroy = 1;
                else
                        wake_up_process(p);
        } else {
-               spin_lock(&workqueue_lock);
                list_add(&wq->list, &workqueues);
-               spin_unlock(&workqueue_lock);
                for_each_online_cpu(cpu) {
                        p = create_workqueue_thread(wq, cpu);
                        if (p) {
@@ -338,7 +390,7 @@ struct workqueue_struct *__create_workqueue(const char *name,
                                destroy = 1;
                }
        }
-       unlock_cpu_hotplug();
+       mutex_unlock(&workqueue_mutex);
 
        /*
         * Was there any error during startup? If yes then clean up:
@@ -349,6 +401,7 @@ struct workqueue_struct *__create_workqueue(const char *name,
        }
        return wq;
 }
+EXPORT_SYMBOL_GPL(__create_workqueue);
 
 static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu)
 {
@@ -356,7 +409,7 @@ static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu)
        unsigned long flags;
        struct task_struct *p;
 
-       cwq = wq->cpu_wq + cpu;
+       cwq = per_cpu_ptr(wq->cpu_wq, cpu);
        spin_lock_irqsave(&cwq->lock, flags);
        p = cwq->thread;
        cwq->thread = NULL;
@@ -365,6 +418,12 @@ static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu)
                kthread_stop(p);
 }
 
+/**
+ * destroy_workqueue - safely terminate a workqueue
+ * @wq: target workqueue
+ *
+ * Safely destroy a workqueue. All work currently pending will be done first.
+ */
 void destroy_workqueue(struct workqueue_struct *wq)
 {
        int cpu;
@@ -372,56 +431,155 @@ void destroy_workqueue(struct workqueue_struct *wq)
        flush_workqueue(wq);
 
        /* We don't need the distraction of CPUs appearing and vanishing. */
-       lock_cpu_hotplug();
+       mutex_lock(&workqueue_mutex);
        if (is_single_threaded(wq))
-               cleanup_workqueue_thread(wq, 0);
+               cleanup_workqueue_thread(wq, singlethread_cpu);
        else {
                for_each_online_cpu(cpu)
                        cleanup_workqueue_thread(wq, cpu);
-               spin_lock(&workqueue_lock);
                list_del(&wq->list);
-               spin_unlock(&workqueue_lock);
        }
-       unlock_cpu_hotplug();
+       mutex_unlock(&workqueue_mutex);
+       free_percpu(wq->cpu_wq);
        kfree(wq);
 }
+EXPORT_SYMBOL_GPL(destroy_workqueue);
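
Putting the pieces together, a sketch of the usual lifetime of a private workqueue; create_workqueue()/create_singlethread_workqueue() are the workqueue.h wrappers around __create_workqueue(), and the mydrv names remain hypothetical (module boilerplate and includes elided):

static int __init mydrv_init(void)
{
        /* One worker thread bound to singlethread_cpu; use
         * create_workqueue("mydrv") instead for one thread per CPU. */
        mydrv_wq = create_singlethread_workqueue("mydrv");
        if (!mydrv_wq)
                return -ENOMEM;
        INIT_WORK(&mydrv_work, mydrv_work_fn, NULL);    /* initialise once */
        return 0;
}

static void __exit mydrv_exit(void)
{
        cancel_delayed_work(&mydrv_work);       /* kill a pending timer, if any */
        /* Flushes queued work, stops the worker(s), frees the per-cpu area. */
        destroy_workqueue(mydrv_wq);
}
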
 
 static struct workqueue_struct *keventd_wq;
 
+/**
+ * schedule_work - put work task in global workqueue
+ * @work: job to be done
+ *
+ * This puts a job in the kernel-global workqueue.
+ */
 int fastcall schedule_work(struct work_struct *work)
 {
        return queue_work(keventd_wq, work);
 }
+EXPORT_SYMBOL(schedule_work);
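
For occasional work, the shared keventd workqueue (the "events/N" threads) is usually preferred over a private one; a minimal hypothetical use:

static void mydrv_notify(void)
{
        /* Same rules as queue_work(), but on the kernel-global keventd_wq;
         * mydrv_work_fn() should not sleep for long, since it shares the
         * events/N threads with the rest of the kernel. */
        schedule_work(&mydrv_work);
}
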
 
+/**
+ * schedule_delayed_work - put work task in global workqueue after delay
+ * @work: job to be done
+ * @delay: number of jiffies to wait
+ *
+ * After waiting for a given time this puts a job in the kernel-global
+ * workqueue.
+ */
 int fastcall schedule_delayed_work(struct work_struct *work, unsigned long delay)
 {
        return queue_delayed_work(keventd_wq, work, delay);
 }
+EXPORT_SYMBOL(schedule_delayed_work);
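
And the delayed counterpart, e.g. a periodic poll that rearms itself from its own handler (hypothetical; the work_struct is assumed to be initialised with this handler at init time):

static struct work_struct mydrv_poll_work;      /* INIT_WORK(&mydrv_poll_work, mydrv_poll_fn, NULL) */

static void mydrv_poll_fn(void *data)
{
        /* ... poll the hardware ... */
        schedule_delayed_work(&mydrv_poll_work, 5 * HZ);        /* rearm, runs again in ~5s */
}
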
 
+/**
+ * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
+ * @cpu: cpu to use
+ * @work: job to be done
+ * @delay: number of jiffies to wait
+ *
+ * After waiting for a given time this puts a job in the kernel-global
+ * workqueue on the specified CPU.
+ */
 int schedule_delayed_work_on(int cpu,
                        struct work_struct *work, unsigned long delay)
 {
-       int ret = 0;
-       struct timer_list *timer = &work->timer;
+       return queue_delayed_work_on(cpu, keventd_wq, work, delay);
+}
+EXPORT_SYMBOL(schedule_delayed_work_on);
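
With the refactoring above, schedule_delayed_work_on() is now just queue_delayed_work_on() on keventd_wq; a short hypothetical use:

static void mydrv_poll_on(int cpu)
{
        /* As the rearming example above, but both the timer and the work
         * run on @cpu. */
        schedule_delayed_work_on(cpu, &mydrv_poll_work, 5 * HZ);
}
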
 
-       if (!test_and_set_bit(0, &work->pending)) {
-               BUG_ON(timer_pending(timer));
-               BUG_ON(!list_empty(&work->entry));
-               /* This stores keventd_wq for the moment, for the timer_fn */
-               work->wq_data = keventd_wq;
-               timer->expires = jiffies + delay;
-               timer->data = (unsigned long)work;
-               timer->function = delayed_work_timer_fn;
-               add_timer_on(timer, cpu);
-               ret = 1;
+/**
+ * schedule_on_each_cpu - call a function on each online CPU from keventd
+ * @func: the function to call
+ * @info: a pointer to pass to func()
+ *
+ * Returns zero on success.
+ * Returns -ve errno on failure.
+ *
+ * Appears to be racy against CPU hotplug.
+ *
+ * schedule_on_each_cpu() is very slow.
+ */
+int schedule_on_each_cpu(void (*func)(void *info), void *info)
+{
+       int cpu;
+       struct work_struct *works;
+
+       works = alloc_percpu(struct work_struct);
+       if (!works)
+               return -ENOMEM;
+
+       mutex_lock(&workqueue_mutex);
+       for_each_online_cpu(cpu) {
+               INIT_WORK(per_cpu_ptr(works, cpu), func, info);
+               __queue_work(per_cpu_ptr(keventd_wq->cpu_wq, cpu),
+                               per_cpu_ptr(works, cpu));
        }
-       return ret;
+       mutex_unlock(&workqueue_mutex);
+       flush_workqueue(keventd_wq);
+       free_percpu(works);
+       return 0;
 }
 
 void flush_scheduled_work(void)
 {
        flush_workqueue(keventd_wq);
 }
+EXPORT_SYMBOL(flush_scheduled_work);
+
+/**
+ * cancel_rearming_delayed_workqueue - reliably kill off a delayed
+ *                     work whose handler rearms the delayed work.
+ * @wq:   the controlling workqueue structure
+ * @work: the delayed work struct
+ */
+void cancel_rearming_delayed_workqueue(struct workqueue_struct *wq,
+                                      struct work_struct *work)
+{
+       while (!cancel_delayed_work(work))
+               flush_workqueue(wq);
+}
+EXPORT_SYMBOL(cancel_rearming_delayed_workqueue);
+
+/**
+ * cancel_rearming_delayed_work - reliably kill off a delayed keventd
+ *                     work whose handler rearms the delayed work.
+ * @work: the delayed work struct
+ */
+void cancel_rearming_delayed_work(struct work_struct *work)
+{
+       cancel_rearming_delayed_workqueue(keventd_wq, work);
+}
+EXPORT_SYMBOL(cancel_rearming_delayed_work);
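
Tearing down the self-rearming poll from the schedule_delayed_work() sketch above; the plain variant is for keventd work, the _workqueue one for work on a private queue:

static void mydrv_stop_polling(void)
{
        /* Loops cancel_delayed_work()/flush_workqueue() until the timer is
         * dead and no queued instance is left to rearm it.  For work on
         * mydrv_wq, use cancel_rearming_delayed_workqueue(mydrv_wq, ...) instead. */
        cancel_rearming_delayed_work(&mydrv_poll_work);
}
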
+
+/**
+ * execute_in_process_context - reliably execute the routine with user context
+ * @fn:                the function to execute
+ * @data:      data to pass to the function
+ * @ew:                guaranteed storage for the execute work structure (must
+ *             be available when the work executes)
+ *
+ * Executes the function immediately if process context is available,
+ * otherwise schedules the function for delayed execution.
+ *
+ * Returns:    0 - function was executed
+ *             1 - function was scheduled for execution
+ */
+int execute_in_process_context(void (*fn)(void *data), void *data,
+                              struct execute_work *ew)
+{
+       if (!in_interrupt()) {
+               fn(data);
+               return 0;
+       }
+
+       INIT_WORK(&ew->work, fn, data);
+       schedule_work(&ew->work);
+
+       return 1;
+}
+EXPORT_SYMBOL_GPL(execute_in_process_context);
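
A sketch of the intended use: last-put paths that may run in interrupt context but need process context for the cleanup. The execute_work storage must outlive the call, since the work may be deferred, so it lives in the object itself (all names hypothetical):

#include <linux/slab.h>

struct mydrv_device {
        struct execute_work release_work;       /* storage for the deferred case */
        /* ... */
};

static void mydrv_release(void *data)
{
        struct mydrv_device *dev = data;

        kfree(dev);
}

static void mydrv_put_last(struct mydrv_device *dev)
{
        /* Calls mydrv_release() immediately if we are not in_interrupt(),
         * otherwise schedules it on keventd and returns 1. */
        execute_in_process_context(mydrv_release, dev, &dev->release_work);
}
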
 
 int keventd_up(void)
 {
@@ -436,7 +594,7 @@ int current_is_keventd(void)
 
        BUG_ON(!keventd_wq);
 
-       cwq = keventd_wq->cpu_wq + cpu;
+       cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu);
        if (current == cwq->thread)
                ret = 1;
 
@@ -448,18 +606,18 @@ int current_is_keventd(void)
 /* Take the work from this (downed) CPU. */
 static void take_over_work(struct workqueue_struct *wq, unsigned int cpu)
 {
-       struct cpu_workqueue_struct *cwq = wq->cpu_wq + cpu;
-       LIST_HEAD(list);
+       struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
+       struct list_head list;
        struct work_struct *work;
 
        spin_lock_irq(&cwq->lock);
-       list_splice_init(&cwq->worklist, &list);
+       list_replace_init(&cwq->worklist, &list);
 
        while (!list_empty(&list)) {
                printk("Taking work for %s\n", wq->name);
                work = list_entry(list.next,struct work_struct,entry);
                list_del(&work->entry);
-               __queue_work(wq->cpu_wq + smp_processor_id(), work);
+               __queue_work(per_cpu_ptr(wq->cpu_wq, smp_processor_id()), work);
        }
        spin_unlock_irq(&cwq->lock);
 }
@@ -474,9 +632,10 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
 
        switch (action) {
        case CPU_UP_PREPARE:
+               mutex_lock(&workqueue_mutex);
                /* Create a new workqueue thread for it. */
                list_for_each_entry(wq, &workqueues, list) {
-                       if (create_workqueue_thread(wq, hotcpu) < 0) {
+                       if (!create_workqueue_thread(wq, hotcpu)) {
                                printk("workqueue for %i failed\n", hotcpu);
                                return NOTIFY_BAD;
                        }
@@ -485,17 +644,34 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
 
        case CPU_ONLINE:
                /* Kick off worker threads. */
-               list_for_each_entry(wq, &workqueues, list)
-                       wake_up_process(wq->cpu_wq[hotcpu].thread);
+               list_for_each_entry(wq, &workqueues, list) {
+                       struct cpu_workqueue_struct *cwq;
+
+                       cwq = per_cpu_ptr(wq->cpu_wq, hotcpu);
+                       kthread_bind(cwq->thread, hotcpu);
+                       wake_up_process(cwq->thread);
+               }
+               mutex_unlock(&workqueue_mutex);
                break;
 
        case CPU_UP_CANCELED:
                list_for_each_entry(wq, &workqueues, list) {
+                       if (!per_cpu_ptr(wq->cpu_wq, hotcpu)->thread)
+                               continue;
                        /* Unbind so it can run. */
-                       kthread_bind(wq->cpu_wq[hotcpu].thread,
-                                    smp_processor_id());
+                       kthread_bind(per_cpu_ptr(wq->cpu_wq, hotcpu)->thread,
+                                    any_online_cpu(cpu_online_map));
                        cleanup_workqueue_thread(wq, hotcpu);
                }
+               mutex_unlock(&workqueue_mutex);
+               break;
+
+       case CPU_DOWN_PREPARE:
+               mutex_lock(&workqueue_mutex);
+               break;
+
+       case CPU_DOWN_FAILED:
+               mutex_unlock(&workqueue_mutex);
                break;
 
        case CPU_DEAD:
@@ -503,6 +679,7 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
                        cleanup_workqueue_thread(wq, hotcpu);
                list_for_each_entry(wq, &workqueues, list)
                        take_over_work(wq, hotcpu);
+               mutex_unlock(&workqueue_mutex);
                break;
        }
 
@@ -512,18 +689,9 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
 
 void init_workqueues(void)
 {
+       singlethread_cpu = first_cpu(cpu_possible_map);
        hotcpu_notifier(workqueue_cpu_callback, 0);
        keventd_wq = create_workqueue("events");
        BUG_ON(!keventd_wq);
 }
 
-EXPORT_SYMBOL_GPL(__create_workqueue);
-EXPORT_SYMBOL_GPL(queue_work);
-EXPORT_SYMBOL_GPL(queue_delayed_work);
-EXPORT_SYMBOL_GPL(flush_workqueue);
-EXPORT_SYMBOL_GPL(destroy_workqueue);
-
-EXPORT_SYMBOL(schedule_work);
-EXPORT_SYMBOL(schedule_delayed_work);
-EXPORT_SYMBOL(schedule_delayed_work_on);
-EXPORT_SYMBOL(flush_scheduled_work);