X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=fs%2Focfs2%2Fdlm%2Fdlmrecovery.c;h=805cbabac051d5007c492ac5de04f4f81c84e832;hb=9464c7cf61b9433057924c36e6e02f303a00e768;hp=594745fab0b5aa0ebcbccfc16d9090e65af4ac80;hpb=41689045f6a3cbe0550e1d34e9cc20d2e8c432ba;p=linux-2.6.git diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index 594745fab..805cbabac 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -95,14 +95,11 @@ static void dlm_reco_unlock_ast(void *astdata, enum dlm_status st); static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data); static void dlm_mig_lockres_worker(struct dlm_work_item *item, void *data); -static int dlm_lockres_master_requery(struct dlm_ctxt *dlm, - struct dlm_lock_resource *res, - u8 *real_master); static u64 dlm_get_next_mig_cookie(void); -static DEFINE_SPINLOCK(dlm_reco_state_lock); -static DEFINE_SPINLOCK(dlm_mig_cookie_lock); +static spinlock_t dlm_reco_state_lock = SPIN_LOCK_UNLOCKED; +static spinlock_t dlm_mig_cookie_lock = SPIN_LOCK_UNLOCKED; static u64 dlm_mig_cookie = 1; static u64 dlm_get_next_mig_cookie(void) @@ -118,37 +115,12 @@ static u64 dlm_get_next_mig_cookie(void) return c; } -static inline void dlm_set_reco_dead_node(struct dlm_ctxt *dlm, - u8 dead_node) -{ - assert_spin_locked(&dlm->spinlock); - if (dlm->reco.dead_node != dead_node) - mlog(0, "%s: changing dead_node from %u to %u\n", - dlm->name, dlm->reco.dead_node, dead_node); - dlm->reco.dead_node = dead_node; -} - -static inline void dlm_set_reco_master(struct dlm_ctxt *dlm, - u8 master) -{ - assert_spin_locked(&dlm->spinlock); - mlog(0, "%s: changing new_master from %u to %u\n", - dlm->name, dlm->reco.new_master, master); - dlm->reco.new_master = master; -} - -static inline void __dlm_reset_recovery(struct dlm_ctxt *dlm) -{ - assert_spin_locked(&dlm->spinlock); - clear_bit(dlm->reco.dead_node, dlm->recovery_map); - dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM); - dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM); -} - static inline void dlm_reset_recovery(struct dlm_ctxt *dlm) { spin_lock(&dlm->spinlock); - __dlm_reset_recovery(dlm); + clear_bit(dlm->reco.dead_node, dlm->recovery_map); + dlm->reco.dead_node = O2NM_INVALID_NODE_NUM; + dlm->reco.new_master = O2NM_INVALID_NODE_NUM; spin_unlock(&dlm->spinlock); } @@ -160,20 +132,11 @@ void dlm_dispatch_work(void *data) struct list_head *iter, *iter2; struct dlm_work_item *item; dlm_workfunc_t *workfunc; - int tot=0; - - if (!dlm_joined(dlm)) - return; spin_lock(&dlm->work_lock); list_splice_init(&dlm->work_list, &tmp_list); spin_unlock(&dlm->work_lock); - list_for_each_safe(iter, iter2, &tmp_list) { - tot++; - } - mlog(0, "%s: work thread has %d work items\n", dlm->name, tot); - list_for_each_safe(iter, iter2, &tmp_list) { item = list_entry(iter, struct dlm_work_item, list); workfunc = item->func; @@ -257,52 +220,6 @@ void dlm_complete_recovery_thread(struct dlm_ctxt *dlm) * */ -static void dlm_print_reco_node_status(struct dlm_ctxt *dlm) -{ - struct dlm_reco_node_data *ndata; - struct dlm_lock_resource *res; - - mlog(ML_NOTICE, "%s(%d): recovery info, state=%s, dead=%u, master=%u\n", - dlm->name, dlm->dlm_reco_thread_task->pid, - dlm->reco.state & DLM_RECO_STATE_ACTIVE ? "ACTIVE" : "inactive", - dlm->reco.dead_node, dlm->reco.new_master); - - list_for_each_entry(ndata, &dlm->reco.node_data, list) { - char *st = "unknown"; - switch (ndata->state) { - case DLM_RECO_NODE_DATA_INIT: - st = "init"; - break; - case DLM_RECO_NODE_DATA_REQUESTING: - st = "requesting"; - break; - case DLM_RECO_NODE_DATA_DEAD: - st = "dead"; - break; - case DLM_RECO_NODE_DATA_RECEIVING: - st = "receiving"; - break; - case DLM_RECO_NODE_DATA_REQUESTED: - st = "requested"; - break; - case DLM_RECO_NODE_DATA_DONE: - st = "done"; - break; - case DLM_RECO_NODE_DATA_FINALIZE_SENT: - st = "finalize-sent"; - break; - default: - st = "bad"; - break; - } - mlog(ML_NOTICE, "%s: reco state, node %u, state=%s\n", - dlm->name, ndata->node_num, st); - } - list_for_each_entry(res, &dlm->reco.resources, recovering) { - mlog(ML_NOTICE, "%s: lockres %.*s on recovering list\n", - dlm->name, res->lockname.len, res->lockname.name); - } -} #define DLM_RECO_THREAD_TIMEOUT_MS (5 * 1000) @@ -350,23 +267,11 @@ int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node) { int dead; spin_lock(&dlm->spinlock); - dead = !test_bit(node, dlm->domain_map); + dead = test_bit(node, dlm->domain_map); spin_unlock(&dlm->spinlock); return dead; } -/* returns true if node is no longer in the domain - * could be dead or just not joined */ -static int dlm_is_node_recovered(struct dlm_ctxt *dlm, u8 node) -{ - int recovered; - spin_lock(&dlm->spinlock); - recovered = !test_bit(node, dlm->recovery_map); - spin_unlock(&dlm->spinlock); - return recovered; -} - - int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout) { if (timeout) { @@ -385,24 +290,6 @@ int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout) return 0; } -int dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout) -{ - if (timeout) { - mlog(0, "%s: waiting %dms for notification of " - "recovery of node %u\n", dlm->name, timeout, node); - wait_event_timeout(dlm->dlm_reco_thread_wq, - dlm_is_node_recovered(dlm, node), - msecs_to_jiffies(timeout)); - } else { - mlog(0, "%s: waiting indefinitely for notification " - "of recovery of node %u\n", dlm->name, node); - wait_event(dlm->dlm_reco_thread_wq, - dlm_is_node_recovered(dlm, node)); - } - /* for now, return 0 */ - return 0; -} - /* callers of the top-level api calls (dlmlock/dlmunlock) should * block on the dlm->reco.event when recovery is in progress. * the dlm recovery thread will set this state when it begins @@ -421,13 +308,6 @@ static int dlm_in_recovery(struct dlm_ctxt *dlm) void dlm_wait_for_recovery(struct dlm_ctxt *dlm) { - if (dlm_in_recovery(dlm)) { - mlog(0, "%s: reco thread %d in recovery: " - "state=%d, master=%u, dead=%u\n", - dlm->name, dlm->dlm_reco_thread_task->pid, - dlm->reco.state, dlm->reco.new_master, - dlm->reco.dead_node); - } wait_event(dlm->reco.event, !dlm_in_recovery(dlm)); } @@ -461,7 +341,7 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm) mlog(0, "new master %u died while recovering %u!\n", dlm->reco.new_master, dlm->reco.dead_node); /* unset the new_master, leave dead_node */ - dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM); + dlm->reco.new_master = O2NM_INVALID_NODE_NUM; } /* select a target to recover */ @@ -470,14 +350,14 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm) bit = find_next_bit (dlm->recovery_map, O2NM_MAX_NODES+1, 0); if (bit >= O2NM_MAX_NODES || bit < 0) - dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM); + dlm->reco.dead_node = O2NM_INVALID_NODE_NUM; else - dlm_set_reco_dead_node(dlm, bit); + dlm->reco.dead_node = bit; } else if (!test_bit(dlm->reco.dead_node, dlm->recovery_map)) { /* BUG? */ mlog(ML_ERROR, "dead_node %u no longer in recovery map!\n", dlm->reco.dead_node); - dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM); + dlm->reco.dead_node = O2NM_INVALID_NODE_NUM; } if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) { @@ -486,8 +366,7 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm) /* return to main thread loop and sleep. */ return 0; } - mlog(0, "%s(%d):recovery thread found node %u in the recovery map!\n", - dlm->name, dlm->dlm_reco_thread_task->pid, + mlog(0, "recovery thread found node %u in the recovery map!\n", dlm->reco.dead_node); spin_unlock(&dlm->spinlock); @@ -510,8 +389,8 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm) } mlog(0, "another node will master this recovery session.\n"); } - mlog(0, "dlm=%s (%d), new_master=%u, this node=%u, dead_node=%u\n", - dlm->name, dlm->dlm_reco_thread_task->pid, dlm->reco.new_master, + mlog(0, "dlm=%s, new_master=%u, this node=%u, dead_node=%u\n", + dlm->name, dlm->reco.new_master, dlm->node_num, dlm->reco.dead_node); /* it is safe to start everything back up here @@ -523,13 +402,11 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm) return 0; master_here: - mlog(0, "(%d) mastering recovery of %s:%u here(this=%u)!\n", - dlm->dlm_reco_thread_task->pid, + mlog(0, "mastering recovery of %s:%u here(this=%u)!\n", dlm->name, dlm->reco.dead_node, dlm->node_num); status = dlm_remaster_locks(dlm, dlm->reco.dead_node); if (status < 0) { - /* we should never hit this anymore */ mlog(ML_ERROR, "error %d remastering locks for node %u, " "retrying.\n", status, dlm->reco.dead_node); /* yield a bit to allow any final network messages @@ -556,16 +433,9 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) int destroy = 0; int pass = 0; - do { - /* we have become recovery master. there is no escaping - * this, so just keep trying until we get it. */ - status = dlm_init_recovery_area(dlm, dead_node); - if (status < 0) { - mlog(ML_ERROR, "%s: failed to alloc recovery area, " - "retrying\n", dlm->name); - msleep(1000); - } - } while (status != 0); + status = dlm_init_recovery_area(dlm, dead_node); + if (status < 0) + goto leave; /* safe to access the node data list without a lock, since this * process is the only one to change the list */ @@ -582,36 +452,16 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) continue; } - do { - status = dlm_request_all_locks(dlm, ndata->node_num, - dead_node); - if (status < 0) { - mlog_errno(status); - if (dlm_is_host_down(status)) { - /* node died, ignore it for recovery */ - status = 0; - ndata->state = DLM_RECO_NODE_DATA_DEAD; - /* wait for the domain map to catch up - * with the network state. */ - wait_event_timeout(dlm->dlm_reco_thread_wq, - dlm_is_node_dead(dlm, - ndata->node_num), - msecs_to_jiffies(1000)); - mlog(0, "waited 1 sec for %u, " - "dead? %s\n", ndata->node_num, - dlm_is_node_dead(dlm, ndata->node_num) ? - "yes" : "no"); - } else { - /* -ENOMEM on the other node */ - mlog(0, "%s: node %u returned " - "%d during recovery, retrying " - "after a short wait\n", - dlm->name, ndata->node_num, - status); - msleep(100); - } + status = dlm_request_all_locks(dlm, ndata->node_num, dead_node); + if (status < 0) { + mlog_errno(status); + if (dlm_is_host_down(status)) + ndata->state = DLM_RECO_NODE_DATA_DEAD; + else { + destroy = 1; + goto leave; } - } while (status != 0); + } switch (ndata->state) { case DLM_RECO_NODE_DATA_INIT: @@ -623,9 +473,10 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) mlog(0, "node %u died after requesting " "recovery info for node %u\n", ndata->node_num, dead_node); - /* fine. don't need this node's info. - * continue without it. */ - break; + // start all over + destroy = 1; + status = -EAGAIN; + goto leave; case DLM_RECO_NODE_DATA_REQUESTING: ndata->state = DLM_RECO_NODE_DATA_REQUESTED; mlog(0, "now receiving recovery data from " @@ -669,26 +520,35 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) BUG(); break; case DLM_RECO_NODE_DATA_DEAD: - mlog(0, "node %u died after " + mlog(ML_NOTICE, "node %u died after " "requesting recovery info for " "node %u\n", ndata->node_num, dead_node); - break; + spin_unlock(&dlm_reco_state_lock); + // start all over + destroy = 1; + status = -EAGAIN; + /* instead of spinning like crazy here, + * wait for the domain map to catch up + * with the network state. otherwise this + * can be hit hundreds of times before + * the node is really seen as dead. */ + wait_event_timeout(dlm->dlm_reco_thread_wq, + dlm_is_node_dead(dlm, + ndata->node_num), + msecs_to_jiffies(1000)); + mlog(0, "waited 1 sec for %u, " + "dead? %s\n", ndata->node_num, + dlm_is_node_dead(dlm, ndata->node_num) ? + "yes" : "no"); + goto leave; case DLM_RECO_NODE_DATA_RECEIVING: case DLM_RECO_NODE_DATA_REQUESTED: - mlog(0, "%s: node %u still in state %s\n", - dlm->name, ndata->node_num, - ndata->state==DLM_RECO_NODE_DATA_RECEIVING ? - "receiving" : "requested"); all_nodes_done = 0; break; case DLM_RECO_NODE_DATA_DONE: - mlog(0, "%s: node %u state is done\n", - dlm->name, ndata->node_num); break; case DLM_RECO_NODE_DATA_FINALIZE_SENT: - mlog(0, "%s: node %u state is finalize\n", - dlm->name, ndata->node_num); break; } } @@ -718,7 +578,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) jiffies, dlm->reco.dead_node, dlm->node_num, dlm->reco.new_master); destroy = 1; - status = 0; + status = ret; /* rescan everything marked dirty along the way */ dlm_kick_thread(dlm, NULL); break; @@ -731,6 +591,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) } +leave: if (destroy) dlm_destroy_recovery_area(dlm, dead_node); @@ -756,7 +617,7 @@ static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node) } BUG_ON(num == dead_node); - ndata = kcalloc(1, sizeof(*ndata), GFP_NOFS); + ndata = kcalloc(1, sizeof(*ndata), GFP_KERNEL); if (!ndata) { dlm_destroy_recovery_area(dlm, dead_node); return -ENOMEM; @@ -830,25 +691,16 @@ int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data) if (!dlm_grab(dlm)) return -EINVAL; - if (lr->dead_node != dlm->reco.dead_node) { - mlog(ML_ERROR, "%s: node %u sent dead_node=%u, but local " - "dead_node is %u\n", dlm->name, lr->node_idx, - lr->dead_node, dlm->reco.dead_node); - dlm_print_reco_node_status(dlm); - /* this is a hack */ - dlm_put(dlm); - return -ENOMEM; - } BUG_ON(lr->dead_node != dlm->reco.dead_node); - item = kcalloc(1, sizeof(*item), GFP_NOFS); + item = kcalloc(1, sizeof(*item), GFP_KERNEL); if (!item) { dlm_put(dlm); return -ENOMEM; } /* this will get freed by dlm_request_all_locks_worker */ - buf = (char *) __get_free_page(GFP_NOFS); + buf = (char *) __get_free_page(GFP_KERNEL); if (!buf) { kfree(item); dlm_put(dlm); @@ -863,7 +715,7 @@ int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data) spin_lock(&dlm->work_lock); list_add_tail(&item->list, &dlm->work_list); spin_unlock(&dlm->work_lock); - queue_work(dlm->dlm_worker, &dlm->dispatched_work); + schedule_work(&dlm->dispatched_work); dlm_put(dlm); return 0; @@ -878,34 +730,32 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data) struct list_head *iter; int ret; u8 dead_node, reco_master; - int skip_all_done = 0; dlm = item->dlm; dead_node = item->u.ral.dead_node; reco_master = item->u.ral.reco_master; mres = (struct dlm_migratable_lockres *)data; - mlog(0, "%s: recovery worker started, dead=%u, master=%u\n", - dlm->name, dead_node, reco_master); - if (dead_node != dlm->reco.dead_node || reco_master != dlm->reco.new_master) { - /* worker could have been created before the recovery master - * died. if so, do not continue, but do not error. */ - if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) { - mlog(ML_NOTICE, "%s: will not send recovery state, " - "recovery master %u died, thread=(dead=%u,mas=%u)" - " current=(dead=%u,mas=%u)\n", dlm->name, - reco_master, dead_node, reco_master, - dlm->reco.dead_node, dlm->reco.new_master); - } else { - mlog(ML_NOTICE, "%s: reco state invalid: reco(dead=%u, " - "master=%u), request(dead=%u, master=%u)\n", - dlm->name, dlm->reco.dead_node, - dlm->reco.new_master, dead_node, reco_master); - } - goto leave; + /* show extra debug info if the recovery state is messed */ + mlog(ML_ERROR, "%s: bad reco state: reco(dead=%u, master=%u), " + "request(dead=%u, master=%u)\n", + dlm->name, dlm->reco.dead_node, dlm->reco.new_master, + dead_node, reco_master); + mlog(ML_ERROR, "%s: name=%.*s master=%u locks=%u/%u flags=%u " + "entry[0]={c=%u:%llu,l=%u,f=%u,t=%d,ct=%d,hb=%d,n=%u}\n", + dlm->name, mres->lockname_len, mres->lockname, mres->master, + mres->num_locks, mres->total_locks, mres->flags, + dlm_get_lock_cookie_node(mres->ml[0].cookie), + dlm_get_lock_cookie_seq(mres->ml[0].cookie), + mres->ml[0].list, mres->ml[0].flags, + mres->ml[0].type, mres->ml[0].convert_type, + mres->ml[0].highest_blocked, mres->ml[0].node); + BUG(); } + BUG_ON(dead_node != dlm->reco.dead_node); + BUG_ON(reco_master != dlm->reco.new_master); /* lock resources should have already been moved to the * dlm->reco.resources list. now move items from that list @@ -916,20 +766,12 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data) dlm_move_reco_locks_to_list(dlm, &resources, dead_node); /* now we can begin blasting lockreses without the dlm lock */ - - /* any errors returned will be due to the new_master dying, - * the dlm_reco_thread should detect this */ list_for_each(iter, &resources) { res = list_entry (iter, struct dlm_lock_resource, recovering); ret = dlm_send_one_lockres(dlm, res, mres, reco_master, DLM_MRES_RECOVERY); - if (ret < 0) { - mlog(ML_ERROR, "%s: node %u went down while sending " - "recovery state for dead node %u, ret=%d\n", dlm->name, - reco_master, dead_node, ret); - skip_all_done = 1; - break; - } + if (ret < 0) + mlog_errno(ret); } /* move the resources back to the list */ @@ -937,15 +779,10 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data) list_splice_init(&resources, &dlm->reco.resources); spin_unlock(&dlm->spinlock); - if (!skip_all_done) { - ret = dlm_send_all_done_msg(dlm, dead_node, reco_master); - if (ret < 0) { - mlog(ML_ERROR, "%s: node %u went down while sending " - "recovery all-done for dead node %u, ret=%d\n", - dlm->name, reco_master, dead_node, ret); - } - } -leave: + ret = dlm_send_all_done_msg(dlm, dead_node, reco_master); + if (ret < 0) + mlog_errno(ret); + free_page((unsigned long)data); } @@ -964,14 +801,8 @@ static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, u8 dead_node, u8 send_to) ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg, sizeof(done_msg), send_to, &tmpret); - if (ret < 0) { - if (!dlm_is_host_down(ret)) { - mlog_errno(ret); - mlog(ML_ERROR, "%s: unknown error sending data-done " - "to %u\n", dlm->name, send_to); - BUG(); - } - } else + /* negative status is ignored by the caller */ + if (ret >= 0) ret = tmpret; return ret; } @@ -991,11 +822,7 @@ int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data) mlog(0, "got DATA DONE: dead_node=%u, reco.dead_node=%u, " "node_idx=%u, this node=%u\n", done->dead_node, dlm->reco.dead_node, done->node_idx, dlm->node_num); - - mlog_bug_on_msg((done->dead_node != dlm->reco.dead_node), - "Got DATA DONE: dead_node=%u, reco.dead_node=%u, " - "node_idx=%u, this node=%u\n", done->dead_node, - dlm->reco.dead_node, done->node_idx, dlm->node_num); + BUG_ON(done->dead_node != dlm->reco.dead_node); spin_lock(&dlm_reco_state_lock); list_for_each(iter, &dlm->reco.node_data) { @@ -1078,11 +905,13 @@ static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm, mlog(0, "found lockres owned by dead node while " "doing recovery for node %u. sending it.\n", dead_node); - list_move_tail(&res->recovering, list); + list_del_init(&res->recovering); + list_add_tail(&res->recovering, list); } else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) { mlog(0, "found UNKNOWN owner while doing recovery " "for node %u. sending it.\n", dead_node); - list_move_tail(&res->recovering, list); + list_del_init(&res->recovering); + list_add_tail(&res->recovering, list); } } spin_unlock(&dlm->spinlock); @@ -1194,9 +1023,8 @@ static int dlm_add_lock_to_array(struct dlm_lock *lock, ml->type == LKM_PRMODE) { /* if it is already set, this had better be a PR * and it has to match */ - if (!dlm_lvb_is_empty(mres->lvb) && - (ml->type == LKM_EXMODE || - memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) { + if (mres->lvb[0] && (ml->type == LKM_EXMODE || + memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) { mlog(ML_ERROR, "mismatched lvbs!\n"); __dlm_print_one_lock_resource(lock->lockres); BUG(); @@ -1255,25 +1083,22 @@ int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, * we must send it immediately. */ ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks); - if (ret < 0) - goto error; + if (ret < 0) { + // TODO + mlog(ML_ERROR, "dlm_send_mig_lockres_msg " + "returned %d, TODO\n", ret); + BUG(); + } } } /* flush any remaining locks */ ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks); - if (ret < 0) - goto error; - return ret; - -error: - mlog(ML_ERROR, "%s: dlm_send_mig_lockres_msg returned %d\n", - dlm->name, ret); - if (!dlm_is_host_down(ret)) + if (ret < 0) { + // TODO + mlog(ML_ERROR, "dlm_send_mig_lockres_msg returned %d, " + "TODO\n", ret); BUG(); - mlog(0, "%s: node %u went down while sending %s " - "lockres %.*s\n", dlm->name, send_to, - flags & DLM_MRES_RECOVERY ? "recovery" : "migration", - res->lockname.len, res->lockname.name); + } return ret; } @@ -1321,8 +1146,8 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data) mlog(0, "all done flag. all lockres data received!\n"); ret = -ENOMEM; - buf = kmalloc(be16_to_cpu(msg->data_len), GFP_NOFS); - item = kcalloc(1, sizeof(*item), GFP_NOFS); + buf = kmalloc(be16_to_cpu(msg->data_len), GFP_KERNEL); + item = kcalloc(1, sizeof(*item), GFP_KERNEL); if (!buf || !item) goto leave; @@ -1413,7 +1238,7 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data) spin_lock(&dlm->work_lock); list_add_tail(&item->list, &dlm->work_list); spin_unlock(&dlm->work_lock); - queue_work(dlm->dlm_worker, &dlm->dispatched_work); + schedule_work(&dlm->dispatched_work); leave: dlm_put(dlm); @@ -1487,9 +1312,8 @@ leave: -static int dlm_lockres_master_requery(struct dlm_ctxt *dlm, - struct dlm_lock_resource *res, - u8 *real_master) +int dlm_lockres_master_requery(struct dlm_ctxt *dlm, + struct dlm_lock_resource *res, u8 *real_master) { struct dlm_node_iter iter; int nodenum; @@ -1582,7 +1406,6 @@ int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data) struct dlm_ctxt *dlm = data; struct dlm_master_requery *req = (struct dlm_master_requery *)msg->buf; struct dlm_lock_resource *res = NULL; - unsigned int hash; int master = DLM_LOCK_RES_OWNER_UNKNOWN; u32 flags = DLM_ASSERT_MASTER_REQUERY; @@ -1592,10 +1415,8 @@ int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data) return master; } - hash = dlm_lockid_hash(req->name, req->namelen); - spin_lock(&dlm->spinlock); - res = __dlm_lookup_lockres(dlm, req->name, req->namelen, hash); + res = __dlm_lookup_lockres(dlm, req->name, req->namelen); if (res) { spin_lock(&res->spinlock); master = res->owner; @@ -1662,7 +1483,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm, struct dlm_lock *newlock = NULL; struct dlm_lockstatus *lksb = NULL; int ret = 0; - int i, bad; + int i; struct list_head *iter; struct dlm_lock *lock = NULL; @@ -1708,7 +1529,8 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm, /* move the lock to its proper place */ /* do not alter lock refcount. switching lists. */ - list_move_tail(&lock->list, queue); + list_del_init(&lock->list); + list_add_tail(&lock->list, queue); spin_unlock(&res->spinlock); mlog(0, "just reordered a local lock!\n"); @@ -1731,48 +1553,28 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm, } lksb->flags |= (ml->flags & (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB)); - - if (ml->type == LKM_NLMODE) - goto skip_lvb; - - if (!dlm_lvb_is_empty(mres->lvb)) { + + if (mres->lvb[0]) { if (lksb->flags & DLM_LKSB_PUT_LVB) { /* other node was trying to update * lvb when node died. recreate the * lksb with the updated lvb. */ memcpy(lksb->lvb, mres->lvb, DLM_LVB_LEN); - /* the lock resource lvb update must happen - * NOW, before the spinlock is dropped. - * we no longer wait for the AST to update - * the lvb. */ - memcpy(res->lvb, mres->lvb, DLM_LVB_LEN); } else { /* otherwise, the node is sending its * most recent valid lvb info */ BUG_ON(ml->type != LKM_EXMODE && ml->type != LKM_PRMODE); - if (!dlm_lvb_is_empty(res->lvb) && - (ml->type == LKM_EXMODE || - memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) { - int i; - mlog(ML_ERROR, "%s:%.*s: received bad " - "lvb! type=%d\n", dlm->name, - res->lockname.len, - res->lockname.name, ml->type); - printk("lockres lvb=["); - for (i=0; ilvb[i]); - printk("]\nmigrated lvb=["); - for (i=0; ilvb[i]); - printk("]\n"); - dlm_print_one_lock_resource(res); - BUG(); + if (res->lvb[0] && (ml->type == LKM_EXMODE || + memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) { + mlog(ML_ERROR, "received bad lvb!\n"); + __dlm_print_one_lock_resource(res); + BUG(); } memcpy(res->lvb, mres->lvb, DLM_LVB_LEN); } } -skip_lvb: + /* NOTE: * wrt lock queue ordering and recovery: @@ -1790,33 +1592,9 @@ skip_lvb: * relative to each other, but clearly *not* * preserved relative to locks from other nodes. */ - bad = 0; spin_lock(&res->spinlock); - list_for_each_entry(lock, queue, list) { - if (lock->ml.cookie == ml->cookie) { - u64 c = lock->ml.cookie; - mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already " - "exists on this lockres!\n", dlm->name, - res->lockname.len, res->lockname.name, - dlm_get_lock_cookie_node(c), - dlm_get_lock_cookie_seq(c)); - - mlog(ML_NOTICE, "sent lock: type=%d, conv=%d, " - "node=%u, cookie=%u:%llu, queue=%d\n", - ml->type, ml->convert_type, ml->node, - dlm_get_lock_cookie_node(ml->cookie), - dlm_get_lock_cookie_seq(ml->cookie), - ml->list); - - __dlm_print_one_lock_resource(res); - bad = 1; - break; - } - } - if (!bad) { - dlm_lock_get(newlock); - list_add_tail(&newlock->list, queue); - } + dlm_lock_get(newlock); + list_add_tail(&newlock->list, queue); spin_unlock(&res->spinlock); } mlog(0, "done running all the locks\n"); @@ -1840,14 +1618,8 @@ void dlm_move_lockres_to_recovery_list(struct dlm_ctxt *dlm, struct dlm_lock *lock; res->state |= DLM_LOCK_RES_RECOVERING; - if (!list_empty(&res->recovering)) { - mlog(0, - "Recovering res %s:%.*s, is already on recovery list!\n", - dlm->name, res->lockname.len, res->lockname.name); + if (!list_empty(&res->recovering)) list_del_init(&res->recovering); - } - /* We need to hold a reference while on the recovery list */ - dlm_lockres_get(res); list_add_tail(&res->recovering, &dlm->reco.resources); /* find any pending locks and put them back on proper list */ @@ -1936,11 +1708,9 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm, spin_lock(&res->spinlock); dlm_change_lockres_owner(dlm, res, new_master); res->state &= ~DLM_LOCK_RES_RECOVERING; - if (!__dlm_lockres_unused(res)) - __dlm_dirty_lockres(dlm, res); + __dlm_dirty_lockres(dlm, res); spin_unlock(&res->spinlock); wake_up(&res->wq); - dlm_lockres_put(res); } } @@ -1949,7 +1719,7 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm, * the RECOVERING state and set the owner * if necessary */ for (i = 0; i < DLM_HASH_BUCKETS; i++) { - bucket = dlm_lockres_hash(dlm, i); + bucket = &(dlm->lockres_hash[i]); hlist_for_each_entry(res, hash_iter, bucket, hash_node) { if (res->state & DLM_LOCK_RES_RECOVERING) { if (res->owner == dead_node) { @@ -1973,13 +1743,11 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm, dlm->name, res->lockname.len, res->lockname.name, res->owner); list_del_init(&res->recovering); - dlm_lockres_put(res); } spin_lock(&res->spinlock); dlm_change_lockres_owner(dlm, res, new_master); res->state &= ~DLM_LOCK_RES_RECOVERING; - if (!__dlm_lockres_unused(res)) - __dlm_dirty_lockres(dlm, res); + __dlm_dirty_lockres(dlm, res); spin_unlock(&res->spinlock); wake_up(&res->wq); } @@ -2116,7 +1884,7 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node) * need to be fired as a result. */ for (i = 0; i < DLM_HASH_BUCKETS; i++) { - bucket = dlm_lockres_hash(dlm, i); + bucket = &(dlm->lockres_hash[i]); hlist_for_each_entry(res, iter, bucket, hash_node) { /* always prune any $RECOVERY entries for dead nodes, * otherwise hangs can occur during later recovery */ @@ -2156,20 +1924,6 @@ static void __dlm_hb_node_down(struct dlm_ctxt *dlm, int idx) { assert_spin_locked(&dlm->spinlock); - if (dlm->reco.new_master == idx) { - mlog(0, "%s: recovery master %d just died\n", - dlm->name, idx); - if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) { - /* finalize1 was reached, so it is safe to clear - * the new_master and dead_node. that recovery - * is complete. */ - mlog(0, "%s: dead master %d had reached " - "finalize1 state, clearing\n", dlm->name, idx); - dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE; - __dlm_reset_recovery(dlm); - } - } - /* check to see if the node is already considered dead */ if (!test_bit(idx, dlm->live_nodes_map)) { mlog(0, "for domain %s, node %d is already dead. " @@ -2333,7 +2087,7 @@ again: /* set the new_master to this node */ spin_lock(&dlm->spinlock); - dlm_set_reco_master(dlm, dlm->node_num); + dlm->reco.new_master = dlm->node_num; spin_unlock(&dlm->spinlock); } @@ -2371,10 +2125,6 @@ again: mlog(0, "%s: reco master %u is ready to recover %u\n", dlm->name, dlm->reco.new_master, dlm->reco.dead_node); status = -EEXIST; - } else if (ret == DLM_RECOVERING) { - mlog(0, "dlm=%s dlmlock says master node died (this=%u)\n", - dlm->name, dlm->node_num); - goto again; } else { struct dlm_lock_resource *res; @@ -2406,7 +2156,7 @@ static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node) mlog_entry("%u\n", dead_node); - mlog(0, "%s: dead node is %u\n", dlm->name, dead_node); + mlog(0, "dead node is %u\n", dead_node); spin_lock(&dlm->spinlock); dlm_node_iter_init(dlm->domain_map, &iter); @@ -2464,14 +2214,6 @@ retry: * another ENOMEM */ msleep(100); goto retry; - } else if (ret == EAGAIN) { - mlog(0, "%s: trying to start recovery of node " - "%u, but node %u is waiting for last recovery " - "to complete, backoff for a bit\n", dlm->name, - dead_node, nodenum); - /* TODO Look into replacing msleep with cond_resched() */ - msleep(100); - goto retry; } } @@ -2487,20 +2229,8 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data) if (!dlm_grab(dlm)) return 0; - spin_lock(&dlm->spinlock); - if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) { - mlog(0, "%s: node %u wants to recover node %u (%u:%u) " - "but this node is in finalize state, waiting on finalize2\n", - dlm->name, br->node_idx, br->dead_node, - dlm->reco.dead_node, dlm->reco.new_master); - spin_unlock(&dlm->spinlock); - return EAGAIN; - } - spin_unlock(&dlm->spinlock); - - mlog(0, "%s: node %u wants to recover node %u (%u:%u)\n", - dlm->name, br->node_idx, br->dead_node, - dlm->reco.dead_node, dlm->reco.new_master); + mlog(0, "node %u wants to recover node %u\n", + br->node_idx, br->dead_node); dlm_fire_domain_eviction_callbacks(dlm, br->dead_node); @@ -2522,8 +2252,8 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data) "node %u changing it to %u\n", dlm->name, dlm->reco.dead_node, br->node_idx, br->dead_node); } - dlm_set_reco_master(dlm, br->node_idx); - dlm_set_reco_dead_node(dlm, br->dead_node); + dlm->reco.new_master = br->node_idx; + dlm->reco.dead_node = br->dead_node; if (!test_bit(br->dead_node, dlm->recovery_map)) { mlog(0, "recovery master %u sees %u as dead, but this " "node has not yet. marking %u as dead\n", @@ -2542,16 +2272,10 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data) spin_unlock(&dlm->spinlock); dlm_kick_recovery_thread(dlm); - - mlog(0, "%s: recovery started by node %u, for %u (%u:%u)\n", - dlm->name, br->node_idx, br->dead_node, - dlm->reco.dead_node, dlm->reco.new_master); - dlm_put(dlm); return 0; } -#define DLM_FINALIZE_STAGE2 0x01 static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm) { int ret = 0; @@ -2559,31 +2283,25 @@ static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm) struct dlm_node_iter iter; int nodenum; int status; - int stage = 1; - mlog(0, "finishing recovery for node %s:%u, " - "stage %d\n", dlm->name, dlm->reco.dead_node, stage); + mlog(0, "finishing recovery for node %s:%u\n", + dlm->name, dlm->reco.dead_node); spin_lock(&dlm->spinlock); dlm_node_iter_init(dlm->domain_map, &iter); spin_unlock(&dlm->spinlock); -stage2: memset(&fr, 0, sizeof(fr)); fr.node_idx = dlm->node_num; fr.dead_node = dlm->reco.dead_node; - if (stage == 2) - fr.flags |= DLM_FINALIZE_STAGE2; while ((nodenum = dlm_node_iter_next(&iter)) >= 0) { if (nodenum == dlm->node_num) continue; ret = o2net_send_message(DLM_FINALIZE_RECO_MSG, dlm->key, &fr, sizeof(fr), nodenum, &status); - if (ret >= 0) + if (ret >= 0) { ret = status; - if (ret < 0) { - mlog_errno(ret); if (dlm_is_host_down(ret)) { /* this has no effect on this recovery * session, so set the status to zero to @@ -2591,17 +2309,13 @@ stage2: mlog(ML_ERROR, "node %u went down after this " "node finished recovery.\n", nodenum); ret = 0; - continue; } + } + if (ret < 0) { + mlog_errno(ret); break; } } - if (stage == 1) { - /* reset the node_iter back to the top and send finalize2 */ - iter.curnode = -1; - stage = 2; - goto stage2; - } return ret; } @@ -2610,19 +2324,14 @@ int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data) { struct dlm_ctxt *dlm = data; struct dlm_finalize_reco *fr = (struct dlm_finalize_reco *)msg->buf; - int stage = 1; /* ok to return 0, domain has gone away */ if (!dlm_grab(dlm)) return 0; - if (fr->flags & DLM_FINALIZE_STAGE2) - stage = 2; + mlog(0, "node %u finalizing recovery of node %u\n", + fr->node_idx, fr->dead_node); - mlog(0, "%s: node %u finalizing recovery stage%d of " - "node %u (%u:%u)\n", dlm->name, fr->node_idx, stage, - fr->dead_node, dlm->reco.dead_node, dlm->reco.new_master); - spin_lock(&dlm->spinlock); if (dlm->reco.new_master != fr->node_idx) { @@ -2638,41 +2347,13 @@ int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data) BUG(); } - switch (stage) { - case 1: - dlm_finish_local_lockres_recovery(dlm, fr->dead_node, fr->node_idx); - if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) { - mlog(ML_ERROR, "%s: received finalize1 from " - "new master %u for dead node %u, but " - "this node has already received it!\n", - dlm->name, fr->node_idx, fr->dead_node); - dlm_print_reco_node_status(dlm); - BUG(); - } - dlm->reco.state |= DLM_RECO_STATE_FINALIZE; - spin_unlock(&dlm->spinlock); - break; - case 2: - if (!(dlm->reco.state & DLM_RECO_STATE_FINALIZE)) { - mlog(ML_ERROR, "%s: received finalize2 from " - "new master %u for dead node %u, but " - "this node did not have finalize1!\n", - dlm->name, fr->node_idx, fr->dead_node); - dlm_print_reco_node_status(dlm); - BUG(); - } - dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE; - spin_unlock(&dlm->spinlock); - dlm_reset_recovery(dlm); - dlm_kick_recovery_thread(dlm); - break; - default: - BUG(); - } + dlm_finish_local_lockres_recovery(dlm, fr->dead_node, fr->node_idx); - mlog(0, "%s: recovery done, reco master was %u, dead now %u, master now %u\n", - dlm->name, fr->node_idx, dlm->reco.dead_node, dlm->reco.new_master); + spin_unlock(&dlm->spinlock); + dlm_reset_recovery(dlm); + + dlm_kick_recovery_thread(dlm); dlm_put(dlm); return 0; }