This commit was manufactured by cvs2svn to create branch 'vserver'.
[linux-2.6.git] / fs / dlm / dir.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "lowcomms.h"
18 #include "rcom.h"
19 #include "config.h"
20 #include "memory.h"
21 #include "recover.h"
22 #include "util.h"
23 #include "lock.h"
24 #include "dir.h"
25
26
27 static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de)
28 {
29         spin_lock(&ls->ls_recover_list_lock);
30         list_add(&de->list, &ls->ls_recover_list);
31         spin_unlock(&ls->ls_recover_list_lock);
32 }
33
34 static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len)
35 {
36         int found = 0;
37         struct dlm_direntry *de;
38
39         spin_lock(&ls->ls_recover_list_lock);
40         list_for_each_entry(de, &ls->ls_recover_list, list) {
41                 if (de->length == len) {
42                         list_del(&de->list);
43                         de->master_nodeid = 0;
44                         memset(de->name, 0, len);
45                         found = 1;
46                         break;
47                 }
48         }
49         spin_unlock(&ls->ls_recover_list_lock);
50
51         if (!found)
52                 de = allocate_direntry(ls, len);
53         return de;
54 }
55
56 void dlm_clear_free_entries(struct dlm_ls *ls)
57 {
58         struct dlm_direntry *de;
59
60         spin_lock(&ls->ls_recover_list_lock);
61         while (!list_empty(&ls->ls_recover_list)) {
62                 de = list_entry(ls->ls_recover_list.next, struct dlm_direntry,
63                                 list);
64                 list_del(&de->list);
65                 free_direntry(de);
66         }
67         spin_unlock(&ls->ls_recover_list_lock);
68 }
69
70 /*
71  * We use the upper 16 bits of the hash value to select the directory node.
72  * Low bits are used for distribution of rsb's among hash buckets on each node.
73  *
74  * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
75  * num_nodes to the hash value.  This value in the desired range is used as an
76  * offset into the sorted list of nodeid's to give the particular nodeid.
77  */
78
79 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
80 {
81         struct list_head *tmp;
82         struct dlm_member *memb = NULL;
83         uint32_t node, n = 0;
84         int nodeid;
85
86         if (ls->ls_num_nodes == 1) {
87                 nodeid = dlm_our_nodeid();
88                 goto out;
89         }
90
91         if (ls->ls_node_array) {
92                 node = (hash >> 16) % ls->ls_total_weight;
93                 nodeid = ls->ls_node_array[node];
94                 goto out;
95         }
96
97         /* make_member_array() failed to kmalloc ls_node_array... */
98
99         node = (hash >> 16) % ls->ls_num_nodes;
100
101         list_for_each(tmp, &ls->ls_nodes) {
102                 if (n++ != node)
103                         continue;
104                 memb = list_entry(tmp, struct dlm_member, list);
105                 break;
106         }
107
108         DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n",
109                                  ls->ls_num_nodes, n, node););
110         nodeid = memb->nodeid;
111  out:
112         return nodeid;
113 }
114
115 int dlm_dir_nodeid(struct dlm_rsb *r)
116 {
117         return dlm_hash2nodeid(r->res_ls, r->res_hash);
118 }
119
120 static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len)
121 {
122         uint32_t val;
123
124         val = jhash(name, len, 0);
125         val &= (ls->ls_dirtbl_size - 1);
126
127         return val;
128 }
129
130 static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de)
131 {
132         uint32_t bucket;
133
134         bucket = dir_hash(ls, de->name, de->length);
135         list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
136 }
137
138 static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name,
139                                           int namelen, uint32_t bucket)
140 {
141         struct dlm_direntry *de;
142
143         list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) {
144                 if (de->length == namelen && !memcmp(name, de->name, namelen))
145                         goto out;
146         }
147         de = NULL;
148  out:
149         return de;
150 }
151
152 void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen)
153 {
154         struct dlm_direntry *de;
155         uint32_t bucket;
156
157         bucket = dir_hash(ls, name, namelen);
158
159         write_lock(&ls->ls_dirtbl[bucket].lock);
160
161         de = search_bucket(ls, name, namelen, bucket);
162
163         if (!de) {
164                 log_error(ls, "remove fr %u none", nodeid);
165                 goto out;
166         }
167
168         if (de->master_nodeid != nodeid) {
169                 log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid);
170                 goto out;
171         }
172
173         list_del(&de->list);
174         free_direntry(de);
175  out:
176         write_unlock(&ls->ls_dirtbl[bucket].lock);
177 }
178
179 void dlm_dir_clear(struct dlm_ls *ls)
180 {
181         struct list_head *head;
182         struct dlm_direntry *de;
183         int i;
184
185         DLM_ASSERT(list_empty(&ls->ls_recover_list), );
186
187         for (i = 0; i < ls->ls_dirtbl_size; i++) {
188                 write_lock(&ls->ls_dirtbl[i].lock);
189                 head = &ls->ls_dirtbl[i].list;
190                 while (!list_empty(head)) {
191                         de = list_entry(head->next, struct dlm_direntry, list);
192                         list_del(&de->list);
193                         put_free_de(ls, de);
194                 }
195                 write_unlock(&ls->ls_dirtbl[i].lock);
196         }
197 }
198
199 int dlm_recover_directory(struct dlm_ls *ls)
200 {
201         struct dlm_member *memb;
202         struct dlm_direntry *de;
203         char *b, *last_name = NULL;
204         int error = -ENOMEM, last_len, count = 0;
205         uint16_t namelen;
206
207         log_debug(ls, "dlm_recover_directory");
208
209         if (dlm_no_directory(ls))
210                 goto out_status;
211
212         dlm_dir_clear(ls);
213
214         last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_KERNEL);
215         if (!last_name)
216                 goto out;
217
218         list_for_each_entry(memb, &ls->ls_nodes, list) {
219                 memset(last_name, 0, DLM_RESNAME_MAXLEN);
220                 last_len = 0;
221
222                 for (;;) {
223                         error = dlm_recovery_stopped(ls);
224                         if (error)
225                                 goto out_free;
226
227                         error = dlm_rcom_names(ls, memb->nodeid,
228                                                last_name, last_len);
229                         if (error)
230                                 goto out_free;
231
232                         schedule();
233
234                         /*
235                          * pick namelen/name pairs out of received buffer
236                          */
237
238                         b = ls->ls_recover_buf + sizeof(struct dlm_rcom);
239
240                         for (;;) {
241                                 memcpy(&namelen, b, sizeof(uint16_t));
242                                 namelen = be16_to_cpu(namelen);
243                                 b += sizeof(uint16_t);
244
245                                 /* namelen of 0xFFFFF marks end of names for
246                                    this node; namelen of 0 marks end of the
247                                    buffer */
248
249                                 if (namelen == 0xFFFF)
250                                         goto done;
251                                 if (!namelen)
252                                         break;
253
254                                 error = -ENOMEM;
255                                 de = get_free_de(ls, namelen);
256                                 if (!de)
257                                         goto out_free;
258
259                                 de->master_nodeid = memb->nodeid;
260                                 de->length = namelen;
261                                 last_len = namelen;
262                                 memcpy(de->name, b, namelen);
263                                 memcpy(last_name, b, namelen);
264                                 b += namelen;
265
266                                 add_entry_to_hash(ls, de);
267                                 count++;
268                         }
269                 }
270          done:
271                 ;
272         }
273
274  out_status:
275         error = 0;
276         dlm_set_recover_status(ls, DLM_RS_DIR);
277         log_debug(ls, "dlm_recover_directory %d entries", count);
278  out_free:
279         kfree(last_name);
280  out:
281         dlm_clear_free_entries(ls);
282         return error;
283 }
284
285 static int get_entry(struct dlm_ls *ls, int nodeid, char *name,
286                      int namelen, int *r_nodeid)
287 {
288         struct dlm_direntry *de, *tmp;
289         uint32_t bucket;
290
291         bucket = dir_hash(ls, name, namelen);
292
293         write_lock(&ls->ls_dirtbl[bucket].lock);
294         de = search_bucket(ls, name, namelen, bucket);
295         if (de) {
296                 *r_nodeid = de->master_nodeid;
297                 write_unlock(&ls->ls_dirtbl[bucket].lock);
298                 if (*r_nodeid == nodeid)
299                         return -EEXIST;
300                 return 0;
301         }
302
303         write_unlock(&ls->ls_dirtbl[bucket].lock);
304
305         de = allocate_direntry(ls, namelen);
306         if (!de)
307                 return -ENOMEM;
308
309         de->master_nodeid = nodeid;
310         de->length = namelen;
311         memcpy(de->name, name, namelen);
312
313         write_lock(&ls->ls_dirtbl[bucket].lock);
314         tmp = search_bucket(ls, name, namelen, bucket);
315         if (tmp) {
316                 free_direntry(de);
317                 de = tmp;
318         } else {
319                 list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
320         }
321         *r_nodeid = de->master_nodeid;
322         write_unlock(&ls->ls_dirtbl[bucket].lock);
323         return 0;
324 }
325
/*
 * Public entry point for a directory lookup; all arguments and the return
 * value are passed straight through to and from get_entry().
 */
