X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=fs%2Focfs2%2Fdlm%2Fdlmmaster.c;h=940be4c13b1f09ff4703662f007692a6d4b81e89;hb=9464c7cf61b9433057924c36e6e02f303a00e768;hp=9503240ef0e50d46f413e46b62a665de3eec57bd;hpb=41689045f6a3cbe0550e1d34e9cc20d2e8c432ba;p=linux-2.6.git diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c index 9503240ef..940be4c13 100644 --- a/fs/ocfs2/dlm/dlmmaster.c +++ b/fs/ocfs2/dlm/dlmmaster.c @@ -47,6 +47,7 @@ #include "dlmapi.h" #include "dlmcommon.h" +#include "dlmdebug.h" #include "dlmdomain.h" #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER) @@ -73,7 +74,6 @@ struct dlm_master_list_entry wait_queue_head_t wq; atomic_t woken; struct kref mle_refs; - int inuse; unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; @@ -127,30 +127,18 @@ static inline int dlm_mle_equal(struct dlm_ctxt *dlm, return 1; } -#define dlm_print_nodemap(m) _dlm_print_nodemap(m,#m) -static void _dlm_print_nodemap(unsigned long *map, const char *mapname) -{ - int i; - printk("%s=[ ", mapname); - for (i=0; imaybe_map, - *vote = mle->vote_map, - *resp = mle->response_map, - *node = mle->node_map; k = &mle->mle_refs; if (mle->type == DLM_MLE_BLOCK) @@ -171,29 +159,18 @@ static void dlm_print_one_mle(struct dlm_master_list_entry *mle) name = mle->u.res->lockname.name; } - mlog(ML_NOTICE, "%.*s: %3s refs=%3d mas=%3u new=%3u evt=%c inuse=%d ", - namelen, name, type, refs, master, mle->new_master, attached, - mle->inuse); - dlm_print_nodemap(maybe); - printk(", "); - dlm_print_nodemap(vote); - printk(", "); - dlm_print_nodemap(resp); - printk(", "); - dlm_print_nodemap(node); - printk(", "); - printk("\n"); + mlog(ML_NOTICE, " #%3d: %3s %3d %3u %3u %c (%d)%.*s\n", + i, type, refs, master, mle->new_master, attached, + namelen, namelen, name); } -#if 0 -/* Code here is included but defined out as it aids debugging */ - static void dlm_dump_mles(struct dlm_ctxt *dlm) { struct dlm_master_list_entry *mle; struct list_head *iter; mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name); + mlog(ML_NOTICE, " ####: type refs owner new events? lockname nodemap votemap respmap maybemap\n"); spin_lock(&dlm->master_lock); list_for_each(iter, &dlm->master_list) { mle = list_entry(iter, struct dlm_master_list_entry, list); @@ -337,31 +314,6 @@ static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm, spin_unlock(&dlm->spinlock); } -static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle) -{ - struct dlm_ctxt *dlm; - dlm = mle->dlm; - - assert_spin_locked(&dlm->spinlock); - assert_spin_locked(&dlm->master_lock); - mle->inuse++; - kref_get(&mle->mle_refs); -} - -static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle) -{ - struct dlm_ctxt *dlm; - dlm = mle->dlm; - - spin_lock(&dlm->spinlock); - spin_lock(&dlm->master_lock); - mle->inuse--; - __dlm_put_mle(mle); - spin_unlock(&dlm->master_lock); - spin_unlock(&dlm->spinlock); - -} - /* remove from list and free */ static void __dlm_put_mle(struct dlm_master_list_entry *mle) { @@ -370,14 +322,9 @@ static void __dlm_put_mle(struct dlm_master_list_entry *mle) assert_spin_locked(&dlm->spinlock); assert_spin_locked(&dlm->master_lock); - if (!atomic_read(&mle->mle_refs.refcount)) { - /* this may or may not crash, but who cares. - * it's a BUG. */ - mlog(ML_ERROR, "bad mle: %p\n", mle); - dlm_print_one_mle(mle); - BUG(); - } else - kref_put(&mle->mle_refs, dlm_mle_release); + BUG_ON(!atomic_read(&mle->mle_refs.refcount)); + + kref_put(&mle->mle_refs, dlm_mle_release); } @@ -420,7 +367,6 @@ static void dlm_init_mle(struct dlm_master_list_entry *mle, memset(mle->response_map, 0, sizeof(mle->response_map)); mle->master = O2NM_MAX_NODES; mle->new_master = O2NM_MAX_NODES; - mle->inuse = 0; if (mle->type == DLM_MLE_MASTER) { BUG_ON(!res); @@ -618,28 +564,6 @@ static void dlm_lockres_release(struct kref *kref) mlog(0, "destroying lockres %.*s\n", res->lockname.len, res->lockname.name); - if (!hlist_unhashed(&res->hash_node) || - !list_empty(&res->granted) || - !list_empty(&res->converting) || - !list_empty(&res->blocked) || - !list_empty(&res->dirty) || - !list_empty(&res->recovering) || - !list_empty(&res->purge)) { - mlog(ML_ERROR, - "Going to BUG for resource %.*s." - " We're on a list! [%c%c%c%c%c%c%c]\n", - res->lockname.len, res->lockname.name, - !hlist_unhashed(&res->hash_node) ? 'H' : ' ', - !list_empty(&res->granted) ? 'G' : ' ', - !list_empty(&res->converting) ? 'C' : ' ', - !list_empty(&res->blocked) ? 'B' : ' ', - !list_empty(&res->dirty) ? 'D' : ' ', - !list_empty(&res->recovering) ? 'R' : ' ', - !list_empty(&res->purge) ? 'P' : ' '); - - dlm_print_one_lock_resource(res); - } - /* By the time we're ready to blow this guy away, we shouldn't * be on any lists. */ BUG_ON(!hlist_unhashed(&res->hash_node)); @@ -655,6 +579,11 @@ static void dlm_lockres_release(struct kref *kref) kfree(res); } +void dlm_lockres_get(struct dlm_lock_resource *res) +{ + kref_get(&res->refs); +} + void dlm_lockres_put(struct dlm_lock_resource *res) { kref_put(&res->refs, dlm_lockres_release); @@ -674,7 +603,7 @@ static void dlm_init_lockres(struct dlm_ctxt *dlm, memcpy(qname, name, namelen); res->lockname.len = namelen; - res->lockname.hash = dlm_lockid_hash(name, namelen); + res->lockname.hash = full_name_hash(name, namelen); init_waitqueue_head(&res->wq); spin_lock_init(&res->spinlock); @@ -708,11 +637,11 @@ struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm, { struct dlm_lock_resource *res; - res = kmalloc(sizeof(struct dlm_lock_resource), GFP_NOFS); + res = kmalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL); if (!res) return NULL; - res->lockname.name = kmalloc(namelen, GFP_NOFS); + res->lockname.name = kmalloc(namelen, GFP_KERNEL); if (!res->lockname.name) { kfree(res); return NULL; @@ -748,20 +677,19 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm, int blocked = 0; int ret, nodenum; struct dlm_node_iter iter; - unsigned int namelen, hash; + unsigned int namelen; int tries = 0; int bit, wait_on_recovery = 0; BUG_ON(!lockid); namelen = strlen(lockid); - hash = dlm_lockid_hash(lockid, namelen); mlog(0, "get lockres %s (len %d)\n", lockid, namelen); lookup: spin_lock(&dlm->spinlock); - tmpres = __dlm_lookup_lockres(dlm, lockid, namelen, hash); + tmpres = __dlm_lookup_lockres(dlm, lockid, namelen); if (tmpres) { spin_unlock(&dlm->spinlock); mlog(0, "found in hash!\n"); @@ -776,7 +704,7 @@ lookup: mlog(0, "allocating a new resource\n"); /* nothing found and we need to allocate one. */ alloc_mle = (struct dlm_master_list_entry *) - kmem_cache_alloc(dlm_mle_cache, GFP_NOFS); + kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL); if (!alloc_mle) goto leave; res = dlm_new_lockres(dlm, lockid, namelen); @@ -862,11 +790,10 @@ lookup: * if so, the creator of the BLOCK may try to put the last * ref at this time in the assert master handler, so we * need an extra one to keep from a bad ptr deref. */ - dlm_get_mle_inuse(mle); + dlm_get_mle(mle); spin_unlock(&dlm->master_lock); spin_unlock(&dlm->spinlock); -redo_request: while (wait_on_recovery) { /* any cluster changes that occurred after dropping the * dlm spinlock would be detectable be a change on the mle, @@ -885,7 +812,7 @@ redo_request: } dlm_kick_recovery_thread(dlm); - msleep(1000); + msleep(100); dlm_wait_for_recovery(dlm); spin_lock(&dlm->spinlock); @@ -898,15 +825,13 @@ redo_request: } else wait_on_recovery = 0; spin_unlock(&dlm->spinlock); - - if (wait_on_recovery) - dlm_wait_for_node_recovery(dlm, bit, 10000); } /* must wait for lock to be mastered elsewhere */ if (blocked) goto wait; +redo_request: ret = -EINVAL; dlm_node_iter_init(mle->vote_map, &iter); while ((nodenum = dlm_node_iter_next(&iter)) >= 0) { @@ -931,7 +856,6 @@ wait: /* keep going until the response map includes all nodes */ ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked); if (ret < 0) { - wait_on_recovery = 1; mlog(0, "%s:%.*s: node map changed, redo the " "master request now, blocked=%d\n", dlm->name, res->lockname.len, @@ -942,7 +866,7 @@ wait: dlm->name, res->lockname.len, res->lockname.name, blocked); dlm_print_one_lock_resource(res); - dlm_print_one_mle(mle); + /* dlm_print_one_mle(mle); */ tries = 0; } goto redo_request; @@ -956,7 +880,7 @@ wait: dlm_mle_detach_hb_events(dlm, mle); dlm_put_mle(mle); /* put the extra ref */ - dlm_put_mle_inuse(mle); + dlm_put_mle(mle); wake_waiters: spin_lock(&res->spinlock); @@ -997,14 +921,12 @@ recheck: spin_unlock(&res->spinlock); /* this will cause the master to re-assert across * the whole cluster, freeing up mles */ - if (res->owner != dlm->node_num) { - ret = dlm_do_master_request(mle, res->owner); - if (ret < 0) { - /* give recovery a chance to run */ - mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret); - msleep(500); - goto recheck; - } + ret = dlm_do_master_request(mle, res->owner); + if (ret < 0) { + /* give recovery a chance to run */ + mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret); + msleep(500); + goto recheck; } ret = 0; goto leave; @@ -1040,12 +962,6 @@ recheck: "rechecking now\n", dlm->name, res->lockname.len, res->lockname.name); goto recheck; - } else { - if (!voting_done) { - mlog(0, "map not changed and voting not done " - "for %s:%.*s\n", dlm->name, res->lockname.len, - res->lockname.name); - } } if (m != O2NM_MAX_NODES) { @@ -1213,6 +1129,18 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm, set_bit(node, mle->vote_map); } else { mlog(ML_ERROR, "node down! %d\n", node); + + /* if the node wasn't involved in mastery skip it, + * but clear it out from the maps so that it will + * not affect mastery of this lockres */ + clear_bit(node, mle->response_map); + clear_bit(node, mle->vote_map); + if (!test_bit(node, mle->maybe_map)) + goto next; + + /* if we're already blocked on lock mastery, and the + * dead node wasn't the expected master, or there is + * another node in the maybe_map, keep waiting */ if (blocked) { int lowest = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0); @@ -1220,53 +1148,54 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm, /* act like it was never there */ clear_bit(node, mle->maybe_map); - if (node == lowest) { - mlog(0, "expected master %u died" - " while this node was blocked " - "waiting on it!\n", node); - lowest = find_next_bit(mle->maybe_map, - O2NM_MAX_NODES, - lowest+1); - if (lowest < O2NM_MAX_NODES) { - mlog(0, "%s:%.*s:still " - "blocked. waiting on %u " - "now\n", dlm->name, - res->lockname.len, - res->lockname.name, - lowest); - } else { - /* mle is an MLE_BLOCK, but - * there is now nothing left to - * block on. we need to return - * all the way back out and try - * again with an MLE_MASTER. - * dlm_do_local_recovery_cleanup - * has already run, so the mle - * refcount is ok */ - mlog(0, "%s:%.*s: no " - "longer blocking. try to " - "master this here\n", - dlm->name, - res->lockname.len, - res->lockname.name); - mle->type = DLM_MLE_MASTER; - mle->u.res = res; - } + if (node != lowest) + goto next; + + mlog(ML_ERROR, "expected master %u died while " + "this node was blocked waiting on it!\n", + node); + lowest = find_next_bit(mle->maybe_map, + O2NM_MAX_NODES, + lowest+1); + if (lowest < O2NM_MAX_NODES) { + mlog(0, "still blocked. waiting " + "on %u now\n", lowest); + goto next; } - } - /* now blank out everything, as if we had never - * contacted anyone */ - memset(mle->maybe_map, 0, sizeof(mle->maybe_map)); - memset(mle->response_map, 0, sizeof(mle->response_map)); - /* reset the vote_map to the current node_map */ - memcpy(mle->vote_map, mle->node_map, - sizeof(mle->node_map)); - /* put myself into the maybe map */ - if (mle->type != DLM_MLE_BLOCK) + /* mle is an MLE_BLOCK, but there is now + * nothing left to block on. we need to return + * all the way back out and try again with + * an MLE_MASTER. dlm_do_local_recovery_cleanup + * has already run, so the mle refcount is ok */ + mlog(0, "no longer blocking. we can " + "try to master this here\n"); + mle->type = DLM_MLE_MASTER; + memset(mle->maybe_map, 0, + sizeof(mle->maybe_map)); + memset(mle->response_map, 0, + sizeof(mle->maybe_map)); + memcpy(mle->vote_map, mle->node_map, + sizeof(mle->node_map)); + mle->u.res = res; set_bit(dlm->node_num, mle->maybe_map); + + ret = -EAGAIN; + goto next; + } + + clear_bit(node, mle->maybe_map); + if (node > dlm->node_num) + goto next; + + mlog(0, "dead node in map!\n"); + /* yuck. go back and re-contact all nodes + * in the vote_map, removing this node. */ + memset(mle->response_map, 0, + sizeof(mle->response_map)); } ret = -EAGAIN; +next: node = dlm_bitmap_diff_iter_next(&bdi, &sc); } return ret; @@ -1387,7 +1316,7 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data) struct dlm_master_request *request = (struct dlm_master_request *) msg->buf; struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL; char *name; - unsigned int namelen, hash; + unsigned int namelen; int found, ret; int set_maybe; int dispatch_assert = 0; @@ -1402,7 +1331,6 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data) name = request->name; namelen = request->namelen; - hash = dlm_lockid_hash(name, namelen); if (namelen > DLM_LOCKID_NAME_MAX) { response = DLM_IVBUFLEN; @@ -1411,7 +1339,7 @@ int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data) way_up_top: spin_lock(&dlm->spinlock); - res = __dlm_lookup_lockres(dlm, name, namelen, hash); + res = __dlm_lookup_lockres(dlm, name, namelen); if (res) { spin_unlock(&dlm->spinlock); @@ -1531,18 +1459,21 @@ way_up_top: spin_unlock(&dlm->spinlock); mle = (struct dlm_master_list_entry *) - kmem_cache_alloc(dlm_mle_cache, GFP_NOFS); + kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL); if (!mle) { response = DLM_MASTER_RESP_ERROR; mlog_errno(-ENOMEM); goto send_response; } + spin_lock(&dlm->spinlock); + dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, + name, namelen); + spin_unlock(&dlm->spinlock); goto way_up_top; } // mlog(0, "this is second time thru, already allocated, " // "add the block.\n"); - dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen); set_bit(request->node_idx, mle->maybe_map); list_add(&mle->list, &dlm->master_list); response = DLM_MASTER_RESP_NO; @@ -1625,8 +1556,6 @@ again: dlm_node_iter_init(nodemap, &iter); while ((to = dlm_node_iter_next(&iter)) >= 0) { int r = 0; - struct dlm_master_list_entry *mle = NULL; - mlog(0, "sending assert master to %d (%.*s)\n", to, namelen, lockname); memset(&assert, 0, sizeof(assert)); @@ -1638,28 +1567,20 @@ again: tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key, &assert, sizeof(assert), to, &r); if (tmpret < 0) { - mlog(0, "assert_master returned %d!\n", tmpret); + mlog(ML_ERROR, "assert_master returned %d!\n", tmpret); if (!dlm_is_host_down(tmpret)) { - mlog(ML_ERROR, "unhandled error=%d!\n", tmpret); + mlog(ML_ERROR, "unhandled error!\n"); BUG(); } /* a node died. finish out the rest of the nodes. */ - mlog(0, "link to %d went down!\n", to); + mlog(ML_ERROR, "link to %d went down!\n", to); /* any nonzero status return will do */ ret = tmpret; } else if (r < 0) { /* ok, something horribly messed. kill thyself. */ mlog(ML_ERROR,"during assert master of %.*s to %u, " "got %d.\n", namelen, lockname, to, r); - spin_lock(&dlm->spinlock); - spin_lock(&dlm->master_lock); - if (dlm_find_mle(dlm, &mle, (char *)lockname, - namelen)) { - dlm_print_one_mle(mle); - __dlm_put_mle(mle); - } - spin_unlock(&dlm->master_lock); - spin_unlock(&dlm->spinlock); + dlm_dump_lock_resources(dlm); BUG(); } else if (r == EAGAIN) { mlog(0, "%.*s: node %u create mles on other " @@ -1691,7 +1612,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf; struct dlm_lock_resource *res = NULL; char *name; - unsigned int namelen, hash; + unsigned int namelen; u32 flags; int master_request = 0; int ret = 0; @@ -1701,7 +1622,6 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) name = assert->name; namelen = assert->namelen; - hash = dlm_lockid_hash(name, namelen); flags = be32_to_cpu(assert->flags); if (namelen > DLM_LOCKID_NAME_MAX) { @@ -1726,7 +1646,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) if (bit >= O2NM_MAX_NODES) { /* not necessarily an error, though less likely. * could be master just re-asserting. */ - mlog(0, "no bits set in the maybe_map, but %u " + mlog(ML_ERROR, "no bits set in the maybe_map, but %u " "is asserting! (%.*s)\n", assert->node_idx, namelen, name); } else if (bit != assert->node_idx) { @@ -1738,36 +1658,19 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) * number winning the mastery will respond * YES to mastery requests, but this node * had no way of knowing. let it pass. */ - mlog(0, "%u is the lowest node, " + mlog(ML_ERROR, "%u is the lowest node, " "%u is asserting. (%.*s) %u must " "have begun after %u won.\n", bit, assert->node_idx, namelen, name, bit, assert->node_idx); } } - if (mle->type == DLM_MLE_MIGRATION) { - if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) { - mlog(0, "%s:%.*s: got cleanup assert" - " from %u for migration\n", - dlm->name, namelen, name, - assert->node_idx); - } else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) { - mlog(0, "%s:%.*s: got unrelated assert" - " from %u for migration, ignoring\n", - dlm->name, namelen, name, - assert->node_idx); - __dlm_put_mle(mle); - spin_unlock(&dlm->master_lock); - spin_unlock(&dlm->spinlock); - goto done; - } - } } spin_unlock(&dlm->master_lock); /* ok everything checks out with the MLE * now check to see if there is a lockres */ - res = __dlm_lookup_lockres(dlm, name, namelen, hash); + res = __dlm_lookup_lockres(dlm, name, namelen); if (res) { spin_lock(&res->spinlock); if (res->state & DLM_LOCK_RES_RECOVERING) { @@ -1776,8 +1679,7 @@ int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data) goto kill; } if (!mle) { - if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN && - res->owner != assert->node_idx) { + if (res->owner != assert->node_idx) { mlog(ML_ERROR, "assert_master from " "%u, but current owner is " "%u! (%.*s)\n", @@ -1830,7 +1732,6 @@ ok: if (mle) { int extra_ref = 0; int nn = -1; - int rr, err = 0; spin_lock(&mle->spinlock); if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION) @@ -1850,64 +1751,27 @@ ok: wake_up(&mle->wq); spin_unlock(&mle->spinlock); - if (res) { + if (mle->type == DLM_MLE_MIGRATION && res) { + mlog(0, "finishing off migration of lockres %.*s, " + "from %u to %u\n", + res->lockname.len, res->lockname.name, + dlm->node_num, mle->new_master); spin_lock(&res->spinlock); - if (mle->type == DLM_MLE_MIGRATION) { - mlog(0, "finishing off migration of lockres %.*s, " - "from %u to %u\n", - res->lockname.len, res->lockname.name, - dlm->node_num, mle->new_master); - res->state &= ~DLM_LOCK_RES_MIGRATING; - dlm_change_lockres_owner(dlm, res, mle->new_master); - BUG_ON(res->state & DLM_LOCK_RES_DIRTY); - } else { - dlm_change_lockres_owner(dlm, res, mle->master); - } + res->state &= ~DLM_LOCK_RES_MIGRATING; + dlm_change_lockres_owner(dlm, res, mle->new_master); + BUG_ON(res->state & DLM_LOCK_RES_DIRTY); spin_unlock(&res->spinlock); } - - /* master is known, detach if not already detached. - * ensures that only one assert_master call will happen - * on this mle. */ - spin_lock(&dlm->spinlock); - spin_lock(&dlm->master_lock); - - rr = atomic_read(&mle->mle_refs.refcount); - if (mle->inuse > 0) { - if (extra_ref && rr < 3) - err = 1; - else if (!extra_ref && rr < 2) - err = 1; - } else { - if (extra_ref && rr < 2) - err = 1; - else if (!extra_ref && rr < 1) - err = 1; - } - if (err) { - mlog(ML_ERROR, "%s:%.*s: got assert master from %u " - "that will mess up this node, refs=%d, extra=%d, " - "inuse=%d\n", dlm->name, namelen, name, - assert->node_idx, rr, extra_ref, mle->inuse); - dlm_print_one_mle(mle); - } - list_del_init(&mle->list); - __dlm_mle_detach_hb_events(dlm, mle); - __dlm_put_mle(mle); + /* master is known, detach if not already detached */ + dlm_mle_detach_hb_events(dlm, mle); + dlm_put_mle(mle); + if (extra_ref) { /* the assert master message now balances the extra * ref given by the master / migration request message. * if this is the last put, it will be removed * from the list. */ - __dlm_put_mle(mle); - } - spin_unlock(&dlm->master_lock); - spin_unlock(&dlm->spinlock); - } else if (res) { - if (res->owner != assert->node_idx) { - mlog(0, "assert_master from %u, but current " - "owner is %u (%.*s), no mle\n", assert->node_idx, - res->owner, namelen, name); + dlm_put_mle(mle); } } @@ -1924,12 +1788,12 @@ done: kill: /* kill the caller! */ - mlog(ML_ERROR, "Bad message received from another node. Dumping state " - "and killing the other node now! This node is OK and can continue.\n"); - __dlm_print_one_lock_resource(res); spin_unlock(&res->spinlock); spin_unlock(&dlm->spinlock); dlm_lockres_put(res); + mlog(ML_ERROR, "Bad message received from another node. Dumping state " + "and killing the other node now! This node is OK and can continue.\n"); + dlm_dump_lock_resources(dlm); dlm_put(dlm); return -EINVAL; } @@ -1939,7 +1803,7 @@ int dlm_dispatch_assert_master(struct dlm_ctxt *dlm, int ignore_higher, u8 request_from, u32 flags) { struct dlm_work_item *item; - item = kcalloc(1, sizeof(*item), GFP_NOFS); + item = kcalloc(1, sizeof(*item), GFP_KERNEL); if (!item) return -ENOMEM; @@ -1961,7 +1825,7 @@ int dlm_dispatch_assert_master(struct dlm_ctxt *dlm, list_add_tail(&item->list, &dlm->work_list); spin_unlock(&dlm->work_lock); - queue_work(dlm->dlm_worker, &dlm->dispatched_work); + schedule_work(&dlm->dispatched_work); return 0; } @@ -2002,23 +1866,6 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data) } } - /* - * If we're migrating this lock to someone else, we are no - * longer allowed to assert out own mastery. OTOH, we need to - * prevent migration from starting while we're still asserting - * our dominance. The reserved ast delays migration. - */ - spin_lock(&res->spinlock); - if (res->state & DLM_LOCK_RES_MIGRATING) { - mlog(0, "Someone asked us to assert mastery, but we're " - "in the middle of migration. Skipping assert, " - "the new master will handle that.\n"); - spin_unlock(&res->spinlock); - goto put; - } else - __dlm_lockres_reserve_ast(res); - spin_unlock(&res->spinlock); - /* this call now finishes out the nodemap * even if one or more nodes die */ mlog(0, "worker about to master %.*s here, this=%u\n", @@ -2028,14 +1875,9 @@ static void dlm_assert_master_worker(struct dlm_work_item *item, void *data) nodemap, flags); if (ret < 0) { /* no need to restart, we are done */ - if (!dlm_is_host_down(ret)) - mlog_errno(ret); + mlog_errno(ret); } - /* Ok, we've asserted ourselves. Let's let migration start. */ - dlm_lockres_release_ast(dlm, res); - -put: dlm_lockres_put(res); mlog(0, "finished with dlm_assert_master_worker\n"); @@ -2074,7 +1916,6 @@ static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm, BUG(); /* host is down, so answer for that node would be * DLM_LOCK_RES_OWNER_UNKNOWN. continue. */ - ret = 0; } if (master != DLM_LOCK_RES_OWNER_UNKNOWN) { @@ -2175,14 +2016,14 @@ int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, */ ret = -ENOMEM; - mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_NOFS); + mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_KERNEL); if (!mres) { mlog_errno(ret); goto leave; } mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache, - GFP_NOFS); + GFP_KERNEL); if (!mle) { mlog_errno(ret); goto leave; @@ -2276,7 +2117,7 @@ fail: * take both dlm->spinlock and dlm->master_lock */ spin_lock(&dlm->spinlock); spin_lock(&dlm->master_lock); - dlm_get_mle_inuse(mle); + dlm_get_mle(mle); spin_unlock(&dlm->master_lock); spin_unlock(&dlm->spinlock); @@ -2293,10 +2134,7 @@ fail: /* migration failed, detach and clean up mle */ dlm_mle_detach_hb_events(dlm, mle); dlm_put_mle(mle); - dlm_put_mle_inuse(mle); - spin_lock(&res->spinlock); - res->state &= ~DLM_LOCK_RES_MIGRATING; - spin_unlock(&res->spinlock); + dlm_put_mle(mle); goto leave; } @@ -2326,8 +2164,8 @@ fail: /* avoid hang during shutdown when migrating lockres * to a node which also goes down */ if (dlm_is_node_dead(dlm, target)) { - mlog(0, "%s:%.*s: expected migration " - "target %u is no longer up, restarting\n", + mlog(0, "%s:%.*s: expected migration target %u " + "is no longer up. restarting.\n", dlm->name, res->lockname.len, res->lockname.name, target); ret = -ERESTARTSYS; @@ -2337,10 +2175,7 @@ fail: /* migration failed, detach and clean up mle */ dlm_mle_detach_hb_events(dlm, mle); dlm_put_mle(mle); - dlm_put_mle_inuse(mle); - spin_lock(&res->spinlock); - res->state &= ~DLM_LOCK_RES_MIGRATING; - spin_unlock(&res->spinlock); + dlm_put_mle(mle); goto leave; } /* TODO: if node died: stop, clean up, return error */ @@ -2356,7 +2191,7 @@ fail: /* master is known, detach if not already detached */ dlm_mle_detach_hb_events(dlm, mle); - dlm_put_mle_inuse(mle); + dlm_put_mle(mle); ret = 0; dlm_lockres_calc_usage(dlm, res); @@ -2375,6 +2210,7 @@ leave: mlog(0, "returning %d\n", ret); return ret; } +EXPORT_SYMBOL_GPL(dlm_migrate_lockres); int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock) { @@ -2626,7 +2462,7 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data) struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf; struct dlm_master_list_entry *mle = NULL, *oldmle = NULL; const char *name; - unsigned int namelen, hash; + unsigned int namelen; int ret = 0; if (!dlm_grab(dlm)) @@ -2634,11 +2470,10 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data) name = migrate->name; namelen = migrate->namelen; - hash = dlm_lockid_hash(name, namelen); /* preallocate.. if this fails, abort */ mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache, - GFP_NOFS); + GFP_KERNEL); if (!mle) { ret = -ENOMEM; @@ -2647,7 +2482,7 @@ int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data) /* check for pre-existing lock */ spin_lock(&dlm->spinlock); - res = __dlm_lookup_lockres(dlm, name, namelen, hash); + res = __dlm_lookup_lockres(dlm, name, namelen); spin_lock(&dlm->master_lock); if (res) { @@ -2745,7 +2580,6 @@ static int dlm_add_migration_mle(struct dlm_ctxt *dlm, /* remove it from the list so that only one * mle will be found */ list_del_init(&tmp->list); - __dlm_mle_detach_hb_events(dlm, mle); } spin_unlock(&tmp->spinlock); } @@ -2767,7 +2601,6 @@ void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node) struct list_head *iter, *iter2; struct dlm_master_list_entry *mle; struct dlm_lock_resource *res; - unsigned int hash; mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node); top: @@ -2807,7 +2640,7 @@ top: * may result in the mle being unlinked and * freed, but there may still be a process * waiting in the dlmlock path which is fine. */ - mlog(0, "node %u was expected master\n", + mlog(ML_ERROR, "node %u was expected master\n", dead_node); atomic_set(&mle->woken, 1); spin_unlock(&mle->spinlock); @@ -2840,21 +2673,19 @@ top: /* remove from the list early. NOTE: unlinking * list_head while in list_for_each_safe */ - __dlm_mle_detach_hb_events(dlm, mle); spin_lock(&mle->spinlock); list_del_init(&mle->list); atomic_set(&mle->woken, 1); spin_unlock(&mle->spinlock); wake_up(&mle->wq); - mlog(0, "%s: node %u died during migration from " - "%u to %u!\n", dlm->name, dead_node, + mlog(0, "node %u died during migration from " + "%u to %u!\n", dead_node, mle->master, mle->new_master); /* if there is a lockres associated with this * mle, find it and set its owner to UNKNOWN */ - hash = dlm_lockid_hash(mle->u.name.name, mle->u.name.len); res = __dlm_lookup_lockres(dlm, mle->u.name.name, - mle->u.name.len, hash); + mle->u.name.len); if (res) { /* unfortunately if we hit this rare case, our * lock ordering is messed. we need to drop