/*
 * TUX - Integrated Application Protocols Layer and Object Cache
 *
 * Copyright (C) 2000, 2001, Ingo Molnar <mingo@redhat.com>
 *
 * cachemiss.c: handle the 'slow IO path' by queueing not-yet-cached
 * requests to the IO-thread pool. Dynamic load balancing is done
 * between IO threads, based on the number of requests they have pending.
 */

#include <net/tux.h>
#include <linux/delay.h>

/****************************************************************
 *      This program is free software; you can redistribute it and/or modify
 *      it under the terms of the GNU General Public License as published by
 *      the Free Software Foundation; either version 2, or (at your option)
 *      any later version.
 *
 *      This program is distributed in the hope that it will be useful,
 *      but WITHOUT ANY WARRANTY; without even the implied warranty of
 *      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *      GNU General Public License for more details.
 *
 *      You should have received a copy of the GNU General Public License
 *      along with this program; if not, write to the Free Software
 *      Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 ****************************************************************/

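/*
 * Queue a request that missed the object cache for handling by this
 * thread's IO-thread pool. The queueing order below is deliberate:
 * requests on connections that connection_too_fast() flags as far
 * ahead of their output go to the tail of the queue, all others to
 * the head, which presumably lets slower connections be serviced
 * first.
 */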
void queue_cachemiss (tux_req_t *req)
{
        iothread_t *iot = req->ti->iot;

        Dprintk("queue_cachemiss(req:%p) (req->cwd_dentry: %p) at %p:%p.\n",
                req, req->cwd_dentry, __builtin_return_address(0), __builtin_return_address(1));
        if (req->idle_input || req->wait_output_space)
                TUX_BUG();
        req->had_cachemiss = 1;
        if (!list_empty(&req->work))
                TUX_BUG();
        spin_lock(&iot->async_lock);
        if (connection_too_fast(req))
                list_add_tail(&req->work, &iot->async_queue);
        else
                list_add(&req->work, &iot->async_queue);
        iot->nr_async_pending++;
        INC_STAT(nr_cachemiss_pending);
        spin_unlock(&iot->async_lock);

        wake_up(&iot->async_sleep);
}

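/*
 * Grab the next pending cachemiss request off this IO thread's queue,
 * or NULL if the queue is empty. Also sanity-checks that the request
 * really belongs to this iothread.
 */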
static tux_req_t * get_cachemiss (iothread_t *iot)
{
        struct list_head *tmp;
        tux_req_t *req = NULL;

        spin_lock(&iot->async_lock);
        if (!list_empty(&iot->async_queue)) {

                tmp = iot->async_queue.next;
                req = list_entry(tmp, tux_req_t, work);

                Dprintk("get_cachemiss(%p): got req %p.\n", iot, req);
                list_del(tmp);
                DEBUG_DEL_LIST(tmp);
                iot->nr_async_pending--;
                DEC_STAT(nr_cachemiss_pending);

                if (req->ti->iot != iot)
                        TUX_BUG();
        }
        spin_unlock(&iot->async_lock);
        return req;
}

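/*
 * Open a file on behalf of a TUX request. Unlike filp_open() itself,
 * this helper never returns an ERR_PTR value: every failure (error
 * pointer, NULL filp or missing f_dentry) is collapsed into a NULL
 * return, so callers need only a single NULL check.
 */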
struct file * tux_open_file (char *filename, int mode)
{
        struct file *filp;

        if (!filename)
                TUX_BUG();

        /* Rule no. 3 -- Does the file exist ? */

        filp = filp_open(filename, mode, 0600);

        if (IS_ERR(filp) || !filp || !filp->f_dentry)
                goto err;

out:
        return filp;
err:
        Dprintk("filp_open() error: %ld.\n", PTR_ERR(filp));
        filp = NULL;
        goto out;
}

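/*
 * Main loop of a single async IO thread. The thread pins itself to
 * its threadinfo's CPU (on SMP), drops privileges, ignores SIGCHLD
 * and blocks all other signals, then loops: pick up queued cachemiss
 * requests and run their pending atoms, sleeping on iot->async_sleep
 * whenever the queue is empty. The loop exits once iot->shutdown is
 * set; the last thread to exit wakes the master sleeping in
 * __stop_cachemiss_threads().
 */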
static int cachemiss_thread (void *data)
{
        tux_req_t *req;
        struct k_sigaction *ka;
        DECLARE_WAITQUEUE(wait, current);
        iothread_t *iot = data;
        int nr = iot->ti->cpu, wake_up_master;

        Dprintk("iot %p/%p got started.\n", iot, current);
        drop_permissions();

        spin_lock(&iot->async_lock);
        iot->threads++;
        snprintf(current->comm, sizeof(current->comm),
                 "async IO %d/%d", nr, iot->threads);

        spin_lock_irq(&current->sighand->siglock);
        ka = current->sighand->action + SIGCHLD-1;
        ka->sa.sa_handler = SIG_IGN;
        siginitsetinv(&current->blocked, sigmask(SIGCHLD));
        recalc_sigpending();
        spin_unlock_irq(&current->sighand->siglock);

        spin_unlock(&iot->async_lock);
#ifdef CONFIG_SMP
        {
                cpumask_t mask;

                if (cpu_isset(nr, cpu_online_map)) {
                        cpus_clear(mask);
                        cpu_set(nr, mask);
                        set_cpus_allowed(current, mask);
                }
        }
#endif

        add_wait_queue_exclusive(&iot->async_sleep, &wait);

        for (;;) {
                while (!list_empty(&iot->async_queue) &&
                                (req = get_cachemiss(iot))) {

                        if (!req->atom_idx) {
                                add_tux_atom(req, flush_request);
                                add_req_to_workqueue(req);
                                continue;
                        }
                        tux_schedule_atom(req, 1);
                        if (signal_pending(current))
                                flush_all_signals();
                }
                if (signal_pending(current))
                        flush_all_signals();
                if (!list_empty(&iot->async_queue))
                        continue;
                if (iot->shutdown) {
                        Dprintk("iot %p/%p got shutdown!\n", iot, current);
                        break;
                }
                /*
                 * Standard race-free sleep: mark ourselves interruptible
                 * first, then re-check the queue, so a wakeup that arrives
                 * between the check and schedule() is not lost.
                 */
                __set_current_state(TASK_INTERRUPTIBLE);
                if (list_empty(&iot->async_queue)) {
                        Dprintk("iot %p/%p going to sleep.\n", iot, current);
                        schedule();
                        Dprintk("iot %p/%p got woken up.\n", iot, current);
                }
                __set_current_state(TASK_RUNNING);
        }

        remove_wait_queue(&iot->async_sleep, &wait);

        /* The last thread to exit wakes up the shutdown master: */
        wake_up_master = 0;
        spin_lock(&iot->async_lock);
        if (!--iot->threads)
                wake_up_master = 1;
        spin_unlock(&iot->async_lock);
        Dprintk("iot %p/%p has finished shutdown!\n", iot, current);
        if (wake_up_master) {
                Dprintk("iot %p/%p waking up master.\n", iot, current);
                wake_up(&iot->wait_shutdown);
        }

        return 0;
}

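/*
 * Tell all IO threads of this pool to exit, then sleep uninterruptibly
 * until the last exiting thread wakes us via iot->wait_shutdown.
 */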
static void __stop_cachemiss_threads (iothread_t *iot)
{
        DECLARE_WAITQUEUE(wait, current);

        __set_current_state(TASK_UNINTERRUPTIBLE);

        Dprintk("stopping async IO threads %p.\n", iot);
        add_wait_queue(&iot->wait_shutdown, &wait);

        spin_lock(&iot->async_lock);
        if (iot->shutdown)
                TUX_BUG();
        if (!iot->threads)
                TUX_BUG();
        iot->shutdown = 1;
        wake_up_all(&iot->async_sleep);
        spin_unlock(&iot->async_lock);

        Dprintk("waiting for async IO threads %p to exit.\n", iot);
        schedule();
        remove_wait_queue(&iot->wait_shutdown, &wait);

        if (iot->threads)
                TUX_BUG();
        if (iot->nr_async_pending)
                TUX_BUG();
        Dprintk("stopped async IO threads %p.\n", iot);
}

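/*
 * Tear down a threadinfo's IO-thread pool. May only be called once
 * all async requests have completed (nr_async_pending must be zero).
 */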
void stop_cachemiss_threads (threadinfo_t *ti)
{
        iothread_t *iot = ti->iot;

        if (!iot)
                TUX_BUG();
        if (iot->nr_async_pending)
                TUX_BUG();
        __stop_cachemiss_threads(iot);
        ti->iot = NULL;
        kfree(iot);
}

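/*
 * Allocate and initialize the iothread structure for this threadinfo
 * and spawn NR_IO_THREADS kernel threads servicing it. Returns 0 on
 * success or a negative errno. Startup is polled: we sleep in 100 ms
 * (HZ/10 tick) steps until every thread has registered itself in
 * iot->threads.
 */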
int start_cachemiss_threads (threadinfo_t *ti)
{
        int i, pid;
        iothread_t *iot;

        iot = kzalloc(sizeof(*iot), GFP_KERNEL);
        if (!iot)
                return -ENOMEM;

        iot->ti = ti;
        spin_lock_init(&iot->async_lock);
        iot->nr_async_pending = 0;
        INIT_LIST_HEAD(&iot->async_queue);
        init_waitqueue_head(&iot->async_sleep);
        init_waitqueue_head(&iot->wait_shutdown);

        for (i = 0; i < NR_IO_THREADS; i++) {
                pid = kernel_thread(cachemiss_thread, (void *)iot, 0);
                if (pid < 0) {
                        printk(KERN_ERR "TUX: error %d creating IO thread!\n",
                                        pid);
                        __stop_cachemiss_threads(iot);
                        kfree(iot);
                        return pid;
                }
        }
        ti->iot = iot;
        /*
         * Wait for all cachemiss threads to start up:
         */
        while (iot->threads != NR_IO_THREADS) {
                __set_current_state(TASK_INTERRUPTIBLE);
                schedule_timeout(HZ/10);
        }
        return 0;
}