int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
                   int *r_nodeid)
{
        int rv = get_entry(ls, nodeid, name, namelen, r_nodeid);

        return rv;
}
331
332 /* Copy the names of master rsb's into the buffer provided.
333    Only select names whose dir node is the given nodeid. */
334
335 void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
336                            char *outbuf, int outlen, int nodeid)
337 {
338         struct list_head *list;
339         struct dlm_rsb *start_r = NULL, *r = NULL;
340         int offset = 0, start_namelen, error, dir_nodeid;
341         char *start_name;
342         uint16_t be_namelen;
343
344         /*
345          * Find the rsb where we left off (or start again)
346          */
347
348         start_namelen = inlen;
349         start_name = inbuf;
350
351         if (start_namelen > 1) {
352                 /*
353                  * We could also use a find_rsb_root() function here that
354                  * searched the ls_root_list.
355                  */
356                 error = dlm_find_rsb(ls, start_name, start_namelen, R_MASTER,
357                                      &start_r);
358                 DLM_ASSERT(!error && start_r,
359                            printk("error %d\n", error););
360                 DLM_ASSERT(!list_empty(&start_r->res_root_list),
361                            dlm_print_rsb(start_r););
362                 dlm_put_rsb(start_r);
363         }
364
365         /*
366          * Send rsb names for rsb's we're master of and whose directory node
367          * matches the requesting node.
368          */
369
370         down_read(&ls->ls_root_sem);
371         if (start_r)
372                 list = start_r->res_root_list.next;
373         else
374                 list = ls->ls_root_list.next;
375
376         for (offset = 0; list != &ls->ls_root_list; list = list->next) {
377                 r = list_entry(list, struct dlm_rsb, res_root_list);
378                 if (r->res_nodeid)
379                         continue;
380
381                 dir_nodeid = dlm_dir_nodeid(r);
382                 if (dir_nodeid != nodeid)
383                         continue;
384
385                 /*
386                  * The block ends when we can't fit the following in the
387                  * remaining buffer space:
388                  * namelen (uint16_t) +
389                  * name (r->res_length) +
390                  * end-of-block record 0x0000 (uint16_t)
391                  */
392
393                 if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
394                         /* Write end-of-block record */
395                         be_namelen = 0;
396                         memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t));
397                         offset += sizeof(uint16_t);
398                         goto out;
399                 }
400
401                 be_namelen = cpu_to_be16(r->res_length);
402                 memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t));
403                 offset += sizeof(uint16_t);
404                 memcpy(outbuf + offset, r->res_name, r->res_length);
405                 offset += r->res_length;
406         }
407
408         /*
409          * If we've reached the end of the list (and there's room) write a
410          * terminating record.
411          */
412
413         if ((list == &ls->ls_root_list) &&
414             (offset + sizeof(uint16_t) <= outlen)) {
415                 be_namelen = 0xFFFF;
416                 memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t));
417                 offset += sizeof(uint16_t);
418         }
419
420  out:
421         up_read(&ls->ls_root_sem);
422 }
423