This commit was manufactured by cvs2svn to create branch 'vserver'.
[linux-2.6.git] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
61 #include "memory.h"
62 #include "lowcomms.h"
63 #include "requestqueue.h"
64 #include "util.h"
65 #include "dir.h"
66 #include "member.h"
67 #include "lockspace.h"
68 #include "ast.h"
69 #include "lock.h"
70 #include "rcom.h"
71 #include "recover.h"
72 #include "lvb_table.h"
73 #include "user.h"
74 #include "config.h"
75
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
86                                     struct dlm_message *ms);
87 static int receive_extralen(struct dlm_message *ms);
88
89 /*
90  * Lock compatibilty matrix - thanks Steve
91  * UN = Unlocked state. Not really a state, used as a flag
92  * PD = Padding. Used to make the matrix a nice power of two in size
93  * Other states are the same as the VMS DLM.
94  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
95  */
96
97 static const int __dlm_compat_matrix[8][8] = {
98       /* UN NL CR CW PR PW EX PD */
99         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
100         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
101         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
102         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
103         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
104         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
105         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
106         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
107 };
108
109 /*
110  * This defines the direction of transfer of LVB data.
111  * Granted mode is the row; requested mode is the column.
112  * Usage: matrix[grmode+1][rqmode+1]
113  * 1 = LVB is returned to the caller
114  * 0 = LVB is written to the resource
115  * -1 = nothing happens to the LVB
116  */
117
118 const int dlm_lvb_operations[8][8] = {
119         /* UN   NL  CR  CW  PR  PW  EX  PD*/
120         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
121         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
122         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
123         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
124         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
125         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
126         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
127         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
128 };
129
130 #define modes_compat(gr, rq) \
131         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
132
133 int dlm_modes_compat(int mode1, int mode2)
134 {
135         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
136 }
137
138 /*
139  * Compatibility matrix for conversions with QUECVT set.
140  * Granted mode is the row; requested mode is the column.
141  * Usage: matrix[grmode+1][rqmode+1]
142  */
143
144 static const int __quecvt_compat_matrix[8][8] = {
145       /* UN NL CR CW PR PW EX PD */
146         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
147         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
148         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
149         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
150         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
151         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
152         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
153         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
154 };
155
156 void dlm_print_lkb(struct dlm_lkb *lkb)
157 {
158         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
159                "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
160                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
161                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
162                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
163 }
164
165 void dlm_print_rsb(struct dlm_rsb *r)
166 {
167         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
168                r->res_nodeid, r->res_flags, r->res_first_lkid,
169                r->res_recover_locks_count, r->res_name);
170 }
171
172 void dlm_dump_rsb(struct dlm_rsb *r)
173 {
174         struct dlm_lkb *lkb;
175
176         dlm_print_rsb(r);
177
178         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
179                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
180         printk(KERN_ERR "rsb lookup list\n");
181         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
182                 dlm_print_lkb(lkb);
183         printk(KERN_ERR "rsb grant queue:\n");
184         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
185                 dlm_print_lkb(lkb);
186         printk(KERN_ERR "rsb convert queue:\n");
187         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
188                 dlm_print_lkb(lkb);
189         printk(KERN_ERR "rsb wait queue:\n");
190         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
191                 dlm_print_lkb(lkb);
192 }
193
194 /* Threads cannot use the lockspace while it's being recovered */
195
196 static inline void lock_recovery(struct dlm_ls *ls)
197 {
198         down_read(&ls->ls_in_recovery);
199 }
200
201 static inline void unlock_recovery(struct dlm_ls *ls)
202 {
203         up_read(&ls->ls_in_recovery);
204 }
205
206 static inline int lock_recovery_try(struct dlm_ls *ls)
207 {
208         return down_read_trylock(&ls->ls_in_recovery);
209 }
210
211 static inline int can_be_queued(struct dlm_lkb *lkb)
212 {
213         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
214 }
215
216 static inline int force_blocking_asts(struct dlm_lkb *lkb)
217 {
218         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
219 }
220
221 static inline int is_demoted(struct dlm_lkb *lkb)
222 {
223         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
224 }
225
226 static inline int is_remote(struct dlm_rsb *r)
227 {
228         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
229         return !!r->res_nodeid;
230 }
231
232 static inline int is_process_copy(struct dlm_lkb *lkb)
233 {
234         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
235 }
236
237 static inline int is_master_copy(struct dlm_lkb *lkb)
238 {
239         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
240                 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
241         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
242 }
243
244 static inline int middle_conversion(struct dlm_lkb *lkb)
245 {
246         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
247             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
248                 return 1;
249         return 0;
250 }
251
252 static inline int down_conversion(struct dlm_lkb *lkb)
253 {
254         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
255 }
256
257 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
258 {
259         if (is_master_copy(lkb))
260                 return;
261
262         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
263
264         lkb->lkb_lksb->sb_status = rv;
265         lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
266
267         dlm_add_ast(lkb, AST_COMP);
268 }
269
270 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
271 {
272         if (is_master_copy(lkb))
273                 send_bast(r, lkb, rqmode);
274         else {
275                 lkb->lkb_bastmode = rqmode;
276                 dlm_add_ast(lkb, AST_BAST);
277         }
278 }
279
280 /*
281  * Basic operations on rsb's and lkb's
282  */
283
284 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
285 {
286         struct dlm_rsb *r;
287
288         r = allocate_rsb(ls, len);
289         if (!r)
290                 return NULL;
291
292         r->res_ls = ls;
293         r->res_length = len;
294         memcpy(r->res_name, name, len);
295         mutex_init(&r->res_mutex);
296
297         INIT_LIST_HEAD(&r->res_lookup);
298         INIT_LIST_HEAD(&r->res_grantqueue);
299         INIT_LIST_HEAD(&r->res_convertqueue);
300         INIT_LIST_HEAD(&r->res_waitqueue);
301         INIT_LIST_HEAD(&r->res_root_list);
302         INIT_LIST_HEAD(&r->res_recover_list);
303
304         return r;
305 }
306
307 static int search_rsb_list(struct list_head *head, char *name, int len,
308                            unsigned int flags, struct dlm_rsb **r_ret)
309 {
310         struct dlm_rsb *r;
311         int error = 0;
312
313         list_for_each_entry(r, head, res_hashchain) {
314                 if (len == r->res_length && !memcmp(name, r->res_name, len))
315                         goto found;
316         }
317         return -EBADR;
318
319  found:
320         if (r->res_nodeid && (flags & R_MASTER))
321                 error = -ENOTBLK;
322         *r_ret = r;
323         return error;
324 }
325
326 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
327                        unsigned int flags, struct dlm_rsb **r_ret)
328 {
329         struct dlm_rsb *r;
330         int error;
331
332         error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
333         if (!error) {
334                 kref_get(&r->res_ref);
335                 goto out;
336         }
337         error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
338         if (error)
339                 goto out;
340
341         list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
342
343         if (dlm_no_directory(ls))
344                 goto out;
345
346         if (r->res_nodeid == -1) {
347                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
348                 r->res_first_lkid = 0;
349         } else if (r->res_nodeid > 0) {
350                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
351                 r->res_first_lkid = 0;
352         } else {
353                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
354                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
355         }
356  out:
357         *r_ret = r;
358         return error;
359 }
360
361 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
362                       unsigned int flags, struct dlm_rsb **r_ret)
363 {
364         int error;
365         write_lock(&ls->ls_rsbtbl[b].lock);
366         error = _search_rsb(ls, name, len, b, flags, r_ret);
367         write_unlock(&ls->ls_rsbtbl[b].lock);
368         return error;
369 }
370
371 /*
372  * Find rsb in rsbtbl and potentially create/add one
373  *
374  * Delaying the release of rsb's has a similar benefit to applications keeping
375  * NL locks on an rsb, but without the guarantee that the cached master value
376  * will still be valid when the rsb is reused.  Apps aren't always smart enough
377  * to keep NL locks on an rsb that they may lock again shortly; this can lead
378  * to excessive master lookups and removals if we don't delay the release.
379  *
380  * Searching for an rsb means looking through both the normal list and toss
381  * list.  When found on the toss list the rsb is moved to the normal list with
382  * ref count of 1; when found on normal list the ref count is incremented.
383  */
384
385 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
386                     unsigned int flags, struct dlm_rsb **r_ret)
387 {
388         struct dlm_rsb *r, *tmp;
389         uint32_t hash, bucket;
390         int error = 0;
391
392         if (dlm_no_directory(ls))
393                 flags |= R_CREATE;
394
395         hash = jhash(name, namelen, 0);
396         bucket = hash & (ls->ls_rsbtbl_size - 1);
397
398         error = search_rsb(ls, name, namelen, bucket, flags, &r);
399         if (!error)
400                 goto out;
401
402         if (error == -EBADR && !(flags & R_CREATE))
403                 goto out;
404
405         /* the rsb was found but wasn't a master copy */
406         if (error == -ENOTBLK)
407                 goto out;
408
409         error = -ENOMEM;
410         r = create_rsb(ls, name, namelen);
411         if (!r)
412                 goto out;
413
414         r->res_hash = hash;
415         r->res_bucket = bucket;
416         r->res_nodeid = -1;
417         kref_init(&r->res_ref);
418
419         /* With no directory, the master can be set immediately */
420         if (dlm_no_directory(ls)) {
421                 int nodeid = dlm_dir_nodeid(r);
422                 if (nodeid == dlm_our_nodeid())
423                         nodeid = 0;
424                 r->res_nodeid = nodeid;
425         }
426
427         write_lock(&ls->ls_rsbtbl[bucket].lock);
428         error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
429         if (!error) {
430                 write_unlock(&ls->ls_rsbtbl[bucket].lock);
431                 free_rsb(r);
432                 r = tmp;
433                 goto out;
434         }
435         list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
436         write_unlock(&ls->ls_rsbtbl[bucket].lock);
437         error = 0;
438  out:
439         *r_ret = r;
440         return error;
441 }
442
443 int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
444                  unsigned int flags, struct dlm_rsb **r_ret)
445 {
446         return find_rsb(ls, name, namelen, flags, r_ret);
447 }
448
449 /* This is only called to add a reference when the code already holds
450    a valid reference to the rsb, so there's no need for locking. */
451
452 static inline void hold_rsb(struct dlm_rsb *r)
453 {
454         kref_get(&r->res_ref);
455 }
456
457 void dlm_hold_rsb(struct dlm_rsb *r)
458 {
459         hold_rsb(r);
460 }
461
462 static void toss_rsb(struct kref *kref)
463 {
464         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
465         struct dlm_ls *ls = r->res_ls;
466
467         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
468         kref_init(&r->res_ref);
469         list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
470         r->res_toss_time = jiffies;
471         if (r->res_lvbptr) {
472                 free_lvb(r->res_lvbptr);
473                 r->res_lvbptr = NULL;
474         }
475 }
476
477 /* When all references to the rsb are gone it's transfered to
478    the tossed list for later disposal. */
479
480 static void put_rsb(struct dlm_rsb *r)
481 {
482         struct dlm_ls *ls = r->res_ls;
483         uint32_t bucket = r->res_bucket;
484
485         write_lock(&ls->ls_rsbtbl[bucket].lock);
486         kref_put(&r->res_ref, toss_rsb);
487         write_unlock(&ls->ls_rsbtbl[bucket].lock);
488 }
489
490 void dlm_put_rsb(struct dlm_rsb *r)
491 {
492         put_rsb(r);
493 }
494
495 /* See comment for unhold_lkb */
496
497 static void unhold_rsb(struct dlm_rsb *r)
498 {
499         int rv;
500         rv = kref_put(&r->res_ref, toss_rsb);
501         DLM_ASSERT(!rv, dlm_dump_rsb(r););
502 }
503
504 static void kill_rsb(struct kref *kref)
505 {
506         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
507
508         /* All work is done after the return from kref_put() so we
509            can release the write_lock before the remove and free. */
510
511         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
512         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
513         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
514         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
515         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
516         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
517 }
518
519 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
520    The rsb must exist as long as any lkb's for it do. */
521
522 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
523 {
524         hold_rsb(r);
525         lkb->lkb_resource = r;
526 }
527
528 static void detach_lkb(struct dlm_lkb *lkb)
529 {
530         if (lkb->lkb_resource) {
531                 put_rsb(lkb->lkb_resource);
532                 lkb->lkb_resource = NULL;
533         }
534 }
535
536 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
537 {
538         struct dlm_lkb *lkb, *tmp;
539         uint32_t lkid = 0;
540         uint16_t bucket;
541
542         lkb = allocate_lkb(ls);
543         if (!lkb)
544                 return -ENOMEM;
545
546         lkb->lkb_nodeid = -1;
547         lkb->lkb_grmode = DLM_LOCK_IV;
548         kref_init(&lkb->lkb_ref);
549         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
550
551         get_random_bytes(&bucket, sizeof(bucket));
552         bucket &= (ls->ls_lkbtbl_size - 1);
553
554         write_lock(&ls->ls_lkbtbl[bucket].lock);
555
556         /* counter can roll over so we must verify lkid is not in use */
557
558         while (lkid == 0) {
559                 lkid = bucket | (ls->ls_lkbtbl[bucket].counter++ << 16);
560
561                 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
562                                     lkb_idtbl_list) {
563                         if (tmp->lkb_id != lkid)
564                                 continue;
565                         lkid = 0;
566                         break;
567                 }
568         }
569
570         lkb->lkb_id = lkid;
571         list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
572         write_unlock(&ls->ls_lkbtbl[bucket].lock);
573
574         *lkb_ret = lkb;
575         return 0;
576 }
577
578 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
579 {
580         uint16_t bucket = lkid & 0xFFFF;
581         struct dlm_lkb *lkb;
582
583         list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
584                 if (lkb->lkb_id == lkid)
585                         return lkb;
586         }
587         return NULL;
588 }
589
590 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
591 {
592         struct dlm_lkb *lkb;
593         uint16_t bucket = lkid & 0xFFFF;
594
595         if (bucket >= ls->ls_lkbtbl_size)
596                 return -EBADSLT;
597
598         read_lock(&ls->ls_lkbtbl[bucket].lock);
599         lkb = __find_lkb(ls, lkid);
600         if (lkb)
601                 kref_get(&lkb->lkb_ref);
602         read_unlock(&ls->ls_lkbtbl[bucket].lock);
603
604         *lkb_ret = lkb;
605         return lkb ? 0 : -ENOENT;
606 }
607
608 static void kill_lkb(struct kref *kref)
609 {
610         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
611
612         /* All work is done after the return from kref_put() so we
613            can release the write_lock before the detach_lkb */
614
615         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
616 }
617
618 /* __put_lkb() is used when an lkb may not have an rsb attached to
619    it so we need to provide the lockspace explicitly */
620
621 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
622 {
623         uint16_t bucket = lkb->lkb_id & 0xFFFF;
624
625         write_lock(&ls->ls_lkbtbl[bucket].lock);
626         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
627                 list_del(&lkb->lkb_idtbl_list);
628                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
629
630                 detach_lkb(lkb);
631
632                 /* for local/process lkbs, lvbptr points to caller's lksb */
633                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
634                         free_lvb(lkb->lkb_lvbptr);
635                 free_lkb(lkb);
636                 return 1;
637         } else {
638                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
639                 return 0;
640         }
641 }
642
643 int dlm_put_lkb(struct dlm_lkb *lkb)
644 {
645         struct dlm_ls *ls;
646
647         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
648         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
649
650         ls = lkb->lkb_resource->res_ls;
651         return __put_lkb(ls, lkb);
652 }
653
654 /* This is only called to add a reference when the code already holds
655    a valid reference to the lkb, so there's no need for locking. */
656
657 static inline void hold_lkb(struct dlm_lkb *lkb)
658 {
659         kref_get(&lkb->lkb_ref);
660 }
661
662 /* This is called when we need to remove a reference and are certain
663    it's not the last ref.  e.g. del_lkb is always called between a
664    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
665    put_lkb would work fine, but would involve unnecessary locking */
666
667 static inline void unhold_lkb(struct dlm_lkb *lkb)
668 {
669         int rv;
670         rv = kref_put(&lkb->lkb_ref, kill_lkb);
671         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
672 }
673
674 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
675                             int mode)
676 {
677         struct dlm_lkb *lkb = NULL;
678
679         list_for_each_entry(lkb, head, lkb_statequeue)
680                 if (lkb->lkb_rqmode < mode)
681                         break;
682
683         if (!lkb)
684                 list_add_tail(new, head);
685         else
686                 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
687 }
688
689 /* add/remove lkb to rsb's grant/convert/wait queue */
690
691 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
692 {
693         kref_get(&lkb->lkb_ref);
694
695         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
696
697         lkb->lkb_status = status;
698
699         switch (status) {
700         case DLM_LKSTS_WAITING:
701                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
702                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
703                 else
704                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
705                 break;
706         case DLM_LKSTS_GRANTED:
707                 /* convention says granted locks kept in order of grmode */
708                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
709                                 lkb->lkb_grmode);
710                 break;
711         case DLM_LKSTS_CONVERT:
712                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
713                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
714                 else
715                         list_add_tail(&lkb->lkb_statequeue,
716                                       &r->res_convertqueue);
717                 break;
718         default:
719                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
720         }
721 }
722
723 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
724 {
725         lkb->lkb_status = 0;
726         list_del(&lkb->lkb_statequeue);
727         unhold_lkb(lkb);
728 }
729
730 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
731 {
732         hold_lkb(lkb);
733         del_lkb(r, lkb);
734         add_lkb(r, lkb, sts);
735         unhold_lkb(lkb);
736 }
737
738 /* add/remove lkb from global waiters list of lkb's waiting for
739    a reply from a remote node */
740
741 static void add_to_waiters(struct dlm_lkb *lkb, int mstype)
742 {
743         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
744
745         mutex_lock(&ls->ls_waiters_mutex);
746         if (lkb->lkb_wait_type) {
747                 log_print("add_to_waiters error %d", lkb->lkb_wait_type);
748                 goto out;
749         }
750         lkb->lkb_wait_type = mstype;
751         kref_get(&lkb->lkb_ref);
752         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
753  out:
754         mutex_unlock(&ls->ls_waiters_mutex);
755 }
756
757 /* We clear the RESEND flag because we might be taking an lkb off the waiters
758    list as part of process_requestqueue (e.g. a lookup that has an optimized
759    request reply on the requestqueue) between dlm_recover_waiters_pre() which
760    set RESEND and dlm_recover_waiters_post() */
761
762 static int _remove_from_waiters(struct dlm_lkb *lkb)
763 {
764         int error = 0;
765
766         if (!lkb->lkb_wait_type) {
767                 log_print("remove_from_waiters error");
768                 error = -EINVAL;
769                 goto out;
770         }
771         lkb->lkb_wait_type = 0;
772         lkb->lkb_flags &= ~DLM_IFL_RESEND;
773         list_del(&lkb->lkb_wait_reply);
774         unhold_lkb(lkb);
775  out:
776         return error;
777 }
778
779 static int remove_from_waiters(struct dlm_lkb *lkb)
780 {
781         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
782         int error;
783
784         mutex_lock(&ls->ls_waiters_mutex);
785         error = _remove_from_waiters(lkb);
786         mutex_unlock(&ls->ls_waiters_mutex);
787         return error;
788 }
789
790 static void dir_remove(struct dlm_rsb *r)
791 {
792         int to_nodeid;
793
794         if (dlm_no_directory(r->res_ls))
795                 return;
796
797         to_nodeid = dlm_dir_nodeid(r);
798         if (to_nodeid != dlm_our_nodeid())
799                 send_remove(r);
800         else
801                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
802                                      r->res_name, r->res_length);
803 }
804
805 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
806    found since they are in order of newest to oldest? */
807
808 static int shrink_bucket(struct dlm_ls *ls, int b)
809 {
810         struct dlm_rsb *r;
811         int count = 0, found;
812
813         for (;;) {
814                 found = 0;
815                 write_lock(&ls->ls_rsbtbl[b].lock);
816                 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
817                                             res_hashchain) {
818                         if (!time_after_eq(jiffies, r->res_toss_time +
819                                            dlm_config.ci_toss_secs * HZ))
820                                 continue;
821                         found = 1;
822                         break;
823                 }
824
825                 if (!found) {
826                         write_unlock(&ls->ls_rsbtbl[b].lock);
827                         break;
828                 }
829
830                 if (kref_put(&r->res_ref, kill_rsb)) {
831                         list_del(&r->res_hashchain);
832                         write_unlock(&ls->ls_rsbtbl[b].lock);
833
834                         if (is_master(r))
835                                 dir_remove(r);
836                         free_rsb(r);
837                         count++;
838                 } else {
839                         write_unlock(&ls->ls_rsbtbl[b].lock);
840                         log_error(ls, "tossed rsb in use %s", r->res_name);
841                 }
842         }
843
844         return count;
845 }
846
847 void dlm_scan_rsbs(struct dlm_ls *ls)
848 {
849         int i;
850
851         if (dlm_locking_stopped(ls))
852                 return;
853
854         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
855                 shrink_bucket(ls, i);
856                 cond_resched();
857         }
858 }
859
860 /* lkb is master or local copy */
861
862 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
863 {
864         int b, len = r->res_ls->ls_lvblen;
865
866         /* b=1 lvb returned to caller
867            b=0 lvb written to rsb or invalidated
868            b=-1 do nothing */
869
870         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
871
872         if (b == 1) {
873                 if (!lkb->lkb_lvbptr)
874                         return;
875
876                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
877                         return;
878
879                 if (!r->res_lvbptr)
880                         return;
881
882                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
883                 lkb->lkb_lvbseq = r->res_lvbseq;
884
885         } else if (b == 0) {
886                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
887                         rsb_set_flag(r, RSB_VALNOTVALID);
888                         return;
889                 }
890
891                 if (!lkb->lkb_lvbptr)
892                         return;
893
894                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
895                         return;
896
897                 if (!r->res_lvbptr)
898                         r->res_lvbptr = allocate_lvb(r->res_ls);
899
900                 if (!r->res_lvbptr)
901                         return;
902
903                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
904                 r->res_lvbseq++;
905                 lkb->lkb_lvbseq = r->res_lvbseq;
906                 rsb_clear_flag(r, RSB_VALNOTVALID);
907         }
908
909         if (rsb_flag(r, RSB_VALNOTVALID))
910                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
911 }
912
913 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
914 {
915         if (lkb->lkb_grmode < DLM_LOCK_PW)
916                 return;
917
918         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
919                 rsb_set_flag(r, RSB_VALNOTVALID);
920                 return;
921         }
922
923         if (!lkb->lkb_lvbptr)
924                 return;
925
926         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
927                 return;
928
929         if (!r->res_lvbptr)
930                 r->res_lvbptr = allocate_lvb(r->res_ls);
931
932         if (!r->res_lvbptr)
933                 return;
934
935         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
936         r->res_lvbseq++;
937         rsb_clear_flag(r, RSB_VALNOTVALID);
938 }
939
940 /* lkb is process copy (pc) */
941
942 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
943                             struct dlm_message *ms)
944 {
945         int b;
946
947         if (!lkb->lkb_lvbptr)
948                 return;
949
950         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
951                 return;
952
953         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
954         if (b == 1) {
955                 int len = receive_extralen(ms);
956                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
957                 lkb->lkb_lvbseq = ms->m_lvbseq;
958         }
959 }
960
961 /* Manipulate lkb's on rsb's convert/granted/waiting queues
962    remove_lock -- used for unlock, removes lkb from granted
963    revert_lock -- used for cancel, moves lkb from convert to granted
964    grant_lock  -- used for request and convert, adds lkb to granted or
965                   moves lkb from convert or waiting to granted
966
967    Each of these is used for master or local copy lkb's.  There is
968    also a _pc() variation used to make the corresponding change on
969    a process copy (pc) lkb. */
970
971 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
972 {
973         del_lkb(r, lkb);
974         lkb->lkb_grmode = DLM_LOCK_IV;
975         /* this unhold undoes the original ref from create_lkb()
976            so this leads to the lkb being freed */
977         unhold_lkb(lkb);
978 }
979
980 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
981 {
982         set_lvb_unlock(r, lkb);
983         _remove_lock(r, lkb);
984 }
985
986 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
987 {
988         _remove_lock(r, lkb);
989 }
990
991 static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
992 {
993         lkb->lkb_rqmode = DLM_LOCK_IV;
994
995         switch (lkb->lkb_status) {
996         case DLM_LKSTS_GRANTED:
997                 break;
998         case DLM_LKSTS_CONVERT:
999                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1000                 break;
1001         case DLM_LKSTS_WAITING:
1002                 del_lkb(r, lkb);
1003                 lkb->lkb_grmode = DLM_LOCK_IV;
1004                 /* this unhold undoes the original ref from create_lkb()
1005                    so this leads to the lkb being freed */
1006                 unhold_lkb(lkb);
1007                 break;
1008         default:
1009                 log_print("invalid status for revert %d", lkb->lkb_status);
1010         }
1011 }
1012
1013 static void revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1014 {
1015         revert_lock(r, lkb);
1016 }
1017
1018 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1019 {
1020         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1021                 lkb->lkb_grmode = lkb->lkb_rqmode;
1022                 if (lkb->lkb_status)
1023                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1024                 else
1025                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1026         }
1027
1028         lkb->lkb_rqmode = DLM_LOCK_IV;
1029 }
1030
1031 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1032 {
1033         set_lvb_lock(r, lkb);
1034         _grant_lock(r, lkb);
1035         lkb->lkb_highbast = 0;
1036 }
1037
1038 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1039                           struct dlm_message *ms)
1040 {
1041         set_lvb_lock_pc(r, lkb, ms);
1042         _grant_lock(r, lkb);
1043 }
1044
1045 /* called by grant_pending_locks() which means an async grant message must
1046    be sent to the requesting node in addition to granting the lock if the
1047    lkb belongs to a remote node. */
1048
1049 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1050 {
1051         grant_lock(r, lkb);
1052         if (is_master_copy(lkb))
1053                 send_grant(r, lkb);
1054         else
1055                 queue_cast(r, lkb, 0);
1056 }
1057
1058 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1059 {
1060         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1061                                            lkb_statequeue);
1062         if (lkb->lkb_id == first->lkb_id)
1063                 return 1;
1064
1065         return 0;
1066 }
1067
1068 /* Check if the given lkb conflicts with another lkb on the queue. */
1069
1070 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1071 {
1072         struct dlm_lkb *this;
1073
1074         list_for_each_entry(this, head, lkb_statequeue) {
1075                 if (this == lkb)
1076                         continue;
1077                 if (!modes_compat(this, lkb))
1078                         return 1;
1079         }
1080         return 0;
1081 }
1082
1083 /*
1084  * "A conversion deadlock arises with a pair of lock requests in the converting
1085  * queue for one resource.  The granted mode of each lock blocks the requested
1086  * mode of the other lock."
1087  *
1088  * Part 2: if the granted mode of lkb is preventing the first lkb in the
1089  * convert queue from being granted, then demote lkb (set grmode to NL).
1090  * This second form requires that we check for conv-deadlk even when
1091  * now == 0 in _can_be_granted().
1092  *
1093  * Example:
1094  * Granted Queue: empty
1095  * Convert Queue: NL->EX (first lock)
1096  *                PR->EX (second lock)
1097  *
1098  * The first lock can't be granted because of the granted mode of the second
1099  * lock and the second lock can't be granted because it's not first in the
1100  * list.  We demote the granted mode of the second lock (the lkb passed to this
1101  * function).
1102  *
1103  * After the resolution, the "grant pending" function needs to go back and try
1104  * to grant locks on the convert queue again since the first lock can now be
1105  * granted.
1106  */
1107
1108 static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
1109 {
1110         struct dlm_lkb *this, *first = NULL, *self = NULL;
1111
1112         list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
1113                 if (!first)
1114                         first = this;
1115                 if (this == lkb) {
1116                         self = lkb;
1117                         continue;
1118                 }
1119
1120                 if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
1121                         return 1;
1122         }
1123
1124         /* if lkb is on the convert queue and is preventing the first
1125            from being granted, then there's deadlock and we demote lkb.
1126            multiple converting locks may need to do this before the first
1127            converting lock can be granted. */
1128
1129         if (self && self != first) {
1130                 if (!modes_compat(lkb, first) &&
1131                     !queue_conflict(&rsb->res_grantqueue, first))
1132                         return 1;
1133         }
1134
1135         return 0;
1136 }
1137
1138 /*
1139  * Return 1 if the lock can be granted, 0 otherwise.
1140  * Also detect and resolve conversion deadlocks.
1141  *
1142  * lkb is the lock to be granted
1143  *
1144  * now is 1 if the function is being called in the context of the
1145  * immediate request, it is 0 if called later, after the lock has been
1146  * queued.
1147  *
1148  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1149  */
1150
1151 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1152 {
1153         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1154
1155         /*
1156          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1157          * a new request for a NL mode lock being blocked.
1158          *
1159          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1160          * request, then it would be granted.  In essence, the use of this flag
1161          * tells the Lock Manager to expedite theis request by not considering
1162          * what may be in the CONVERTING or WAITING queues...  As of this
1163          * writing, the EXPEDITE flag can be used only with new requests for NL
1164          * mode locks.  This flag is not valid for conversion requests.
1165          *
1166          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1167          * conversion or used with a non-NL requested mode.  We also know an
1168          * EXPEDITE request is always granted immediately, so now must always
1169          * be 1.  The full condition to grant an expedite request: (now &&
1170          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1171          * therefore be shortened to just checking the flag.
1172          */
1173
1174         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1175                 return 1;
1176
1177         /*
1178          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1179          * added to the remaining conditions.
1180          */
1181
1182         if (queue_conflict(&r->res_grantqueue, lkb))
1183                 goto out;
1184
1185         /*
1186          * 6-3: By default, a conversion request is immediately granted if the
1187          * requested mode is compatible with the modes of all other granted
1188          * locks
1189          */
1190
1191         if (queue_conflict(&r->res_convertqueue, lkb))
1192                 goto out;
1193
1194         /*
1195          * 6-5: But the default algorithm for deciding whether to grant or
1196          * queue conversion requests does not by itself guarantee that such
1197          * requests are serviced on a "first come first serve" basis.  This, in
1198          * turn, can lead to a phenomenon known as "indefinate postponement".
1199          *
1200          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1201          * the system service employed to request a lock conversion.  This flag
1202          * forces certain conversion requests to be queued, even if they are
1203          * compatible with the granted modes of other locks on the same
1204          * resource.  Thus, the use of this flag results in conversion requests
1205          * being ordered on a "first come first servce" basis.
1206          *
1207          * DCT: This condition is all about new conversions being able to occur
1208          * "in place" while the lock remains on the granted queue (assuming
1209          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1210          * doesn't _have_ to go onto the convert queue where it's processed in
1211          * order.  The "now" variable is necessary to distinguish converts
1212          * being received and processed for the first time now, because once a
1213          * convert is moved to the conversion queue the condition below applies
1214          * requiring fifo granting.
1215          */
1216
1217         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1218                 return 1;
1219
1220         /*
1221          * The NOORDER flag is set to avoid the standard vms rules on grant
1222          * order.
1223          */
1224
1225         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1226                 return 1;
1227
1228         /*
1229          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1230          * granted until all other conversion requests ahead of it are granted
1231          * and/or canceled.
1232          */
1233
1234         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1235                 return 1;
1236
1237         /*
1238          * 6-4: By default, a new request is immediately granted only if all
1239          * three of the following conditions are satisfied when the request is
1240          * issued:
1241          * - The queue of ungranted conversion requests for the resource is
1242          *   empty.
1243          * - The queue of ungranted new requests for the resource is empty.
1244          * - The mode of the new request is compatible with the most
1245          *   restrictive mode of all granted locks on the resource.
1246          */
1247
1248         if (now && !conv && list_empty(&r->res_convertqueue) &&
1249             list_empty(&r->res_waitqueue))
1250                 return 1;
1251
1252         /*
1253          * 6-4: Once a lock request is in the queue of ungranted new requests,
1254          * it cannot be granted until the queue of ungranted conversion
1255          * requests is empty, all ungranted new requests ahead of it are
1256          * granted and/or canceled, and it is compatible with the granted mode
1257          * of the most restrictive lock granted on the resource.
1258          */
1259
1260         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1261             first_in_list(lkb, &r->res_waitqueue))
1262                 return 1;
1263
1264  out:
1265         /*
1266          * The following, enabled by CONVDEADLK, departs from VMS.
1267          */
1268
1269         if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
1270             conversion_deadlock_detect(r, lkb)) {
1271                 lkb->lkb_grmode = DLM_LOCK_NL;
1272                 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1273         }
1274
1275         return 0;
1276 }
1277
1278 /*
1279  * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
1280  * simple way to provide a big optimization to applications that can use them.
1281  */
1282
1283 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1284 {
1285         uint32_t flags = lkb->lkb_exflags;
1286         int rv;
1287         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1288
1289         rv = _can_be_granted(r, lkb, now);
1290         if (rv)
1291                 goto out;
1292
1293         if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
1294                 goto out;
1295
1296         if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
1297                 alt = DLM_LOCK_PR;
1298         else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
1299                 alt = DLM_LOCK_CW;
1300
1301         if (alt) {
1302                 lkb->lkb_rqmode = alt;
1303                 rv = _can_be_granted(r, lkb, now);
1304                 if (rv)
1305                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1306                 else
1307                         lkb->lkb_rqmode = rqmode;
1308         }
1309  out:
1310         return rv;
1311 }
1312
1313 static int grant_pending_convert(struct dlm_rsb *r, int high)
1314 {
1315         struct dlm_lkb *lkb, *s;
1316         int hi, demoted, quit, grant_restart, demote_restart;
1317
1318         quit = 0;
1319  restart:
1320         grant_restart = 0;
1321         demote_restart = 0;
1322         hi = DLM_LOCK_IV;
1323
1324         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1325                 demoted = is_demoted(lkb);
1326                 if (can_be_granted(r, lkb, 0)) {
1327                         grant_lock_pending(r, lkb);
1328                         grant_restart = 1;
1329                 } else {
1330                         hi = max_t(int, lkb->lkb_rqmode, hi);
1331                         if (!demoted && is_demoted(lkb))
1332                                 demote_restart = 1;
1333                 }
1334         }
1335
1336         if (grant_restart)
1337                 goto restart;
1338         if (demote_restart && !quit) {
1339                 quit = 1;
1340                 goto restart;
1341         }
1342
1343         return max_t(int, high, hi);
1344 }
1345
1346 static int grant_pending_wait(struct dlm_rsb *r, int high)
1347 {
1348         struct dlm_lkb *lkb, *s;
1349
1350         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1351                 if (can_be_granted(r, lkb, 0))
1352                         grant_lock_pending(r, lkb);
1353                 else
1354                         high = max_t(int, lkb->lkb_rqmode, high);
1355         }
1356
1357         return high;
1358 }
1359
1360 static void grant_pending_locks(struct dlm_rsb *r)
1361 {
1362         struct dlm_lkb *lkb, *s;
1363         int high = DLM_LOCK_IV;
1364
1365         DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1366
1367         high = grant_pending_convert(r, high);
1368         high = grant_pending_wait(r, high);
1369
1370         if (high == DLM_LOCK_IV)
1371                 return;
1372
1373         /*
1374          * If there are locks left on the wait/convert queue then send blocking
1375          * ASTs to granted locks based on the largest requested mode (high)
1376          * found above. FIXME: highbast < high comparison not valid for PR/CW.
1377          */
1378
1379         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1380                 if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
1381                     !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
1382                         queue_bast(r, lkb, high);
1383                         lkb->lkb_highbast = high;
1384                 }
1385         }
1386 }
1387
1388 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1389                             struct dlm_lkb *lkb)
1390 {
1391         struct dlm_lkb *gr;
1392
1393         list_for_each_entry(gr, head, lkb_statequeue) {
1394                 if (gr->lkb_bastaddr &&
1395                     gr->lkb_highbast < lkb->lkb_rqmode &&
1396                     !modes_compat(gr, lkb)) {
1397                         queue_bast(r, gr, lkb->lkb_rqmode);
1398                         gr->lkb_highbast = lkb->lkb_rqmode;
1399                 }
1400         }
1401 }
1402
1403 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1404 {
1405         send_bast_queue(r, &r->res_grantqueue, lkb);
1406 }
1407
1408 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1409 {
1410         send_bast_queue(r, &r->res_grantqueue, lkb);
1411         send_bast_queue(r, &r->res_convertqueue, lkb);
1412 }
1413
1414 /* set_master(r, lkb) -- set the master nodeid of a resource
1415
1416    The purpose of this function is to set the nodeid field in the given
1417    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1418    known, it can just be copied to the lkb and the function will return
1419    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1420    before it can be copied to the lkb.
1421
1422    When the rsb nodeid is being looked up remotely, the initial lkb
1423    causing the lookup is kept on the ls_waiters list waiting for the
1424    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1425    on the rsb's res_lookup list until the master is verified.
1426
1427    Return values:
1428    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1429    1: the rsb master is not available and the lkb has been placed on
1430       a wait queue
1431 */
1432
1433 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1434 {
1435         struct dlm_ls *ls = r->res_ls;
1436         int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1437
1438         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1439                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1440                 r->res_first_lkid = lkb->lkb_id;
1441                 lkb->lkb_nodeid = r->res_nodeid;
1442                 return 0;
1443         }
1444
1445         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1446                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1447                 return 1;
1448         }
1449
1450         if (r->res_nodeid == 0) {
1451                 lkb->lkb_nodeid = 0;
1452                 return 0;
1453         }
1454
1455         if (r->res_nodeid > 0) {
1456                 lkb->lkb_nodeid = r->res_nodeid;
1457                 return 0;
1458         }
1459
1460         DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1461
1462         dir_nodeid = dlm_dir_nodeid(r);
1463
1464         if (dir_nodeid != our_nodeid) {
1465                 r->res_first_lkid = lkb->lkb_id;
1466                 send_lookup(r, lkb);
1467                 return 1;
1468         }
1469
1470         for (;;) {
1471                 /* It's possible for dlm_scand to remove an old rsb for
1472                    this same resource from the toss list, us to create
1473                    a new one, look up the master locally, and find it
1474                    already exists just before dlm_scand does the
1475                    dir_remove() on the previous rsb. */
1476
1477                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1478                                        r->res_length, &ret_nodeid);
1479                 if (!error)
1480                         break;
1481                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1482                 schedule();
1483         }
1484
1485         if (ret_nodeid == our_nodeid) {
1486                 r->res_first_lkid = 0;
1487                 r->res_nodeid = 0;
1488                 lkb->lkb_nodeid = 0;
1489         } else {
1490                 r->res_first_lkid = lkb->lkb_id;
1491                 r->res_nodeid = ret_nodeid;
1492                 lkb->lkb_nodeid = ret_nodeid;
1493         }
1494         return 0;
1495 }
1496
1497 static void process_lookup_list(struct dlm_rsb *r)
1498 {
1499         struct dlm_lkb *lkb, *safe;
1500
1501         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1502                 list_del(&lkb->lkb_rsb_lookup);
1503                 _request_lock(r, lkb);
1504                 schedule();
1505         }
1506 }
1507
1508 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1509
1510 static void confirm_master(struct dlm_rsb *r, int error)
1511 {
1512         struct dlm_lkb *lkb;
1513
1514         if (!r->res_first_lkid)
1515                 return;
1516
1517         switch (error) {
1518         case 0:
1519         case -EINPROGRESS:
1520                 r->res_first_lkid = 0;
1521                 process_lookup_list(r);
1522                 break;
1523
1524         case -EAGAIN:
1525                 /* the remote master didn't queue our NOQUEUE request;
1526                    make a waiting lkb the first_lkid */
1527
1528                 r->res_first_lkid = 0;
1529
1530                 if (!list_empty(&r->res_lookup)) {
1531                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1532                                          lkb_rsb_lookup);
1533                         list_del(&lkb->lkb_rsb_lookup);
1534                         r->res_first_lkid = lkb->lkb_id;
1535                         _request_lock(r, lkb);
1536                 } else
1537                         r->res_nodeid = -1;
1538                 break;
1539
1540         default:
1541                 log_error(r->res_ls, "confirm_master unknown error %d", error);
1542         }
1543 }
1544
1545 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1546                          int namelen, uint32_t parent_lkid, void *ast,
1547                          void *astarg, void *bast, struct dlm_args *args)
1548 {
1549         int rv = -EINVAL;
1550
1551         /* check for invalid arg usage */
1552
1553         if (mode < 0 || mode > DLM_LOCK_EX)
1554                 goto out;
1555
1556         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1557                 goto out;
1558
1559         if (flags & DLM_LKF_CANCEL)
1560                 goto out;
1561
1562         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1563                 goto out;
1564
1565         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1566                 goto out;
1567
1568         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1569                 goto out;
1570
1571         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1572                 goto out;
1573
1574         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1575                 goto out;
1576
1577         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1578                 goto out;
1579
1580         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1581                 goto out;
1582
1583         if (!ast || !lksb)
1584                 goto out;
1585
1586         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1587                 goto out;
1588
1589         /* parent/child locks not yet supported */
1590         if (parent_lkid)
1591                 goto out;
1592
1593         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1594                 goto out;
1595
1596         /* these args will be copied to the lkb in validate_lock_args,
1597            it cannot be done now because when converting locks, fields in
1598            an active lkb cannot be modified before locking the rsb */
1599
1600         args->flags = flags;
1601         args->astaddr = ast;
1602         args->astparam = (long) astarg;
1603         args->bastaddr = bast;
1604         args->mode = mode;
1605         args->lksb = lksb;
1606         rv = 0;
1607  out:
1608         return rv;
1609 }
1610
1611 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1612 {
1613         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1614                       DLM_LKF_FORCEUNLOCK))
1615                 return -EINVAL;
1616
1617         args->flags = flags;
1618         args->astparam = (long) astarg;
1619         return 0;
1620 }
1621
1622 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1623                               struct dlm_args *args)
1624 {
1625         int rv = -EINVAL;
1626
1627         if (args->flags & DLM_LKF_CONVERT) {
1628                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1629                         goto out;
1630
1631                 if (args->flags & DLM_LKF_QUECVT &&
1632                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
1633                         goto out;
1634
1635                 rv = -EBUSY;
1636                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
1637                         goto out;
1638
1639                 if (lkb->lkb_wait_type)
1640                         goto out;
1641         }
1642
1643         lkb->lkb_exflags = args->flags;
1644         lkb->lkb_sbflags = 0;
1645         lkb->lkb_astaddr = args->astaddr;
1646         lkb->lkb_astparam = args->astparam;
1647         lkb->lkb_bastaddr = args->bastaddr;
1648         lkb->lkb_rqmode = args->mode;
1649         lkb->lkb_lksb = args->lksb;
1650         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
1651         lkb->lkb_ownpid = (int) current->pid;
1652         rv = 0;
1653  out:
1654         return rv;
1655 }
1656
1657 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1658 {
1659         int rv = -EINVAL;
1660
1661         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1662                 goto out;
1663
1664         if (args->flags & DLM_LKF_FORCEUNLOCK)
1665                 goto out_ok;
1666
1667         if (args->flags & DLM_LKF_CANCEL &&
1668             lkb->lkb_status == DLM_LKSTS_GRANTED)
1669                 goto out;
1670
1671         if (!(args->flags & DLM_LKF_CANCEL) &&
1672             lkb->lkb_status != DLM_LKSTS_GRANTED)
1673                 goto out;
1674
1675         rv = -EBUSY;
1676         if (lkb->lkb_wait_type)
1677                 goto out;
1678
1679  out_ok:
1680         lkb->lkb_exflags = args->flags;
1681         lkb->lkb_sbflags = 0;
1682         lkb->lkb_astparam = args->astparam;
1683
1684         rv = 0;
1685  out:
1686         return rv;
1687 }
1688
1689 /*
1690  * Four stage 4 varieties:
1691  * do_request(), do_convert(), do_unlock(), do_cancel()
1692  * These are called on the master node for the given lock and
1693  * from the central locking logic.
1694  */
1695
1696 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1697 {
1698         int error = 0;
1699
1700         if (can_be_granted(r, lkb, 1)) {
1701                 grant_lock(r, lkb);
1702                 queue_cast(r, lkb, 0);
1703                 goto out;
1704         }
1705
1706         if (can_be_queued(lkb)) {
1707                 error = -EINPROGRESS;
1708                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
1709                 send_blocking_asts(r, lkb);
1710                 goto out;
1711         }
1712
1713         error = -EAGAIN;
1714         if (force_blocking_asts(lkb))
1715                 send_blocking_asts_all(r, lkb);
1716         queue_cast(r, lkb, -EAGAIN);
1717
1718  out:
1719         return error;
1720 }
1721
1722 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
1723 {
1724         int error = 0;
1725
1726         /* changing an existing lock may allow others to be granted */
1727
1728         if (can_be_granted(r, lkb, 1)) {
1729                 grant_lock(r, lkb);
1730                 queue_cast(r, lkb, 0);
1731                 grant_pending_locks(r);
1732                 goto out;
1733         }
1734
1735         if (can_be_queued(lkb)) {
1736                 if (is_demoted(lkb))
1737                         grant_pending_locks(r);
1738                 error = -EINPROGRESS;
1739                 del_lkb(r, lkb);
1740                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
1741                 send_blocking_asts(r, lkb);
1742                 goto out;
1743         }
1744
1745         error = -EAGAIN;
1746         if (force_blocking_asts(lkb))
1747                 send_blocking_asts_all(r, lkb);
1748         queue_cast(r, lkb, -EAGAIN);
1749
1750  out:
1751         return error;
1752 }
1753
1754 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1755 {
1756         remove_lock(r, lkb);
1757         queue_cast(r, lkb, -DLM_EUNLOCK);
1758         grant_pending_locks(r);
1759         return -DLM_EUNLOCK;
1760 }
1761
1762 /* FIXME: if revert_lock() finds that the lkb is granted, we should
1763    skip the queue_cast(ECANCEL).  It indicates that the request/convert
1764    completed (and queued a normal ast) just before the cancel; we don't
1765    want to clobber the sb_result for the normal ast with ECANCEL. */
1766  
1767 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
1768 {
1769         revert_lock(r, lkb);
1770         queue_cast(r, lkb, -DLM_ECANCEL);
1771         grant_pending_locks(r);
1772         return -DLM_ECANCEL;
1773 }
1774
1775 /*
1776  * Four stage 3 varieties:
1777  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
1778  */
1779
1780 /* add a new lkb to a possibly new rsb, called by requesting process */
1781
1782 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1783 {
1784         int error;
1785
1786         /* set_master: sets lkb nodeid from r */
1787
1788         error = set_master(r, lkb);
1789         if (error < 0)
1790                 goto out;
1791         if (error) {
1792                 error = 0;
1793                 goto out;
1794         }
1795
1796         if (is_remote(r))
1797                 /* receive_request() calls do_request() on remote node */
1798                 error = send_request(r, lkb);
1799         else
1800                 error = do_request(r, lkb);
1801  out:
1802         return error;
1803 }
1804
1805 /* change some property of an existing lkb, e.g. mode */
1806
1807 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1808 {
1809         int error;
1810
1811         if (is_remote(r))
1812                 /* receive_convert() calls do_convert() on remote node */
1813                 error = send_convert(r, lkb);
1814         else
1815                 error = do_convert(r, lkb);
1816
1817         return error;
1818 }
1819
1820 /* remove an existing lkb from the granted queue */
1821
1822 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1823 {
1824         int error;
1825
1826         if (is_remote(r))
1827                 /* receive_unlock() calls do_unlock() on remote node */
1828                 error = send_unlock(r, lkb);
1829         else
1830                 error = do_unlock(r, lkb);
1831
1832         return error;
1833 }
1834
1835 /* remove an existing lkb from the convert or wait queue */
1836
1837 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1838 {
1839         int error;
1840
1841         if (is_remote(r))
1842                 /* receive_cancel() calls do_cancel() on remote node */
1843                 error = send_cancel(r, lkb);
1844         else
1845                 error = do_cancel(r, lkb);
1846
1847         return error;
1848 }
1849
1850 /*
1851  * Four stage 2 varieties:
1852  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
1853  */
1854
1855 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
1856                         int len, struct dlm_args *args)
1857 {
1858         struct dlm_rsb *r;
1859         int error;
1860
1861         error = validate_lock_args(ls, lkb, args);
1862         if (error)
1863                 goto out;
1864
1865         error = find_rsb(ls, name, len, R_CREATE, &r);
1866         if (error)
1867                 goto out;
1868
1869         lock_rsb(r);
1870
1871         attach_lkb(r, lkb);
1872         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
1873
1874         error = _request_lock(r, lkb);
1875
1876         unlock_rsb(r);
1877         put_rsb(r);
1878
1879  out:
1880         return error;
1881 }
1882
1883 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1884                         struct dlm_args *args)
1885 {
1886         struct dlm_rsb *r;
1887         int error;
1888
1889         r = lkb->lkb_resource;
1890
1891         hold_rsb(r);
1892         lock_rsb(r);
1893
1894         error = validate_lock_args(ls, lkb, args);
1895         if (error)
1896                 goto out;
1897
1898         error = _convert_lock(r, lkb);
1899  out:
1900         unlock_rsb(r);
1901         put_rsb(r);
1902         return error;
1903 }
1904
1905 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1906                        struct dlm_args *args)
1907 {
1908         struct dlm_rsb *r;
1909         int error;
1910
1911         r = lkb->lkb_resource;
1912
1913         hold_rsb(r);
1914         lock_rsb(r);
1915
1916         error = validate_unlock_args(lkb, args);
1917         if (error)
1918                 goto out;
1919
1920         error = _unlock_lock(r, lkb);
1921  out:
1922         unlock_rsb(r);
1923         put_rsb(r);
1924         return error;
1925 }
1926
1927 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1928                        struct dlm_args *args)
1929 {
1930         struct dlm_rsb *r;
1931         int error;
1932
1933         r = lkb->lkb_resource;
1934
1935         hold_rsb(r);
1936         lock_rsb(r);
1937
1938         error = validate_unlock_args(lkb, args);
1939         if (error)
1940                 goto out;
1941
1942         error = _cancel_lock(r, lkb);
1943  out:
1944         unlock_rsb(r);
1945         put_rsb(r);
1946         return error;
1947 }
1948
1949 /*
1950  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
1951  */
1952
1953 int dlm_lock(dlm_lockspace_t *lockspace,
1954              int mode,
1955              struct dlm_lksb *lksb,
1956              uint32_t flags,
1957              void *name,
1958              unsigned int namelen,
1959              uint32_t parent_lkid,
1960              void (*ast) (void *astarg),
1961              void *astarg,
1962              void (*bast) (void *astarg, int mode))
1963 {
1964         struct dlm_ls *ls;
1965         struct dlm_lkb *lkb;
1966         struct dlm_args args;
1967         int error, convert = flags & DLM_LKF_CONVERT;
1968
1969         ls = dlm_find_lockspace_local(lockspace);
1970         if (!ls)
1971                 return -EINVAL;
1972
1973         lock_recovery(ls);
1974
1975         if (convert)
1976                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
1977         else
1978                 error = create_lkb(ls, &lkb);
1979
1980         if (error)
1981                 goto out;
1982
1983         error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
1984                               astarg, bast, &args);
1985         if (error)
1986                 goto out_put;
1987
1988         if (convert)
1989                 error = convert_lock(ls, lkb, &args);
1990         else
1991                 error = request_lock(ls, lkb, name, namelen, &args);
1992
1993         if (error == -EINPROGRESS)
1994                 error = 0;
1995  out_put:
1996         if (convert || error)
1997                 __put_lkb(ls, lkb);
1998         if (error == -EAGAIN)
1999                 error = 0;
2000  out:
2001         unlock_recovery(ls);
2002         dlm_put_lockspace(ls);
2003         return error;
2004 }
2005
2006 int dlm_unlock(dlm_lockspace_t *lockspace,
2007                uint32_t lkid,
2008                uint32_t flags,
2009                struct dlm_lksb *lksb,
2010                void *astarg)
2011 {
2012         struct dlm_ls *ls;
2013         struct dlm_lkb *lkb;
2014         struct dlm_args args;
2015         int error;
2016
2017         ls = dlm_find_lockspace_local(lockspace);
2018         if (!ls)
2019                 return -EINVAL;
2020
2021         lock_recovery(ls);
2022
2023         error = find_lkb(ls, lkid, &lkb);
2024         if (error)
2025                 goto out;
2026
2027         error = set_unlock_args(flags, astarg, &args);
2028         if (error)
2029                 goto out_put;
2030
2031         if (flags & DLM_LKF_CANCEL)
2032                 error = cancel_lock(ls, lkb, &args);
2033         else
2034                 error = unlock_lock(ls, lkb, &args);
2035
2036         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2037                 error = 0;
2038  out_put:
2039         dlm_put_lkb(lkb);
2040  out:
2041         unlock_recovery(ls);
2042         dlm_put_lockspace(ls);
2043         return error;
2044 }
2045
2046 /*
2047  * send/receive routines for remote operations and replies
2048  *
2049  * send_args
2050  * send_common
2051  * send_request                 receive_request
2052  * send_convert                 receive_convert
2053  * send_unlock                  receive_unlock
2054  * send_cancel                  receive_cancel
2055  * send_grant                   receive_grant
2056  * send_bast                    receive_bast
2057  * send_lookup                  receive_lookup
2058  * send_remove                  receive_remove
2059  *
2060  *                              send_common_reply
2061  * receive_request_reply        send_request_reply
2062  * receive_convert_reply        send_convert_reply
2063  * receive_unlock_reply         send_unlock_reply
2064  * receive_cancel_reply         send_cancel_reply
2065  * receive_lookup_reply         send_lookup_reply
2066  */
2067
2068 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2069                           int to_nodeid, int mstype,
2070                           struct dlm_message **ms_ret,
2071                           struct dlm_mhandle **mh_ret)
2072 {
2073         struct dlm_message *ms;
2074         struct dlm_mhandle *mh;
2075         char *mb;
2076         int mb_len = sizeof(struct dlm_message);
2077
2078         switch (mstype) {
2079         case DLM_MSG_REQUEST:
2080         case DLM_MSG_LOOKUP:
2081         case DLM_MSG_REMOVE:
2082                 mb_len += r->res_length;
2083                 break;
2084         case DLM_MSG_CONVERT:
2085         case DLM_MSG_UNLOCK:
2086         case DLM_MSG_REQUEST_REPLY:
2087         case DLM_MSG_CONVERT_REPLY:
2088         case DLM_MSG_GRANT:
2089                 if (lkb && lkb->lkb_lvbptr)
2090                         mb_len += r->res_ls->ls_lvblen;
2091                 break;
2092         }
2093
2094         /* get_buffer gives us a message handle (mh) that we need to
2095            pass into lowcomms_commit and a message buffer (mb) that we
2096            write our data into */
2097
2098         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2099         if (!mh)
2100                 return -ENOBUFS;
2101
2102         memset(mb, 0, mb_len);
2103
2104         ms = (struct dlm_message *) mb;
2105
2106         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2107         ms->m_header.h_lockspace = r->res_ls->ls_global_id;
2108         ms->m_header.h_nodeid = dlm_our_nodeid();
2109         ms->m_header.h_length = mb_len;
2110         ms->m_header.h_cmd = DLM_MSG;
2111
2112         ms->m_type = mstype;
2113
2114         *mh_ret = mh;
2115         *ms_ret = ms;
2116         return 0;
2117 }
2118
2119 /* further lowcomms enhancements or alternate implementations may make
2120    the return value from this function useful at some point */
2121
2122 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2123 {
2124         dlm_message_out(ms);
2125         dlm_lowcomms_commit_buffer(mh);
2126         return 0;
2127 }
2128
2129 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2130                       struct dlm_message *ms)
2131 {
2132         ms->m_nodeid   = lkb->lkb_nodeid;
2133         ms->m_pid      = lkb->lkb_ownpid;
2134         ms->m_lkid     = lkb->lkb_id;
2135         ms->m_remid    = lkb->lkb_remid;
2136         ms->m_exflags  = lkb->lkb_exflags;
2137         ms->m_sbflags  = lkb->lkb_sbflags;
2138         ms->m_flags    = lkb->lkb_flags;
2139         ms->m_lvbseq   = lkb->lkb_lvbseq;
2140         ms->m_status   = lkb->lkb_status;
2141         ms->m_grmode   = lkb->lkb_grmode;
2142         ms->m_rqmode   = lkb->lkb_rqmode;
2143         ms->m_hash     = r->res_hash;
2144
2145         /* m_result and m_bastmode are set from function args,
2146            not from lkb fields */
2147
2148         if (lkb->lkb_bastaddr)
2149                 ms->m_asts |= AST_BAST;
2150         if (lkb->lkb_astaddr)
2151                 ms->m_asts |= AST_COMP;
2152
2153         /* compare with switch in create_message; send_remove() doesn't
2154            use send_args() */
2155
2156         switch (ms->m_type) {
2157         case DLM_MSG_REQUEST:
2158         case DLM_MSG_LOOKUP:
2159                 memcpy(ms->m_extra, r->res_name, r->res_length);
2160                 break;
2161         case DLM_MSG_CONVERT:
2162         case DLM_MSG_UNLOCK:
2163         case DLM_MSG_REQUEST_REPLY:
2164         case DLM_MSG_CONVERT_REPLY:
2165         case DLM_MSG_GRANT:
2166                 if (!lkb->lkb_lvbptr)
2167                         break;
2168                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2169                 break;
2170         }
2171 }
2172
2173 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2174 {
2175         struct dlm_message *ms;
2176         struct dlm_mhandle *mh;
2177         int to_nodeid, error;
2178
2179         add_to_waiters(lkb, mstype);
2180
2181         to_nodeid = r->res_nodeid;
2182
2183         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2184         if (error)
2185                 goto fail;
2186
2187         send_args(r, lkb, ms);
2188
2189         error = send_message(mh, ms);
2190         if (error)
2191                 goto fail;
2192         return 0;
2193
2194  fail:
2195         remove_from_waiters(lkb);
2196         return error;
2197 }
2198
2199 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2200 {
2201         return send_common(r, lkb, DLM_MSG_REQUEST);
2202 }
2203
2204 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2205 {
2206         int error;
2207
2208         error = send_common(r, lkb, DLM_MSG_CONVERT);
2209
2210         /* down conversions go without a reply from the master */
2211         if (!error && down_conversion(lkb)) {
2212                 remove_from_waiters(lkb);
2213                 r->res_ls->ls_stub_ms.m_result = 0;
2214                 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2215                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2216         }
2217
2218         return error;
2219 }
2220
2221 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2222    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2223    that the master is still correct. */
2224
2225 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2226 {
2227         return send_common(r, lkb, DLM_MSG_UNLOCK);
2228 }
2229
2230 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2231 {
2232         return send_common(r, lkb, DLM_MSG_CANCEL);
2233 }
2234
2235 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2236 {
2237         struct dlm_message *ms;
2238         struct dlm_mhandle *mh;
2239         int to_nodeid, error;
2240
2241         to_nodeid = lkb->lkb_nodeid;
2242
2243         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2244         if (error)
2245                 goto out;
2246
2247         send_args(r, lkb, ms);
2248
2249         ms->m_result = 0;
2250
2251         error = send_message(mh, ms);
2252  out:
2253         return error;
2254 }
2255
2256 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2257 {
2258         struct dlm_message *ms;
2259         struct dlm_mhandle *mh;
2260         int to_nodeid, error;
2261
2262         to_nodeid = lkb->lkb_nodeid;
2263
2264         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2265         if (error)
2266                 goto out;
2267
2268         send_args(r, lkb, ms);
2269
2270         ms->m_bastmode = mode;
2271
2272         error = send_message(mh, ms);
2273  out:
2274         return error;
2275 }
2276
2277 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2278 {
2279         struct dlm_message *ms;
2280         struct dlm_mhandle *mh;
2281         int to_nodeid, error;
2282
2283         add_to_waiters(lkb, DLM_MSG_LOOKUP);
2284
2285         to_nodeid = dlm_dir_nodeid(r);
2286
2287         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2288         if (error)
2289                 goto fail;
2290
2291         send_args(r, lkb, ms);
2292
2293         error = send_message(mh, ms);
2294         if (error)
2295                 goto fail;
2296         return 0;
2297
2298  fail:
2299         remove_from_waiters(lkb);
2300         return error;
2301 }
2302
2303 static int send_remove(struct dlm_rsb *r)
2304 {
2305         struct dlm_message *ms;
2306         struct dlm_mhandle *mh;
2307         int to_nodeid, error;
2308
2309         to_nodeid = dlm_dir_nodeid(r);
2310
2311         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2312         if (error)
2313                 goto out;
2314
2315         memcpy(ms->m_extra, r->res_name, r->res_length);
2316         ms->m_hash = r->res_hash;
2317
2318         error = send_message(mh, ms);
2319  out:
2320         return error;
2321 }
2322
2323 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2324                              int mstype, int rv)
2325 {
2326         struct dlm_message *ms;
2327         struct dlm_mhandle *mh;
2328         int to_nodeid, error;
2329
2330         to_nodeid = lkb->lkb_nodeid;
2331
2332         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2333         if (error)
2334                 goto out;
2335
2336         send_args(r, lkb, ms);
2337
2338         ms->m_result = rv;
2339
2340         error = send_message(mh, ms);
2341  out:
2342         return error;
2343 }
2344
2345 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2346 {
2347         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2348 }
2349
2350 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2351 {
2352         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2353 }
2354
2355 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2356 {
2357         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2358 }
2359
2360 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2361 {
2362         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2363 }
2364
2365 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2366                              int ret_nodeid, int rv)
2367 {
2368         struct dlm_rsb *r = &ls->ls_stub_rsb;
2369         struct dlm_message *ms;
2370         struct dlm_mhandle *mh;
2371         int error, nodeid = ms_in->m_header.h_nodeid;
2372
2373         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2374         if (error)
2375                 goto out;
2376
2377         ms->m_lkid = ms_in->m_lkid;
2378         ms->m_result = rv;
2379         ms->m_nodeid = ret_nodeid;
2380
2381         error = send_message(mh, ms);
2382  out:
2383         return error;
2384 }
2385
2386 /* which args we save from a received message depends heavily on the type
2387    of message, unlike the send side where we can safely send everything about
2388    the lkb for any type of message */
2389
2390 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2391 {
2392         lkb->lkb_exflags = ms->m_exflags;
2393         lkb->lkb_sbflags = ms->m_sbflags;
2394         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2395                          (ms->m_flags & 0x0000FFFF);
2396 }
2397
2398 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2399 {
2400         lkb->lkb_sbflags = ms->m_sbflags;
2401         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2402                          (ms->m_flags & 0x0000FFFF);
2403 }
2404
2405 static int receive_extralen(struct dlm_message *ms)
2406 {
2407         return (ms->m_header.h_length - sizeof(struct dlm_message));
2408 }
2409
2410 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2411                        struct dlm_message *ms)
2412 {
2413         int len;
2414
2415         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2416                 if (!lkb->lkb_lvbptr)
2417                         lkb->lkb_lvbptr = allocate_lvb(ls);
2418                 if (!lkb->lkb_lvbptr)
2419                         return -ENOMEM;
2420                 len = receive_extralen(ms);
2421                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2422         }
2423         return 0;
2424 }
2425
2426 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2427                                 struct dlm_message *ms)
2428 {
2429         lkb->lkb_nodeid = ms->m_header.h_nodeid;
2430         lkb->lkb_ownpid = ms->m_pid;
2431         lkb->lkb_remid = ms->m_lkid;
2432         lkb->lkb_grmode = DLM_LOCK_IV;
2433         lkb->lkb_rqmode = ms->m_rqmode;
2434         lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2435         lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2436
2437         DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2438
2439         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2440                 /* lkb was just created so there won't be an lvb yet */
2441                 lkb->lkb_lvbptr = allocate_lvb(ls);
2442                 if (!lkb->lkb_lvbptr)
2443                         return -ENOMEM;
2444         }
2445
2446         return 0;
2447 }
2448
2449 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2450                                 struct dlm_message *ms)
2451 {
2452         if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2453                 log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2454                           lkb->lkb_nodeid, ms->m_header.h_nodeid,
2455                           lkb->lkb_id, lkb->lkb_remid);
2456                 return -EINVAL;
2457         }
2458
2459         if (!is_master_copy(lkb))
2460                 return -EINVAL;
2461
2462         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2463                 return -EBUSY;
2464
2465         if (receive_lvb(ls, lkb, ms))
2466                 return -ENOMEM;
2467
2468         lkb->lkb_rqmode = ms->m_rqmode;
2469         lkb->lkb_lvbseq = ms->m_lvbseq;
2470
2471         return 0;
2472 }
2473
2474 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2475                                struct dlm_message *ms)
2476 {
2477         if (!is_master_copy(lkb))
2478                 return -EINVAL;
2479         if (receive_lvb(ls, lkb, ms))
2480                 return -ENOMEM;
2481         return 0;
2482 }
2483
2484 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2485    uses to send a reply and that the remote end uses to process the reply. */
2486
2487 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2488 {
2489         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2490         lkb->lkb_nodeid = ms->m_header.h_nodeid;
2491         lkb->lkb_remid = ms->m_lkid;
2492 }
2493
2494 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
2495 {
2496         struct dlm_lkb *lkb;
2497         struct dlm_rsb *r;
2498         int error, namelen;
2499
2500         error = create_lkb(ls, &lkb);
2501         if (error)
2502                 goto fail;
2503
2504         receive_flags(lkb, ms);
2505         lkb->lkb_flags |= DLM_IFL_MSTCPY;
2506         error = receive_request_args(ls, lkb, ms);
2507         if (error) {
2508                 __put_lkb(ls, lkb);
2509                 goto fail;
2510         }
2511
2512         namelen = receive_extralen(ms);
2513
2514         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
2515         if (error) {
2516                 __put_lkb(ls, lkb);
2517                 goto fail;
2518         }
2519
2520         lock_rsb(r);
2521
2522         attach_lkb(r, lkb);
2523         error = do_request(r, lkb);
2524         send_request_reply(r, lkb, error);
2525
2526         unlock_rsb(r);
2527         put_rsb(r);
2528
2529         if (error == -EINPROGRESS)
2530                 error = 0;
2531         if (error)
2532                 dlm_put_lkb(lkb);
2533         return;
2534
2535  fail:
2536         setup_stub_lkb(ls, ms);
2537         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2538 }
2539
2540 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
2541 {
2542         struct dlm_lkb *lkb;
2543         struct dlm_rsb *r;
2544         int error, reply = 1;
2545
2546         error = find_lkb(ls, ms->m_remid, &lkb);
2547         if (error)
2548                 goto fail;
2549
2550         r = lkb->lkb_resource;
2551
2552         hold_rsb(r);
2553         lock_rsb(r);
2554
2555         receive_flags(lkb, ms);
2556         error = receive_convert_args(ls, lkb, ms);
2557         if (error)
2558                 goto out;
2559         reply = !down_conversion(lkb);
2560
2561         error = do_convert(r, lkb);
2562  out:
2563         if (reply)
2564                 send_convert_reply(r, lkb, error);
2565
2566         unlock_rsb(r);
2567         put_rsb(r);
2568         dlm_put_lkb(lkb);
2569         return;
2570
2571  fail:
2572         setup_stub_lkb(ls, ms);
2573         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2574 }
2575
2576 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
2577 {
2578         struct dlm_lkb *lkb;
2579         struct dlm_rsb *r;
2580         int error;
2581
2582         error = find_lkb(ls, ms->m_remid, &lkb);
2583         if (error)
2584                 goto fail;
2585
2586         r = lkb->lkb_resource;
2587
2588         hold_rsb(r);
2589         lock_rsb(r);
2590
2591         receive_flags(lkb, ms);
2592         error = receive_unlock_args(ls, lkb, ms);
2593         if (error)
2594                 goto out;
2595
2596         error = do_unlock(r, lkb);
2597  out:
2598         send_unlock_reply(r, lkb, error);
2599
2600         unlock_rsb(r);
2601         put_rsb(r);
2602         dlm_put_lkb(lkb);
2603         return;
2604
2605  fail:
2606         setup_stub_lkb(ls, ms);
2607         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2608 }
2609
2610 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
2611 {
2612         struct dlm_lkb *lkb;
2613         struct dlm_rsb *r;
2614         int error;
2615
2616         error = find_lkb(ls, ms->m_remid, &lkb);
2617         if (error)
2618                 goto fail;
2619
2620         receive_flags(lkb, ms);
2621
2622         r = lkb->lkb_resource;
2623
2624         hold_rsb(r);
2625         lock_rsb(r);
2626
2627         error = do_cancel(r, lkb);
2628         send_cancel_reply(r, lkb, error);
2629
2630         unlock_rsb(r);
2631         put_rsb(r);
2632         dlm_put_lkb(lkb);
2633         return;
2634
2635  fail:
2636         setup_stub_lkb(ls, ms);
2637         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2638 }
2639
2640 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2641 {
2642         struct dlm_lkb *lkb;
2643         struct dlm_rsb *r;
2644         int error;
2645
2646         error = find_lkb(ls, ms->m_remid, &lkb);
2647         if (error) {
2648                 log_error(ls, "receive_grant no lkb");
2649                 return;
2650         }
2651         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2652
2653         r = lkb->lkb_resource;
2654
2655         hold_rsb(r);
2656         lock_rsb(r);
2657
2658         receive_flags_reply(lkb, ms);
2659         grant_lock_pc(r, lkb, ms);
2660         queue_cast(r, lkb, 0);
2661
2662         unlock_rsb(r);
2663         put_rsb(r);
2664         dlm_put_lkb(lkb);
2665 }
2666
2667 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
2668 {
2669         struct dlm_lkb *lkb;
2670         struct dlm_rsb *r;
2671         int error;
2672
2673         error = find_lkb(ls, ms->m_remid, &lkb);
2674         if (error) {
2675                 log_error(ls, "receive_bast no lkb");
2676                 return;
2677         }
2678         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2679
2680         r = lkb->lkb_resource;
2681
2682         hold_rsb(r);
2683         lock_rsb(r);
2684
2685         queue_bast(r, lkb, ms->m_bastmode);
2686
2687         unlock_rsb(r);
2688         put_rsb(r);
2689         dlm_put_lkb(lkb);
2690 }
2691
2692 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
2693 {
2694         int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
2695
2696         from_nodeid = ms->m_header.h_nodeid;
2697         our_nodeid = dlm_our_nodeid();
2698
2699         len = receive_extralen(ms);
2700
2701         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2702         if (dir_nodeid != our_nodeid) {
2703                 log_error(ls, "lookup dir_nodeid %d from %d",
2704                           dir_nodeid, from_nodeid);
2705                 error = -EINVAL;
2706                 ret_nodeid = -1;
2707                 goto out;
2708         }
2709
2710         error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
2711
2712         /* Optimization: we're master so treat lookup as a request */
2713         if (!error && ret_nodeid == our_nodeid) {
2714                 receive_request(ls, ms);
2715                 return;
2716         }
2717  out:
2718         send_lookup_reply(ls, ms, ret_nodeid, error);
2719 }
2720
2721 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
2722 {
2723         int len, dir_nodeid, from_nodeid;
2724
2725         from_nodeid = ms->m_header.h_nodeid;
2726
2727         len = receive_extralen(ms);
2728
2729         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2730         if (dir_nodeid != dlm_our_nodeid()) {
2731                 log_error(ls, "remove dir entry dir_nodeid %d from %d",
2732                           dir_nodeid, from_nodeid);
2733                 return;
2734         }
2735
2736         dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
2737 }
2738
2739 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
2740 {
2741         struct dlm_lkb *lkb;
2742         struct dlm_rsb *r;
2743         int error, mstype;
2744
2745         error = find_lkb(ls, ms->m_remid, &lkb);
2746         if (error) {
2747                 log_error(ls, "receive_request_reply no lkb");
2748                 return;
2749         }
2750         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2751
2752         mstype = lkb->lkb_wait_type;
2753         error = remove_from_waiters(lkb);
2754         if (error) {
2755                 log_error(ls, "receive_request_reply not on waiters");
2756                 goto out;
2757         }
2758
2759         /* this is the value returned from do_request() on the master */
2760         error = ms->m_result;
2761
2762         r = lkb->lkb_resource;
2763         hold_rsb(r);
2764         lock_rsb(r);
2765
2766         /* Optimization: the dir node was also the master, so it took our
2767            lookup as a request and sent request reply instead of lookup reply */
2768         if (mstype == DLM_MSG_LOOKUP) {
2769                 r->res_nodeid = ms->m_header.h_nodeid;
2770                 lkb->lkb_nodeid = r->res_nodeid;
2771         }
2772
2773         switch (error) {
2774         case -EAGAIN:
2775                 /* request would block (be queued) on remote master;
2776                    the unhold undoes the original ref from create_lkb()
2777                    so it leads to the lkb being freed */
2778                 queue_cast(r, lkb, -EAGAIN);
2779                 confirm_master(r, -EAGAIN);
2780                 unhold_lkb(lkb);
2781                 break;
2782
2783         case -EINPROGRESS:
2784         case 0:
2785                 /* request was queued or granted on remote master */
2786                 receive_flags_reply(lkb, ms);
2787                 lkb->lkb_remid = ms->m_lkid;
2788                 if (error)
2789                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
2790                 else {
2791                         grant_lock_pc(r, lkb, ms);
2792                         queue_cast(r, lkb, 0);
2793                 }
2794                 confirm_master(r, error);
2795                 break;
2796
2797         case -EBADR:
2798         case -ENOTBLK:
2799                 /* find_rsb failed to find rsb or rsb wasn't master */
2800                 r->res_nodeid = -1;
2801                 lkb->lkb_nodeid = -1;
2802                 _request_lock(r, lkb);
2803                 break;
2804
2805         default:
2806                 log_error(ls, "receive_request_reply error %d", error);
2807         }
2808
2809         unlock_rsb(r);
2810         put_rsb(r);
2811  out:
2812         dlm_put_lkb(lkb);
2813 }
2814
2815 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2816                                     struct dlm_message *ms)
2817 {
2818         int error = ms->m_result;
2819
2820         /* this is the value returned from do_convert() on the master */
2821
2822         switch (error) {
2823         case -EAGAIN:
2824                 /* convert would block (be queued) on remote master */
2825                 queue_cast(r, lkb, -EAGAIN);
2826                 break;
2827
2828         case -EINPROGRESS:
2829                 /* convert was queued on remote master */
2830                 del_lkb(r, lkb);
2831                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2832                 break;
2833
2834         case 0:
2835                 /* convert was granted on remote master */
2836                 receive_flags_reply(lkb, ms);
2837                 grant_lock_pc(r, lkb, ms);
2838                 queue_cast(r, lkb, 0);
2839                 break;
2840
2841         default:
2842                 log_error(r->res_ls, "receive_convert_reply error %d", error);
2843         }
2844 }
2845
2846 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2847 {
2848         struct dlm_rsb *r = lkb->lkb_resource;
2849
2850         hold_rsb(r);
2851         lock_rsb(r);
2852
2853         __receive_convert_reply(r, lkb, ms);
2854
2855         unlock_rsb(r);
2856         put_rsb(r);
2857 }
2858
2859 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
2860 {
2861         struct dlm_lkb *lkb;
2862         int error;
2863
2864         error = find_lkb(ls, ms->m_remid, &lkb);
2865         if (error) {
2866                 log_error(ls, "receive_convert_reply no lkb");
2867                 return;
2868         }
2869         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2870
2871         error = remove_from_waiters(lkb);
2872         if (error) {
2873                 log_error(ls, "receive_convert_reply not on waiters");
2874                 goto out;
2875         }
2876
2877         _receive_convert_reply(lkb, ms);
2878  out:
2879         dlm_put_lkb(lkb);
2880 }
2881
2882 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2883 {
2884         struct dlm_rsb *r = lkb->lkb_resource;
2885         int error = ms->m_result;
2886
2887         hold_rsb(r);
2888         lock_rsb(r);
2889
2890         /* this is the value returned from do_unlock() on the master */
2891
2892         switch (error) {
2893         case -DLM_EUNLOCK:
2894                 receive_flags_reply(lkb, ms);
2895                 remove_lock_pc(r, lkb);
2896                 queue_cast(r, lkb, -DLM_EUNLOCK);
2897                 break;
2898         default:
2899                 log_error(r->res_ls, "receive_unlock_reply error %d", error);
2900         }
2901
2902         unlock_rsb(r);
2903         put_rsb(r);
2904 }
2905
2906 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
2907 {
2908         struct dlm_lkb *lkb;
2909         int error;
2910
2911         error = find_lkb(ls, ms->m_remid, &lkb);
2912         if (error) {
2913                 log_error(ls, "receive_unlock_reply no lkb");
2914                 return;
2915         }
2916         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2917
2918         error = remove_from_waiters(lkb);
2919         if (error) {
2920                 log_error(ls, "receive_unlock_reply not on waiters");
2921                 goto out;
2922         }
2923
2924         _receive_unlock_reply(lkb, ms);
2925  out:
2926         dlm_put_lkb(lkb);
2927 }
2928
2929 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2930 {
2931         struct dlm_rsb *r = lkb->lkb_resource;
2932         int error = ms->m_result;
2933
2934         hold_rsb(r);
2935         lock_rsb(r);
2936
2937         /* this is the value returned from do_cancel() on the master */
2938
2939         switch (error) {
2940         case -DLM_ECANCEL:
2941                 receive_flags_reply(lkb, ms);
2942                 revert_lock_pc(r, lkb);
2943                 queue_cast(r, lkb, -DLM_ECANCEL);
2944                 break;
2945         default:
2946                 log_error(r->res_ls, "receive_cancel_reply error %d", error);
2947         }
2948
2949         unlock_rsb(r);
2950         put_rsb(r);
2951 }
2952
2953 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
2954 {
2955         struct dlm_lkb *lkb;
2956         int error;
2957
2958         error = find_lkb(ls, ms->m_remid, &lkb);
2959         if (error) {
2960                 log_error(ls, "receive_cancel_reply no lkb");
2961                 return;
2962         }
2963         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2964
2965         error = remove_from_waiters(lkb);
2966         if (error) {
2967                 log_error(ls, "receive_cancel_reply not on waiters");
2968                 goto out;
2969         }
2970
2971         _receive_cancel_reply(lkb, ms);
2972  out:
2973         dlm_put_lkb(lkb);
2974 }
2975
2976 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
2977 {
2978         struct dlm_lkb *lkb;
2979         struct dlm_rsb *r;
2980         int error, ret_nodeid;
2981
2982         error = find_lkb(ls, ms->m_lkid, &lkb);
2983         if (error) {
2984                 log_error(ls, "receive_lookup_reply no lkb");
2985                 return;
2986         }
2987
2988         error = remove_from_waiters(lkb);
2989         if (error) {
2990                 log_error(ls, "receive_lookup_reply not on waiters");
2991                 goto out;
2992         }
2993
2994         /* this is the value returned by dlm_dir_lookup on dir node
2995            FIXME: will a non-zero error ever be returned? */
2996         error = ms->m_result;
2997
2998         r = lkb->lkb_resource;
2999         hold_rsb(r);
3000         lock_rsb(r);
3001
3002         ret_nodeid = ms->m_nodeid;
3003         if (ret_nodeid == dlm_our_nodeid()) {
3004                 r->res_nodeid = 0;
3005                 ret_nodeid = 0;
3006                 r->res_first_lkid = 0;
3007         } else {
3008                 /* set_master() will copy res_nodeid to lkb_nodeid */
3009                 r->res_nodeid = ret_nodeid;
3010         }
3011
3012         _request_lock(r, lkb);
3013
3014         if (!ret_nodeid)
3015                 process_lookup_list(r);
3016
3017         unlock_rsb(r);
3018         put_rsb(r);
3019  out:
3020         dlm_put_lkb(lkb);
3021 }
3022
3023 int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
3024 {
3025         struct dlm_message *ms = (struct dlm_message *) hd;
3026         struct dlm_ls *ls;
3027         int error = 0;
3028
3029         if (!recovery)
3030                 dlm_message_in(ms);
3031
3032         ls = dlm_find_lockspace_global(hd->h_lockspace);
3033         if (!ls) {
3034                 log_print("drop message %d from %d for unknown lockspace %d",
3035                           ms->m_type, nodeid, hd->h_lockspace);
3036                 return -EINVAL;
3037         }
3038
3039         /* recovery may have just ended leaving a bunch of backed-up requests
3040            in the requestqueue; wait while dlm_recoverd clears them */
3041
3042         if (!recovery)
3043                 dlm_wait_requestqueue(ls);
3044
3045         /* recovery may have just started while there were a bunch of
3046            in-flight requests -- save them in requestqueue to be processed
3047            after recovery.  we can't let dlm_recvd block on the recovery
3048            lock.  if dlm_recoverd is calling this function to clear the
3049            requestqueue, it needs to be interrupted (-EINTR) if another
3050            recovery operation is starting. */
3051
3052         while (1) {
3053                 if (dlm_locking_stopped(ls)) {
3054                         if (recovery) {
3055                                 error = -EINTR;
3056                                 goto out;
3057                         }
3058                         error = dlm_add_requestqueue(ls, nodeid, hd);
3059                         if (error == -EAGAIN)
3060                                 continue;
3061                         else {
3062                                 error = -EINTR;
3063                                 goto out;
3064                         }
3065                 }
3066
3067                 if (lock_recovery_try(ls))
3068                         break;
3069                 schedule();
3070         }
3071
3072         switch (ms->m_type) {
3073
3074         /* messages sent to a master node */
3075
3076         case DLM_MSG_REQUEST:
3077                 receive_request(ls, ms);
3078                 break;
3079
3080         case DLM_MSG_CONVERT:
3081                 receive_convert(ls, ms);
3082                 break;
3083
3084         case DLM_MSG_UNLOCK:
3085                 receive_unlock(ls, ms);
3086                 break;
3087
3088         case DLM_MSG_CANCEL:
3089                 receive_cancel(ls, ms);
3090                 break;
3091
3092         /* messages sent from a master node (replies to above) */
3093
3094         case DLM_MSG_REQUEST_REPLY:
3095                 receive_request_reply(ls, ms);
3096                 break;
3097
3098         case DLM_MSG_CONVERT_REPLY:
3099                 receive_convert_reply(ls, ms);
3100                 break;
3101
3102         case DLM_MSG_UNLOCK_REPLY:
3103                 receive_unlock_reply(ls, ms);
3104                 break;
3105
3106         case DLM_MSG_CANCEL_REPLY:
3107                 receive_cancel_reply(ls, ms);
3108                 break;
3109
3110         /* messages sent from a master node (only two types of async msg) */
3111
3112         case DLM_MSG_GRANT:
3113                 receive_grant(ls, ms);
3114                 break;
3115
3116         case DLM_MSG_BAST:
3117                 receive_bast(ls, ms);
3118                 break;
3119
3120         /* messages sent to a dir node */
3121
3122         case DLM_MSG_LOOKUP:
3123                 receive_lookup(ls, ms);
3124                 break;
3125
3126         case DLM_MSG_REMOVE:
3127                 receive_remove(ls, ms);
3128                 break;
3129
3130         /* messages sent from a dir node (remove has no reply) */
3131
3132         case DLM_MSG_LOOKUP_REPLY:
3133                 receive_lookup_reply(ls, ms);
3134                 break;
3135
3136         default:
3137                 log_error(ls, "unknown message type %d", ms->m_type);
3138         }
3139
3140         unlock_recovery(ls);
3141  out:
3142         dlm_put_lockspace(ls);
3143         dlm_astd_wake();
3144         return error;
3145 }
3146
3147
3148 /*
3149  * Recovery related
3150  */
3151
3152 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3153 {
3154         if (middle_conversion(lkb)) {
3155                 hold_lkb(lkb);
3156                 ls->ls_stub_ms.m_result = -EINPROGRESS;
3157                 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3158                 _remove_from_waiters(lkb);
3159                 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3160
3161                 /* Same special case as in receive_rcom_lock_args() */
3162                 lkb->lkb_grmode = DLM_LOCK_IV;
3163                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3164                 unhold_lkb(lkb);
3165
3166         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3167                 lkb->lkb_flags |= DLM_IFL_RESEND;
3168         }
3169
3170         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3171            conversions are async; there's no reply from the remote master */
3172 }
3173
3174 /* A waiting lkb needs recovery if the master node has failed, or
3175    the master node is changing (only when no directory is used) */
3176
3177 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3178 {
3179         if (dlm_is_removed(ls, lkb->lkb_nodeid))
3180                 return 1;
3181
3182         if (!dlm_no_directory(ls))
3183                 return 0;
3184
3185         if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3186                 return 1;
3187
3188         return 0;
3189 }
3190
3191 /* Recovery for locks that are waiting for replies from nodes that are now
3192    gone.  We can just complete unlocks and cancels by faking a reply from the
3193    dead node.  Requests and up-conversions we flag to be resent after
3194    recovery.  Down-conversions can just be completed with a fake reply like
3195    unlocks.  Conversions between PR and CW need special attention. */
3196
3197 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3198 {
3199         struct dlm_lkb *lkb, *safe;
3200
3201         mutex_lock(&ls->ls_waiters_mutex);
3202
3203         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3204                 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3205                           lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3206
3207                 /* all outstanding lookups, regardless of destination  will be
3208                    resent after recovery is done */
3209
3210                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3211                         lkb->lkb_flags |= DLM_IFL_RESEND;
3212                         continue;
3213                 }
3214
3215                 if (!waiter_needs_recovery(ls, lkb))
3216                         continue;
3217
3218                 switch (lkb->lkb_wait_type) {
3219
3220                 case DLM_MSG_REQUEST:
3221                         lkb->lkb_flags |= DLM_IFL_RESEND;
3222                         break;
3223
3224                 case DLM_MSG_CONVERT:
3225                         recover_convert_waiter(ls, lkb);
3226                         break;
3227
3228                 case DLM_MSG_UNLOCK:
3229                         hold_lkb(lkb);
3230                         ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3231                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3232                         _remove_from_waiters(lkb);
3233                         _receive_unlock_reply(lkb, &ls->ls_stub_ms);
3234                         dlm_put_lkb(lkb);
3235                         break;
3236
3237                 case DLM_MSG_CANCEL:
3238                         hold_lkb(lkb);
3239                         ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3240                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3241                         _remove_from_waiters(lkb);
3242                         _receive_cancel_reply(lkb, &ls->ls_stub_ms);
3243                         dlm_put_lkb(lkb);
3244                         break;
3245
3246                 default:
3247                         log_error(ls, "invalid lkb wait_type %d",
3248                                   lkb->lkb_wait_type);
3249                 }
3250                 schedule();
3251         }
3252         mutex_unlock(&ls->ls_waiters_mutex);
3253 }
3254
3255 static int remove_resend_waiter(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
3256 {
3257         struct dlm_lkb *lkb;
3258         int rv = 0;
3259
3260         mutex_lock(&ls->ls_waiters_mutex);
3261         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3262                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3263                         rv = lkb->lkb_wait_type;
3264                         _remove_from_waiters(lkb);
3265                         lkb->lkb_flags &= ~DLM_IFL_RESEND;
3266                         break;
3267                 }
3268         }
3269         mutex_unlock(&ls->ls_waiters_mutex);
3270
3271         if (!rv)
3272                 lkb = NULL;
3273         *lkb_ret = lkb;
3274         return rv;
3275 }
3276
3277 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
3278    master or dir-node for r.  Processing the lkb may result in it being placed
3279    back on waiters. */
3280
3281 int dlm_recover_waiters_post(struct dlm_ls *ls)
3282 {
3283         struct dlm_lkb *lkb;
3284         struct dlm_rsb *r;
3285         int error = 0, mstype;
3286
3287         while (1) {
3288                 if (dlm_locking_stopped(ls)) {
3289                         log_debug(ls, "recover_waiters_post aborted");
3290                         error = -EINTR;
3291                         break;
3292                 }
3293
3294                 mstype = remove_resend_waiter(ls, &lkb);
3295                 if (!mstype)
3296                         break;
3297
3298                 r = lkb->lkb_resource;
3299
3300                 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3301                           lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3302
3303                 switch (mstype) {
3304
3305                 case DLM_MSG_LOOKUP:
3306                         hold_rsb(r);
3307                         lock_rsb(r);
3308                         _request_lock(r, lkb);
3309                         if (is_master(r))
3310                                 confirm_master(r, 0);
3311                         unlock_rsb(r);
3312                         put_rsb(r);
3313                         break;
3314
3315                 case DLM_MSG_REQUEST:
3316                         hold_rsb(r);
3317                         lock_rsb(r);
3318                         _request_lock(r, lkb);
3319                         if (is_master(r))
3320                                 confirm_master(r, 0);
3321                         unlock_rsb(r);
3322                         put_rsb(r);
3323                         break;
3324
3325                 case DLM_MSG_CONVERT:
3326                         hold_rsb(r);
3327                         lock_rsb(r);
3328                         _convert_lock(r, lkb);
3329                         unlock_rsb(r);
3330                         put_rsb(r);
3331                         break;
3332
3333                 default:
3334                         log_error(ls, "recover_waiters_post type %d", mstype);
3335                 }
3336         }
3337
3338         return error;
3339 }
3340
3341 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3342                         int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3343 {
3344         struct dlm_ls *ls = r->res_ls;
3345         struct dlm_lkb *lkb, *safe;
3346
3347         list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3348                 if (test(ls, lkb)) {
3349                         rsb_set_flag(r, RSB_LOCKS_PURGED);
3350                         del_lkb(r, lkb);
3351                         /* this put should free the lkb */
3352                         if (!dlm_put_lkb(lkb))
3353                                 log_error(ls, "purged lkb not released");
3354                 }
3355         }
3356 }
3357
3358 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3359 {
3360         return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3361 }
3362
3363 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3364 {
3365         return is_master_copy(lkb);
3366 }
3367
3368 static void purge_dead_locks(struct dlm_rsb *r)
3369 {
3370         purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3371         purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3372         purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3373 }
3374
3375 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3376 {
3377         purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3378         purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3379         purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3380 }
3381
3382 /* Get rid of locks held by nodes that are gone. */
3383
3384 int dlm_purge_locks(struct dlm_ls *ls)
3385 {
3386         struct dlm_rsb *r;
3387
3388         log_debug(ls, "dlm_purge_locks");
3389
3390         down_write(&ls->ls_root_sem);
3391         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
3392                 hold_rsb(r);
3393                 lock_rsb(r);
3394                 if (is_master(r))
3395                         purge_dead_locks(r);
3396                 unlock_rsb(r);
3397                 unhold_rsb(r);
3398
3399                 schedule();
3400         }
3401         up_write(&ls->ls_root_sem);
3402
3403         return 0;
3404 }
3405
3406 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
3407 {
3408         struct dlm_rsb *r, *r_ret = NULL;
3409
3410         read_lock(&ls->ls_rsbtbl[bucket].lock);
3411         list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
3412                 if (!rsb_flag(r, RSB_LOCKS_PURGED))
3413                         continue;
3414                 hold_rsb(r);
3415                 rsb_clear_flag(r, RSB_LOCKS_PURGED);
3416                 r_ret = r;
3417                 break;
3418         }
3419         read_unlock(&ls->ls_rsbtbl[bucket].lock);
3420         return r_ret;
3421 }
3422
3423 void dlm_grant_after_purge(struct dlm_ls *ls)
3424 {
3425         struct dlm_rsb *r;
3426         int bucket = 0;
3427
3428         while (1) {
3429                 r = find_purged_rsb(ls, bucket);
3430                 if (!r) {
3431                         if (bucket == ls->ls_rsbtbl_size - 1)
3432                                 break;
3433                         bucket++;
3434                         continue;
3435                 }
3436                 lock_rsb(r);
3437                 if (is_master(r)) {
3438                         grant_pending_locks(r);
3439                         confirm_master(r, 0);
3440                 }
3441                 unlock_rsb(r);
3442                 put_rsb(r);
3443                 schedule();
3444         }
3445 }
3446
3447 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3448                                          uint32_t remid)
3449 {
3450         struct dlm_lkb *lkb;
3451
3452         list_for_each_entry(lkb, head, lkb_statequeue) {
3453                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
3454                         return lkb;
3455         }
3456         return NULL;
3457 }
3458
3459 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
3460                                     uint32_t remid)
3461 {
3462         struct dlm_lkb *lkb;
3463
3464         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3465         if (lkb)
3466                 return lkb;
3467         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3468         if (lkb)
3469                 return lkb;
3470         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
3471         if (lkb)
3472                 return lkb;
3473         return NULL;
3474 }
3475
3476 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3477                                   struct dlm_rsb *r, struct dlm_rcom *rc)
3478 {
3479         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3480         int lvblen;
3481
3482         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
3483         lkb->lkb_ownpid = rl->rl_ownpid;
3484         lkb->lkb_remid = rl->rl_lkid;
3485         lkb->lkb_exflags = rl->rl_exflags;
3486         lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
3487         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3488         lkb->lkb_lvbseq = rl->rl_lvbseq;
3489         lkb->lkb_rqmode = rl->rl_rqmode;
3490         lkb->lkb_grmode = rl->rl_grmode;
3491         /* don't set lkb_status because add_lkb wants to itself */
3492
3493         lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
3494         lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
3495
3496         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3497                 lkb->lkb_lvbptr = allocate_lvb(ls);
3498                 if (!lkb->lkb_lvbptr)
3499                         return -ENOMEM;
3500                 lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
3501                          sizeof(struct rcom_lock);
3502                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
3503         }
3504
3505         /* Conversions between PR and CW (middle modes) need special handling.
3506            The real granted mode of these converting locks cannot be determined
3507            until all locks have been rebuilt on the rsb (recover_conversion) */
3508
3509         if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
3510                 rl->rl_status = DLM_LKSTS_CONVERT;
3511                 lkb->lkb_grmode = DLM_LOCK_IV;
3512                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
3513         }
3514
3515         return 0;
3516 }
3517
3518 /* This lkb may have been recovered in a previous aborted recovery so we need
3519    to check if the rsb already has an lkb with the given remote nodeid/lkid.
3520    If so we just send back a standard reply.  If not, we create a new lkb with
3521    the given values and send back our lkid.  We send back our lkid by sending
3522    back the rcom_lock struct we got but with the remid field filled in. */
3523
3524 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3525 {
3526         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3527         struct dlm_rsb *r;
3528         struct dlm_lkb *lkb;
3529         int error;
3530
3531         if (rl->rl_parent_lkid) {
3532                 error = -EOPNOTSUPP;
3533                 goto out;
3534         }
3535
3536         error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
3537         if (error)
3538                 goto out;
3539
3540         lock_rsb(r);
3541
3542         lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
3543         if (lkb) {
3544                 error = -EEXIST;
3545                 goto out_remid;
3546         }
3547
3548         error = create_lkb(ls, &lkb);
3549         if (error)
3550                 goto out_unlock;
3551
3552         error = receive_rcom_lock_args(ls, lkb, r, rc);
3553         if (error) {
3554                 __put_lkb(ls, lkb);
3555                 goto out_unlock;
3556         }
3557
3558         attach_lkb(r, lkb);
3559         add_lkb(r, lkb, rl->rl_status);
3560         error = 0;
3561
3562  out_remid:
3563         /* this is the new value returned to the lock holder for
3564            saving in its process-copy lkb */
3565         rl->rl_remid = lkb->lkb_id;
3566
3567  out_unlock:
3568         unlock_rsb(r);
3569         put_rsb(r);
3570  out:
3571         if (error)
3572                 log_print("recover_master_copy %d %x", error, rl->rl_lkid);
3573         rl->rl_result = error;
3574         return error;
3575 }
3576
3577 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3578 {
3579         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3580         struct dlm_rsb *r;
3581         struct dlm_lkb *lkb;
3582         int error;
3583
3584         error = find_lkb(ls, rl->rl_lkid, &lkb);
3585         if (error) {
3586                 log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
3587                 return error;
3588         }
3589
3590         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3591
3592         error = rl->rl_result;
3593
3594         r = lkb->lkb_resource;
3595         hold_rsb(r);
3596         lock_rsb(r);
3597
3598         switch (error) {
3599         case -EBADR:
3600                 /* There's a chance the new master received our lock before
3601                    dlm_recover_master_reply(), this wouldn't happen if we did
3602                    a barrier between recover_masters and recover_locks. */
3603                 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
3604                           (unsigned long)r, r->res_name);
3605                 dlm_send_rcom_lock(r, lkb);
3606                 goto out;
3607         case -EEXIST:
3608                 log_debug(ls, "master copy exists %x", lkb->lkb_id);
3609                 /* fall through */
3610         case 0:
3611                 lkb->lkb_remid = rl->rl_remid;
3612                 break;
3613         default:
3614                 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
3615                           error, lkb->lkb_id);
3616         }
3617
3618         /* an ack for dlm_recover_locks() which waits for replies from
3619            all the locks it sends to new masters */
3620         dlm_recovered_lock(r);
3621  out:
3622         unlock_rsb(r);
3623         put_rsb(r);
3624         dlm_put_lkb(lkb);
3625
3626         return 0;
3627 }
3628
3629 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
3630                      int mode, uint32_t flags, void *name, unsigned int namelen,
3631                      uint32_t parent_lkid)
3632 {
3633         struct dlm_lkb *lkb;
3634         struct dlm_args args;
3635         int error;
3636
3637         lock_recovery(ls);
3638
3639         error = create_lkb(ls, &lkb);
3640         if (error) {
3641                 kfree(ua);
3642                 goto out;
3643         }
3644
3645         if (flags & DLM_LKF_VALBLK) {
3646                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
3647                 if (!ua->lksb.sb_lvbptr) {
3648                         kfree(ua);
3649                         __put_lkb(ls, lkb);
3650                         error = -ENOMEM;
3651                         goto out;
3652                 }
3653         }
3654
3655         /* After ua is attached to lkb it will be freed by free_lkb().
3656            When DLM_IFL_USER is set, the dlm knows that this is a userspace
3657            lock and that lkb_astparam is the dlm_user_args structure. */
3658
3659         error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
3660                               DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
3661         lkb->lkb_flags |= DLM_IFL_USER;
3662         ua->old_mode = DLM_LOCK_IV;
3663
3664         if (error) {
3665                 __put_lkb(ls, lkb);
3666                 goto out;
3667         }
3668
3669         error = request_lock(ls, lkb, name, namelen, &args);
3670
3671         switch (error) {
3672         case 0:
3673                 break;
3674         case -EINPROGRESS:
3675                 error = 0;
3676                 break;
3677         case -EAGAIN:
3678                 error = 0;
3679                 /* fall through */
3680         default:
3681                 __put_lkb(ls, lkb);
3682                 goto out;
3683         }
3684
3685         /* add this new lkb to the per-process list of locks */
3686         spin_lock(&ua->proc->locks_spin);
3687         kref_get(&lkb->lkb_ref);
3688         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
3689         spin_unlock(&ua->proc->locks_spin);
3690  out:
3691         unlock_recovery(ls);
3692         return error;
3693 }
3694
3695 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3696                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
3697 {
3698         struct dlm_lkb *lkb;
3699         struct dlm_args args;
3700         struct dlm_user_args *ua;
3701         int error;
3702
3703         lock_recovery(ls);
3704
3705         error = find_lkb(ls, lkid, &lkb);
3706         if (error)
3707                 goto out;
3708
3709         /* user can change the params on its lock when it converts it, or
3710            add an lvb that didn't exist before */
3711
3712         ua = (struct dlm_user_args *)lkb->lkb_astparam;
3713
3714         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
3715                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
3716                 if (!ua->lksb.sb_lvbptr) {
3717                         error = -ENOMEM;
3718                         goto out_put;
3719                 }
3720         }
3721         if (lvb_in && ua->lksb.sb_lvbptr)
3722                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
3723
3724         ua->castparam = ua_tmp->castparam;
3725         ua->castaddr = ua_tmp->castaddr;
3726         ua->bastparam = ua_tmp->bastparam;
3727         ua->bastaddr = ua_tmp->bastaddr;
3728         ua->user_lksb = ua_tmp->user_lksb;
3729         ua->old_mode = lkb->lkb_grmode;
3730
3731         error = set_lock_args(mode, &ua->lksb, flags, 0, 0, DLM_FAKE_USER_AST,
3732                               ua, DLM_FAKE_USER_AST, &args);
3733         if (error)
3734                 goto out_put;
3735
3736         error = convert_lock(ls, lkb, &args);
3737
3738         if (error == -EINPROGRESS || error == -EAGAIN)
3739                 error = 0;
3740  out_put:
3741         dlm_put_lkb(lkb);
3742  out:
3743         unlock_recovery(ls);
3744         kfree(ua_tmp);
3745         return error;
3746 }
3747
3748 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3749                     uint32_t flags, uint32_t lkid, char *lvb_in)
3750 {
3751         struct dlm_lkb *lkb;
3752         struct dlm_args args;
3753         struct dlm_user_args *ua;
3754         int error;
3755
3756         lock_recovery(ls);
3757
3758         error = find_lkb(ls, lkid, &lkb);
3759         if (error)
3760                 goto out;
3761
3762         ua = (struct dlm_user_args *)lkb->lkb_astparam;
3763
3764         if (lvb_in && ua->lksb.sb_lvbptr)
3765                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
3766         ua->castparam = ua_tmp->castparam;
3767         ua->user_lksb = ua_tmp->user_lksb;
3768
3769         error = set_unlock_args(flags, ua, &args);
3770         if (error)
3771                 goto out_put;
3772
3773         error = unlock_lock(ls, lkb, &args);
3774
3775         if (error == -DLM_EUNLOCK)
3776                 error = 0;
3777         if (error)
3778                 goto out_put;
3779
3780         spin_lock(&ua->proc->locks_spin);
3781         /* dlm_user_add_ast() may have already taken lkb off the proc list */
3782         if (!list_empty(&lkb->lkb_ownqueue))
3783                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
3784         spin_unlock(&ua->proc->locks_spin);
3785  out_put:
3786         dlm_put_lkb(lkb);
3787  out:
3788         unlock_recovery(ls);
3789         return error;
3790 }
3791
3792 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3793                     uint32_t flags, uint32_t lkid)
3794 {
3795         struct dlm_lkb *lkb;
3796         struct dlm_args args;
3797         struct dlm_user_args *ua;
3798         int error;
3799
3800         lock_recovery(ls);
3801
3802         error = find_lkb(ls, lkid, &lkb);
3803         if (error)
3804                 goto out;
3805
3806         ua = (struct dlm_user_args *)lkb->lkb_astparam;
3807         ua->castparam = ua_tmp->castparam;
3808         ua->user_lksb = ua_tmp->user_lksb;
3809
3810         error = set_unlock_args(flags, ua, &args);
3811         if (error)
3812                 goto out_put;
3813
3814         error = cancel_lock(ls, lkb, &args);
3815
3816         if (error == -DLM_ECANCEL)
3817                 error = 0;
3818         if (error)
3819                 goto out_put;
3820
3821         /* this lkb was removed from the WAITING queue */
3822         if (lkb->lkb_grmode == DLM_LOCK_IV) {
3823                 spin_lock(&ua->proc->locks_spin);
3824                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
3825                 spin_unlock(&ua->proc->locks_spin);
3826         }
3827  out_put:
3828         dlm_put_lkb(lkb);
3829  out:
3830         unlock_recovery(ls);
3831         return error;
3832 }
3833
3834 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3835 {
3836         struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
3837
3838         if (ua->lksb.sb_lvbptr)
3839                 kfree(ua->lksb.sb_lvbptr);
3840         kfree(ua);
3841         lkb->lkb_astparam = (long)NULL;
3842
3843         /* TODO: propogate to master if needed */
3844         return 0;
3845 }
3846
3847 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
3848    Regardless of what rsb queue the lock is on, it's removed and freed. */
3849
3850 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3851 {
3852         struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
3853         struct dlm_args args;
3854         int error;
3855
3856         /* FIXME: we need to handle the case where the lkb is in limbo
3857            while the rsb is being looked up, currently we assert in
3858            _unlock_lock/is_remote because rsb nodeid is -1. */
3859
3860         set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
3861
3862         error = unlock_lock(ls, lkb, &args);
3863         if (error == -DLM_EUNLOCK)
3864                 error = 0;
3865         return error;
3866 }
3867
3868 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
3869    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
3870    which we clear here. */
3871
3872 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
3873    list, and no more device_writes should add lkb's to proc->locks list; so we
3874    shouldn't need to take asts_spin or locks_spin here.  this assumes that
3875    device reads/writes/closes are serialized -- FIXME: we may need to serialize
3876    them ourself. */
3877
3878 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
3879 {
3880         struct dlm_lkb *lkb, *safe;
3881
3882         lock_recovery(ls);
3883         mutex_lock(&ls->ls_clear_proc_locks);
3884
3885         list_for_each_entry_safe(lkb, safe, &proc->locks, lkb_ownqueue) {
3886                 list_del_init(&lkb->lkb_ownqueue);
3887
3888                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) {
3889                         lkb->lkb_flags |= DLM_IFL_ORPHAN;
3890                         orphan_proc_lock(ls, lkb);
3891                 } else {
3892                         lkb->lkb_flags |= DLM_IFL_DEAD;
3893                         unlock_proc_lock(ls, lkb);
3894                 }
3895
3896                 /* this removes the reference for the proc->locks list
3897                    added by dlm_user_request, it may result in the lkb
3898                    being freed */
3899
3900                 dlm_put_lkb(lkb);
3901         }
3902
3903         /* in-progress unlocks */
3904         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
3905                 list_del_init(&lkb->lkb_ownqueue);
3906                 lkb->lkb_flags |= DLM_IFL_DEAD;
3907                 dlm_put_lkb(lkb);
3908         }
3909
3910         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
3911                 list_del(&lkb->lkb_astqueue);
3912                 dlm_put_lkb(lkb);
3913         }
3914
3915         mutex_unlock(&ls->ls_clear_proc_locks);
3916         unlock_recovery(ls);
3917 }
3918