/*
 * TUX - Integrated Application Protocols Layer and Object Cache
 *
 * Copyright (C) 2000, 2001, Ingo Molnar <mingo@redhat.com>
 *
 * cachemiss.c: handle the 'slow IO path' by queueing not-yet-cached
 * requests to the IO-thread pool. Dynamic load balancing is done
 * between IO threads, based on the number of requests they have pending.
 */
12 #include <linux/delay.h>
/****************************************************************
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2, or (at your option)
 * any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 ****************************************************************/
31 void queue_cachemiss (tux_req_t *req)
33 iothread_t *iot = req->ti->iot;
35 Dprintk("queueing_cachemiss(req:%p) (req->cwd_dentry: %p) at %p:%p.\n",
36 req, req->cwd_dentry, __builtin_return_address(0), __builtin_return_address(1));
37 if (req->idle_input || req->wait_output_space)
39 req->had_cachemiss = 1;
40 if (!list_empty(&req->work))
42 spin_lock(&iot->async_lock);
43 if (connection_too_fast(req))
44 list_add_tail(&req->work, &iot->async_queue);
46 list_add(&req->work, &iot->async_queue);
47 iot->nr_async_pending++;
48 INC_STAT(nr_cachemiss_pending);
49 spin_unlock(&iot->async_lock);
51 wake_up(&iot->async_sleep);
54 static tux_req_t * get_cachemiss (iothread_t *iot)
56 struct list_head *tmp;
57 tux_req_t *req = NULL;
59 spin_lock(&iot->async_lock);
60 if (!list_empty(&iot->async_queue)) {
62 tmp = iot->async_queue.next;
63 req = list_entry(tmp, tux_req_t, work);
65 Dprintk("get_cachemiss(%p): got req %p.\n", iot, req);
68 iot->nr_async_pending--;
69 DEC_STAT(nr_cachemiss_pending);
71 if (req->ti->iot != iot)
74 spin_unlock(&iot->async_lock);
78 struct file * tux_open_file (char *filename, int mode)
85 /* Rule no. 3 -- Does the file exist ? */
87 filp = filp_open(filename, mode, 0600);
89 if (IS_ERR(filp) || !filp || !filp->f_dentry)
95 Dprintk("filp_open() error: %d.\n", (int)filp);
100 static int cachemiss_thread (void *data)
103 struct k_sigaction *ka;
104 DECLARE_WAITQUEUE(wait, current);
105 iothread_t *iot = data;
106 int nr = iot->ti->cpu, wake_up;
108 Dprintk("iot %p/%p got started.\n", iot, current);
111 spin_lock(&iot->async_lock);
113 sprintf(current->comm, "async IO %d/%d", nr, iot->threads);
116 spin_lock_irq(¤t->sighand->siglock);
117 ka = current->sighand->action + SIGCHLD-1;
118 ka->sa.sa_handler = SIG_IGN;
119 siginitsetinv(¤t->blocked, sigmask(SIGCHLD));
121 spin_unlock_irq(¤t->sighand->siglock);
123 spin_unlock(&iot->async_lock);
128 if (cpu_isset(nr, cpu_online_map)) {
131 set_cpus_allowed(current, mask);
137 add_wait_queue_exclusive(&iot->async_sleep, &wait);
140 while (!list_empty(&iot->async_queue) &&
141 (req = get_cachemiss(iot))) {
143 if (!req->atom_idx) {
144 add_tux_atom(req, flush_request);
145 add_req_to_workqueue(req);
148 tux_schedule_atom(req, 1);
149 if (signal_pending(current))
152 if (signal_pending(current))
154 if (!list_empty(&iot->async_queue))
157 Dprintk("iot %p/%p got shutdown!\n", iot, current);
160 __set_current_state(TASK_INTERRUPTIBLE);
161 if (list_empty(&iot->async_queue)) {
162 Dprintk("iot %p/%p going to sleep.\n", iot, current);
164 Dprintk("iot %p/%p got woken up.\n", iot, current);
166 __set_current_state(TASK_RUNNING);
169 remove_wait_queue(&iot->async_sleep, &wait);
172 spin_lock(&iot->async_lock);
175 spin_unlock(&iot->async_lock);
176 Dprintk("iot %p/%p has finished shutdown!\n", iot, current);
178 Dprintk("iot %p/%p waking up master.\n", iot, current);
179 wake_up(&iot->wait_shutdown);
185 static void __stop_cachemiss_threads (iothread_t *iot)
187 DECLARE_WAITQUEUE(wait, current);
189 __set_current_state(TASK_UNINTERRUPTIBLE);
191 Dprintk("stopping async IO threads %p.\n", iot);
192 add_wait_queue(&iot->wait_shutdown, &wait);
194 spin_lock(&iot->async_lock);
200 wake_up_all(&iot->async_sleep);
201 spin_unlock(&iot->async_lock);
203 Dprintk("waiting for async IO threads %p to exit.\n", iot);
205 remove_wait_queue(&iot->wait_shutdown, &wait);
209 if (iot->nr_async_pending)
211 Dprintk("stopped async IO threads %p.\n", iot);
214 void stop_cachemiss_threads (threadinfo_t *ti)
216 iothread_t *iot = ti->iot;
220 if (iot->nr_async_pending)
222 __stop_cachemiss_threads(iot);
227 int start_cachemiss_threads (threadinfo_t *ti)
233 iot = kmalloc(sizeof(*iot), GFP_KERNEL);
236 memset(iot, 0, sizeof(*iot));
239 iot->async_lock = SPIN_LOCK_UNLOCKED;
240 iot->nr_async_pending = 0;
241 INIT_LIST_HEAD(&iot->async_queue);
242 init_waitqueue_head(&iot->async_sleep);
243 init_waitqueue_head(&iot->wait_shutdown);
245 for (i = 0; i < NR_IO_THREADS; i++) {
246 pid = kernel_thread(cachemiss_thread, (void *)iot, 0);
248 printk(KERN_ERR "TUX: error %d creating IO thread!\n",
250 __stop_cachemiss_threads(iot);
257 * Wait for all cachemiss threads to start up:
259 while (iot->threads != NR_IO_THREADS) {
260 __set_current_state(TASK_INTERRUPTIBLE);
261 schedule_timeout(HZ/10);