X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=fs%2Focfs2%2Fdlm%2Fdlmlock.c;h=e5ca3db197f63c647a37c201b70b259a746de2e8;hb=97bf2856c6014879bd04983a3e9dfcdac1e7fe85;hp=671d4ff222cc083c15aa63ed33048c899da2d26b;hpb=76828883507a47dae78837ab5dec5a5b4513c667;p=linux-2.6.git diff --git a/fs/ocfs2/dlm/dlmlock.c b/fs/ocfs2/dlm/dlmlock.c index 671d4ff22..e5ca3db19 100644 --- a/fs/ocfs2/dlm/dlmlock.c +++ b/fs/ocfs2/dlm/dlmlock.c @@ -53,7 +53,7 @@ #define MLOG_MASK_PREFIX ML_DLM #include "cluster/masklog.h" -static spinlock_t dlm_cookie_lock = SPIN_LOCK_UNLOCKED; +static DEFINE_SPINLOCK(dlm_cookie_lock); static u64 dlm_next_cookie = 1; static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm, @@ -141,13 +141,23 @@ static enum dlm_status dlmlock_master(struct dlm_ctxt *dlm, res->lockname.len)) { kick_thread = 1; call_ast = 1; + } else { + mlog(0, "%s: returning DLM_NORMAL to " + "node %u for reco lock\n", dlm->name, + lock->ml.node); } } else { /* for NOQUEUE request, unless we get the * lock right away, return DLM_NOTQUEUED */ - if (flags & LKM_NOQUEUE) + if (flags & LKM_NOQUEUE) { status = DLM_NOTQUEUED; - else { + if (dlm_is_recovery_lock(res->lockname.name, + res->lockname.len)) { + mlog(0, "%s: returning NOTQUEUED to " + "node %u for reco lock\n", dlm->name, + lock->ml.node); + } + } else { dlm_lock_get(lock); list_add_tail(&lock->list, &res->blocked); kick_thread = 1; @@ -191,6 +201,7 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm, struct dlm_lock *lock, int flags) { enum dlm_status status = DLM_DENIED; + int lockres_changed = 1; mlog_entry("type=%d\n", lock->ml.type); mlog(0, "lockres %.*s, flags = 0x%x\n", res->lockname.len, @@ -216,8 +227,25 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm, res->state &= ~DLM_LOCK_RES_IN_PROGRESS; lock->lock_pending = 0; if (status != DLM_NORMAL) { - if (status != DLM_NOTQUEUED) + if (status == DLM_RECOVERING && + dlm_is_recovery_lock(res->lockname.name, + res->lockname.len)) { + /* recovery lock was mastered by dead node. + * we need to have calc_usage shoot down this + * lockres and completely remaster it. */ + mlog(0, "%s: recovery lock was owned by " + "dead node %u, remaster it now.\n", + dlm->name, res->owner); + } else if (status != DLM_NOTQUEUED) { + /* + * DO NOT call calc_usage, as this would unhash + * the remote lockres before we ever get to use + * it. treat as if we never made any change to + * the lockres. + */ + lockres_changed = 0; dlm_error(status); + } dlm_revert_pending_lock(res, lock); dlm_lock_put(lock); } else if (dlm_is_recovery_lock(res->lockname.name, @@ -229,12 +257,12 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm, mlog(0, "%s: $RECOVERY lock for this node (%u) is " "mastered by %u; got lock, manually granting (no ast)\n", dlm->name, dlm->node_num, res->owner); - list_del_init(&lock->list); - list_add_tail(&lock->list, &res->granted); + list_move_tail(&lock->list, &res->granted); } spin_unlock(&res->spinlock); - dlm_lockres_calc_usage(dlm, res); + if (lockres_changed) + dlm_lockres_calc_usage(dlm, res); wake_up(&res->wq); return status; @@ -271,6 +299,14 @@ static enum dlm_status dlm_send_remote_lock_request(struct dlm_ctxt *dlm, if (tmpret >= 0) { // successfully sent and received ret = status; // this is already a dlm_status + if (ret == DLM_REJECTED) { + mlog(ML_ERROR, "%s:%.*s: BUG. this is a stale lockres " + "no longer owned by %u. that node is coming back " + "up currently.\n", dlm->name, create.namelen, + create.name, res->owner); + dlm_print_one_lock_resource(res); + BUG(); + } } else { mlog_errno(tmpret); if (dlm_is_host_down(tmpret)) { @@ -372,13 +408,13 @@ struct dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie, struct dlm_lock *lock; int kernel_allocated = 0; - lock = kcalloc(1, sizeof(*lock), GFP_KERNEL); + lock = kzalloc(sizeof(*lock), GFP_NOFS); if (!lock) return NULL; if (!lksb) { /* zero memory only if kernel-allocated */ - lksb = kcalloc(1, sizeof(*lksb), GFP_KERNEL); + lksb = kzalloc(sizeof(*lksb), GFP_NOFS); if (!lksb) { kfree(lock); return NULL; @@ -419,11 +455,16 @@ int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data) if (!dlm_grab(dlm)) return DLM_REJECTED; - mlog_bug_on_msg(!dlm_domain_fully_joined(dlm), - "Domain %s not fully joined!\n", dlm->name); - name = create->name; namelen = create->namelen; + status = DLM_REJECTED; + if (!dlm_domain_fully_joined(dlm)) { + mlog(ML_ERROR, "Domain %s not fully joined, but node %u is " + "sending a create_lock message for lock %.*s!\n", + dlm->name, create->node_idx, namelen, name); + dlm_error(status); + goto leave; + } status = DLM_IVBUFLEN; if (namelen > DLM_LOCKID_NAME_MAX) { @@ -499,8 +540,8 @@ static inline void dlm_get_next_cookie(u8 node_num, u64 *cookie) enum dlm_status dlmlock(struct dlm_ctxt *dlm, int mode, struct dlm_lockstatus *lksb, int flags, - const char *name, dlm_astlockfunc_t *ast, void *data, - dlm_bastlockfunc_t *bast) + const char *name, int namelen, dlm_astlockfunc_t *ast, + void *data, dlm_bastlockfunc_t *bast) { enum dlm_status status; struct dlm_lock_resource *res = NULL; @@ -530,7 +571,7 @@ enum dlm_status dlmlock(struct dlm_ctxt *dlm, int mode, recovery = (flags & LKM_RECOVERY); if (recovery && - (!dlm_is_recovery_lock(name, strlen(name)) || convert) ) { + (!dlm_is_recovery_lock(name, namelen) || convert) ) { dlm_error(status); goto error; } @@ -602,7 +643,7 @@ retry_convert: } status = DLM_IVBUFLEN; - if (strlen(name) > DLM_LOCKID_NAME_MAX || strlen(name) < 1) { + if (namelen > DLM_LOCKID_NAME_MAX || namelen < 1) { dlm_error(status); goto error; } @@ -618,7 +659,7 @@ retry_convert: dlm_wait_for_recovery(dlm); /* find or create the lock resource */ - res = dlm_get_lock_resource(dlm, name, flags); + res = dlm_get_lock_resource(dlm, name, namelen, flags); if (!res) { status = DLM_IVLOCKID; dlm_error(status); @@ -659,18 +700,22 @@ retry_lock: msleep(100); /* no waiting for dlm_reco_thread */ if (recovery) { - if (status == DLM_RECOVERING) { - mlog(0, "%s: got RECOVERING " - "for $REOCVERY lock, master " - "was %u\n", dlm->name, - res->owner); - dlm_wait_for_node_death(dlm, res->owner, - DLM_NODE_DEATH_WAIT_MAX); - } + if (status != DLM_RECOVERING) + goto retry_lock; + + mlog(0, "%s: got RECOVERING " + "for $RECOVERY lock, master " + "was %u\n", dlm->name, + res->owner); + /* wait to see the node go down, then + * drop down and allow the lockres to + * get cleaned up. need to remaster. */ + dlm_wait_for_node_death(dlm, res->owner, + DLM_NODE_DEATH_WAIT_MAX); } else { dlm_wait_for_recovery(dlm); + goto retry_lock; } - goto retry_lock; } if (status != DLM_NORMAL) {