fedora core 6 1.2949 + vserver 2.2.0

diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
index 1e23200..367a11e 100644
--- a/fs/ocfs2/dlm/dlmrecovery.c
+++ b/fs/ocfs2/dlm/dlmrecovery.c
@@ -58,7 +58,7 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node);
 static int dlm_recovery_thread(void *data);
 void dlm_complete_recovery_thread(struct dlm_ctxt *dlm);
 int dlm_launch_recovery_thread(struct dlm_ctxt *dlm);
-static void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
+void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
 static int dlm_do_recovery(struct dlm_ctxt *dlm);
 
 static int dlm_pick_recovery_master(struct dlm_ctxt *dlm);
@@ -78,15 +78,9 @@ static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
                                    u8 send_to,
                                    struct dlm_lock_resource *res,
                                    int total_locks);
-static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
-                                     struct dlm_lock_resource *res,
-                                     u8 *real_master);
 static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
                                     struct dlm_lock_resource *res,
                                     struct dlm_migratable_lockres *mres);
-static int dlm_do_master_requery(struct dlm_ctxt *dlm,
-                                struct dlm_lock_resource *res,
-                                u8 nodenum, u8 *real_master);
 static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm);
 static int dlm_send_all_done_msg(struct dlm_ctxt *dlm,
                                 u8 dead_node, u8 send_to);
@@ -101,11 +95,14 @@ static void dlm_reco_unlock_ast(void *astdata, enum dlm_status st);
 static void dlm_request_all_locks_worker(struct dlm_work_item *item,
                                         void *data);
 static void dlm_mig_lockres_worker(struct dlm_work_item *item, void *data);
+static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
+                                     struct dlm_lock_resource *res,
+                                     u8 *real_master);
 
 static u64 dlm_get_next_mig_cookie(void);
 
-static spinlock_t dlm_reco_state_lock = SPIN_LOCK_UNLOCKED;
-static spinlock_t dlm_mig_cookie_lock = SPIN_LOCK_UNLOCKED;
+static DEFINE_SPINLOCK(dlm_reco_state_lock);
+static DEFINE_SPINLOCK(dlm_mig_cookie_lock);
 static u64 dlm_mig_cookie = 1;
 
 static u64 dlm_get_next_mig_cookie(void)
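
The hunk above also replaces the deprecated SPIN_LOCK_UNLOCKED initializer with DEFINE_SPINLOCK(), which declares the lock and initializes it completely, including the extra fields used when spinlock debugging is enabled. A minimal sketch of the idiom, mirroring dlm_get_next_mig_cookie() (identifiers here are illustrative, not from the patch):

#include <linux/spinlock.h>

/* old style, removed by this patch:
 *     static spinlock_t my_lock = SPIN_LOCK_UNLOCKED;
 */
static DEFINE_SPINLOCK(my_lock);
static u64 my_cookie = 1;

static u64 my_next_cookie(void)
{
        u64 c;

        spin_lock(&my_lock);
        c = my_cookie;
        my_cookie += 1;
        spin_unlock(&my_lock);

        return c;
}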
@@ -121,28 +118,63 @@ static u64 dlm_get_next_mig_cookie(void)
        return c;
 }
 
+static inline void dlm_set_reco_dead_node(struct dlm_ctxt *dlm,
+                                         u8 dead_node)
+{
+       assert_spin_locked(&dlm->spinlock);
+       if (dlm->reco.dead_node != dead_node)
+               mlog(0, "%s: changing dead_node from %u to %u\n",
+                    dlm->name, dlm->reco.dead_node, dead_node);
+       dlm->reco.dead_node = dead_node;
+}
+
+static inline void dlm_set_reco_master(struct dlm_ctxt *dlm,
+                                      u8 master)
+{
+       assert_spin_locked(&dlm->spinlock);
+       mlog(0, "%s: changing new_master from %u to %u\n",
+            dlm->name, dlm->reco.new_master, master);
+       dlm->reco.new_master = master;
+}
+
+static inline void __dlm_reset_recovery(struct dlm_ctxt *dlm)
+{
+       assert_spin_locked(&dlm->spinlock);
+       clear_bit(dlm->reco.dead_node, dlm->recovery_map);
+       dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
+       dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);
+}
+
 static inline void dlm_reset_recovery(struct dlm_ctxt *dlm)
 {
        spin_lock(&dlm->spinlock);
-       clear_bit(dlm->reco.dead_node, dlm->recovery_map);
-       dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
-       dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
+       __dlm_reset_recovery(dlm);
        spin_unlock(&dlm->spinlock);
 }
 
 /* Worker function used during recovery. */
-void dlm_dispatch_work(void *data)
+void dlm_dispatch_work(struct work_struct *work)
 {
-       struct dlm_ctxt *dlm = (struct dlm_ctxt *)data;
+       struct dlm_ctxt *dlm =
+               container_of(work, struct dlm_ctxt, dispatched_work);
        LIST_HEAD(tmp_list);
        struct list_head *iter, *iter2;
        struct dlm_work_item *item;
        dlm_workfunc_t *workfunc;
+       int tot=0;
+
+       if (!dlm_joined(dlm))
+               return;
 
        spin_lock(&dlm->work_lock);
        list_splice_init(&dlm->work_list, &tmp_list);
        spin_unlock(&dlm->work_lock);
 
+       list_for_each_safe(iter, iter2, &tmp_list) {
+               tot++;
+       }
+       mlog(0, "%s: work thread has %d work items\n", dlm->name, tot);
+
        list_for_each_safe(iter, iter2, &tmp_list) {
                item = list_entry(iter, struct dlm_work_item, list);
                workfunc = item->func;
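
The hunk above adapts dlm_dispatch_work() to the reworked workqueue API: the work function now receives a struct work_struct * and recovers its containing context with container_of(), and later hunks queue it on the dlm's private workqueue with queue_work() instead of schedule_work(). A hedged sketch of the pattern, with illustrative names:

#include <linux/workqueue.h>

struct my_ctxt {
        struct work_struct dispatched_work;
        /* ... per-domain state ... */
};

static void my_dispatch_work(struct work_struct *work)
{
        struct my_ctxt *ctxt =
                container_of(work, struct my_ctxt, dispatched_work);

        /* drain ctxt's private work list here */
}

/* setup:    INIT_WORK(&ctxt->dispatched_work, my_dispatch_work);
 * queueing: queue_work(my_wq, &ctxt->dispatched_work);
 */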
@@ -165,7 +197,7 @@ void dlm_dispatch_work(void *data)
  * RECOVERY THREAD
  */
 
-static void dlm_kick_recovery_thread(struct dlm_ctxt *dlm)
+void dlm_kick_recovery_thread(struct dlm_ctxt *dlm)
 {
        /* wake the recovery thread
         * this will wake the reco thread in one of three places
@@ -226,6 +258,52 @@ void dlm_complete_recovery_thread(struct dlm_ctxt *dlm)
  *
  */
 
+static void dlm_print_reco_node_status(struct dlm_ctxt *dlm)
+{
+       struct dlm_reco_node_data *ndata;
+       struct dlm_lock_resource *res;
+
+       mlog(ML_NOTICE, "%s(%d): recovery info, state=%s, dead=%u, master=%u\n",
+            dlm->name, dlm->dlm_reco_thread_task->pid,
+            dlm->reco.state & DLM_RECO_STATE_ACTIVE ? "ACTIVE" : "inactive",
+            dlm->reco.dead_node, dlm->reco.new_master);
+
+       list_for_each_entry(ndata, &dlm->reco.node_data, list) {
+               char *st = "unknown";
+               switch (ndata->state) {
+                       case DLM_RECO_NODE_DATA_INIT:
+                               st = "init";
+                               break;
+                       case DLM_RECO_NODE_DATA_REQUESTING:
+                               st = "requesting";
+                               break;
+                       case DLM_RECO_NODE_DATA_DEAD:
+                               st = "dead";
+                               break;
+                       case DLM_RECO_NODE_DATA_RECEIVING:
+                               st = "receiving";
+                               break;
+                       case DLM_RECO_NODE_DATA_REQUESTED:
+                               st = "requested";
+                               break;
+                       case DLM_RECO_NODE_DATA_DONE:
+                               st = "done";
+                               break;
+                       case DLM_RECO_NODE_DATA_FINALIZE_SENT:
+                               st = "finalize-sent";
+                               break;
+                       default:
+                               st = "bad";
+                               break;
+               }
+               mlog(ML_NOTICE, "%s: reco state, node %u, state=%s\n",
+                    dlm->name, ndata->node_num, st);
+       }
+       list_for_each_entry(res, &dlm->reco.resources, recovering) {
+               mlog(ML_NOTICE, "%s: lockres %.*s on recovering list\n",
+                    dlm->name, res->lockname.len, res->lockname.name);
+       }
+}
 
 #define DLM_RECO_THREAD_TIMEOUT_MS (5 * 1000)
 
@@ -273,11 +351,23 @@ int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node)
 {
        int dead;
        spin_lock(&dlm->spinlock);
-       dead = test_bit(node, dlm->domain_map);
+       dead = !test_bit(node, dlm->domain_map);
        spin_unlock(&dlm->spinlock);
        return dead;
 }
 
+/* returns true if node is no longer in the domain
+ * could be dead or just not joined */
+static int dlm_is_node_recovered(struct dlm_ctxt *dlm, u8 node)
+{
+       int recovered;
+       spin_lock(&dlm->spinlock);
+       recovered = !test_bit(node, dlm->recovery_map);
+       spin_unlock(&dlm->spinlock);
+       return recovered;
+}
+
+
 int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout)
 {
        if (timeout) {
@@ -296,6 +386,24 @@ int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout)
        return 0;
 }
 
+int dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout)
+{
+       if (timeout) {
+               mlog(0, "%s: waiting %dms for notification of "
+                    "recovery of node %u\n", dlm->name, timeout, node);
+               wait_event_timeout(dlm->dlm_reco_thread_wq,
+                          dlm_is_node_recovered(dlm, node),
+                          msecs_to_jiffies(timeout));
+       } else {
+               mlog(0, "%s: waiting indefinitely for notification "
+                    "of recovery of node %u\n", dlm->name, node);
+               wait_event(dlm->dlm_reco_thread_wq,
+                          dlm_is_node_recovered(dlm, node));
+       }
+       /* for now, return 0 */
+       return 0;
+}
+
 /* callers of the top-level api calls (dlmlock/dlmunlock) should
  * block on the dlm->reco.event when recovery is in progress.
  * the dlm recovery thread will set this state when it begins
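
Both wait helpers above sleep on dlm->dlm_reco_thread_wq, so they only make progress if every path that clears a bit in domain_map or recovery_map also wakes that queue. A minimal sketch of the waker side of the contract (illustrative, not code from the patch):

static void mark_node_recovered(struct dlm_ctxt *dlm, u8 node)
{
        spin_lock(&dlm->spinlock);
        clear_bit(node, dlm->recovery_map);
        spin_unlock(&dlm->spinlock);

        /* unblock dlm_wait_for_node_recovery() sleepers */
        wake_up(&dlm->dlm_reco_thread_wq);
}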
@@ -314,6 +422,13 @@ static int dlm_in_recovery(struct dlm_ctxt *dlm)
 
 void dlm_wait_for_recovery(struct dlm_ctxt *dlm)
 {
+       if (dlm_in_recovery(dlm)) {
+               mlog(0, "%s: reco thread %d in recovery: "
+                    "state=%d, master=%u, dead=%u\n",
+                    dlm->name, dlm->dlm_reco_thread_task->pid,
+                    dlm->reco.state, dlm->reco.new_master,
+                    dlm->reco.dead_node);
+       }
        wait_event(dlm->reco.event, !dlm_in_recovery(dlm));
 }
 
@@ -347,7 +462,7 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
                mlog(0, "new master %u died while recovering %u!\n",
                     dlm->reco.new_master, dlm->reco.dead_node);
                /* unset the new_master, leave dead_node */
-               dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
+               dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);
        }
 
        /* select a target to recover */
@@ -356,14 +471,14 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
 
                bit = find_next_bit (dlm->recovery_map, O2NM_MAX_NODES+1, 0);
                if (bit >= O2NM_MAX_NODES || bit < 0)
-                       dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
+                       dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
                else
-                       dlm->reco.dead_node = bit;
+                       dlm_set_reco_dead_node(dlm, bit);
        } else if (!test_bit(dlm->reco.dead_node, dlm->recovery_map)) {
                /* BUG? */
                mlog(ML_ERROR, "dead_node %u no longer in recovery map!\n",
                     dlm->reco.dead_node);
-               dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
+               dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);
        }
 
        if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {
@@ -372,7 +487,8 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
                /* return to main thread loop and sleep. */
                return 0;
        }
-       mlog(0, "recovery thread found node %u in the recovery map!\n",
+       mlog(0, "%s(%d):recovery thread found node %u in the recovery map!\n",
+            dlm->name, dlm->dlm_reco_thread_task->pid,
             dlm->reco.dead_node);
        spin_unlock(&dlm->spinlock);
 
@@ -395,8 +511,8 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
                }
                mlog(0, "another node will master this recovery session.\n");
        }
-       mlog(0, "dlm=%s, new_master=%u, this node=%u, dead_node=%u\n",
-            dlm->name, dlm->reco.new_master,
+       mlog(0, "dlm=%s (%d), new_master=%u, this node=%u, dead_node=%u\n",
+            dlm->name, dlm->dlm_reco_thread_task->pid, dlm->reco.new_master,
             dlm->node_num, dlm->reco.dead_node);
 
        /* it is safe to start everything back up here
@@ -408,11 +524,13 @@ static int dlm_do_recovery(struct dlm_ctxt *dlm)
        return 0;
 
 master_here:
-       mlog(0, "mastering recovery of %s:%u here(this=%u)!\n",
+       mlog(0, "(%d) mastering recovery of %s:%u here(this=%u)!\n",
+            dlm->dlm_reco_thread_task->pid,
             dlm->name, dlm->reco.dead_node, dlm->node_num);
 
        status = dlm_remaster_locks(dlm, dlm->reco.dead_node);
        if (status < 0) {
+               /* we should never hit this anymore */
                mlog(ML_ERROR, "error %d remastering locks for node %u, "
                     "retrying.\n", status, dlm->reco.dead_node);
                /* yield a bit to allow any final network messages
@@ -439,9 +557,16 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
        int destroy = 0;
        int pass = 0;
 
-       status = dlm_init_recovery_area(dlm, dead_node);
-       if (status < 0)
-               goto leave;
+       do {
+               /* we have become recovery master.  there is no escaping
+                * this, so just keep trying until we get it. */
+               status = dlm_init_recovery_area(dlm, dead_node);
+               if (status < 0) {
+                       mlog(ML_ERROR, "%s: failed to alloc recovery area, "
+                            "retrying\n", dlm->name);
+                       msleep(1000);
+               }
+       } while (status != 0);
 
        /* safe to access the node data list without a lock, since this
         * process is the only one to change the list */
@@ -458,16 +583,36 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
                        continue;
                }
 
-               status = dlm_request_all_locks(dlm, ndata->node_num, dead_node);
-               if (status < 0) {
-                       mlog_errno(status);
-                       if (dlm_is_host_down(status))
-                               ndata->state = DLM_RECO_NODE_DATA_DEAD;
-                       else {
-                               destroy = 1;
-                               goto leave;
+               do {
+                       status = dlm_request_all_locks(dlm, ndata->node_num,
+                                                      dead_node);
+                       if (status < 0) {
+                               mlog_errno(status);
+                               if (dlm_is_host_down(status)) {
+                                       /* node died, ignore it for recovery */
+                                       status = 0;
+                                       ndata->state = DLM_RECO_NODE_DATA_DEAD;
+                                       /* wait for the domain map to catch up
+                                        * with the network state. */
+                                       wait_event_timeout(dlm->dlm_reco_thread_wq,
+                                                          dlm_is_node_dead(dlm,
+                                                               ndata->node_num),
+                                                          msecs_to_jiffies(1000));
+                                       mlog(0, "waited 1 sec for %u, "
+                                            "dead? %s\n", ndata->node_num,
+                                            dlm_is_node_dead(dlm, ndata->node_num) ?
+                                            "yes" : "no");
+                               } else {
+                                       /* -ENOMEM on the other node */
+                                       mlog(0, "%s: node %u returned "
+                                            "%d during recovery, retrying "
+                                            "after a short wait\n",
+                                            dlm->name, ndata->node_num,
+                                            status);
+                                       msleep(100);
+                               }
                        }
-               }
+               } while (status != 0);
 
                switch (ndata->state) {
                        case DLM_RECO_NODE_DATA_INIT:
@@ -479,10 +624,9 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
                                mlog(0, "node %u died after requesting "
                                     "recovery info for node %u\n",
                                     ndata->node_num, dead_node);
-                               // start all over
-                               destroy = 1;
-                               status = -EAGAIN;
-                               goto leave;
+                               /* fine.  don't need this node's info.
+                                * continue without it. */
+                               break;
                        case DLM_RECO_NODE_DATA_REQUESTING:
                                ndata->state = DLM_RECO_NODE_DATA_REQUESTED;
                                mlog(0, "now receiving recovery data from "
@@ -526,35 +670,26 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
                                        BUG();
                                        break;
                                case DLM_RECO_NODE_DATA_DEAD:
-                                       mlog(ML_NOTICE, "node %u died after "
+                                       mlog(0, "node %u died after "
                                             "requesting recovery info for "
                                             "node %u\n", ndata->node_num,
                                             dead_node);
-                                       spin_unlock(&dlm_reco_state_lock);
-                                       // start all over
-                                       destroy = 1;
-                                       status = -EAGAIN;
-                                       /* instead of spinning like crazy here,
-                                        * wait for the domain map to catch up
-                                        * with the network state.  otherwise this
-                                        * can be hit hundreds of times before
-                                        * the node is really seen as dead. */
-                                       wait_event_timeout(dlm->dlm_reco_thread_wq,
-                                                          dlm_is_node_dead(dlm,
-                                                               ndata->node_num),
-                                                          msecs_to_jiffies(1000));
-                                       mlog(0, "waited 1 sec for %u, "
-                                            "dead? %s\n", ndata->node_num,
-                                            dlm_is_node_dead(dlm, ndata->node_num) ?
-                                            "yes" : "no");
-                                       goto leave;
+                                       break;
                                case DLM_RECO_NODE_DATA_RECEIVING:
                                case DLM_RECO_NODE_DATA_REQUESTED:
+                                       mlog(0, "%s: node %u still in state %s\n",
+                                            dlm->name, ndata->node_num,
+                                            ndata->state==DLM_RECO_NODE_DATA_RECEIVING ?
+                                            "receiving" : "requested");
                                        all_nodes_done = 0;
                                        break;
                                case DLM_RECO_NODE_DATA_DONE:
+                                       mlog(0, "%s: node %u state is done\n",
+                                            dlm->name, ndata->node_num);
                                        break;
                                case DLM_RECO_NODE_DATA_FINALIZE_SENT:
+                                       mlog(0, "%s: node %u state is finalize\n",
+                                            dlm->name, ndata->node_num);
                                        break;
                        }
                }
@@ -584,7 +719,7 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
                             jiffies, dlm->reco.dead_node,
                             dlm->node_num, dlm->reco.new_master);
                        destroy = 1;
-                       status = ret;
+                       status = 0;
                        /* rescan everything marked dirty along the way */
                        dlm_kick_thread(dlm, NULL);
                        break;
@@ -597,7 +732,6 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
 
        }
 
-leave:
        if (destroy)
                dlm_destroy_recovery_area(dlm, dead_node);
 
@@ -623,7 +757,7 @@ static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node)
                }
                BUG_ON(num == dead_node);
 
-               ndata = kcalloc(1, sizeof(*ndata), GFP_KERNEL);
+               ndata = kzalloc(sizeof(*ndata), GFP_NOFS);
                if (!ndata) {
                        dlm_destroy_recovery_area(dlm, dead_node);
                        return -ENOMEM;
@@ -697,16 +831,25 @@ int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data)
        if (!dlm_grab(dlm))
                return -EINVAL;
 
+       if (lr->dead_node != dlm->reco.dead_node) {
+               mlog(ML_ERROR, "%s: node %u sent dead_node=%u, but local "
+                    "dead_node is %u\n", dlm->name, lr->node_idx,
+                    lr->dead_node, dlm->reco.dead_node);
+               dlm_print_reco_node_status(dlm);
+               /* this is a hack */
+               dlm_put(dlm);
+               return -ENOMEM;
+       }
        BUG_ON(lr->dead_node != dlm->reco.dead_node);
 
-       item = kcalloc(1, sizeof(*item), GFP_KERNEL);
+       item = kzalloc(sizeof(*item), GFP_NOFS);
        if (!item) {
                dlm_put(dlm);
                return -ENOMEM;
        }
 
        /* this will get freed by dlm_request_all_locks_worker */
-       buf = (char *) __get_free_page(GFP_KERNEL);
+       buf = (char *) __get_free_page(GFP_NOFS);
        if (!buf) {
                kfree(item);
                dlm_put(dlm);
@@ -721,7 +864,7 @@ int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data)
        spin_lock(&dlm->work_lock);
        list_add_tail(&item->list, &dlm->work_list);
        spin_unlock(&dlm->work_lock);
-       schedule_work(&dlm->dispatched_work);
+       queue_work(dlm->dlm_worker, &dlm->dispatched_work);
 
        dlm_put(dlm);
        return 0;
@@ -736,30 +879,34 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data)
        struct list_head *iter;
        int ret;
        u8 dead_node, reco_master;
+       int skip_all_done = 0;
 
        dlm = item->dlm;
        dead_node = item->u.ral.dead_node;
        reco_master = item->u.ral.reco_master;
        mres = (struct dlm_migratable_lockres *)data;
 
+       mlog(0, "%s: recovery worker started, dead=%u, master=%u\n",
+            dlm->name, dead_node, reco_master);
+
        if (dead_node != dlm->reco.dead_node ||
            reco_master != dlm->reco.new_master) {
-               /* show extra debug info if the recovery state is messed */
-               mlog(ML_ERROR, "%s: bad reco state: reco(dead=%u, master=%u), "
-                    "request(dead=%u, master=%u)\n",
-                    dlm->name, dlm->reco.dead_node, dlm->reco.new_master,
-                    dead_node, reco_master);
-               mlog(ML_ERROR, "%s: name=%.*s master=%u locks=%u/%u flags=%u "
-                    "entry[0]={c=%"MLFu64",l=%u,f=%u,t=%d,ct=%d,hb=%d,n=%u}\n",
-                    dlm->name, mres->lockname_len, mres->lockname, mres->master,
-                    mres->num_locks, mres->total_locks, mres->flags,
-                    mres->ml[0].cookie, mres->ml[0].list, mres->ml[0].flags,
-                    mres->ml[0].type, mres->ml[0].convert_type,
-                    mres->ml[0].highest_blocked, mres->ml[0].node);
-               BUG();
+               /* worker could have been created before the recovery master
+                * died.  if so, do not continue, but do not error. */
+               if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) {
+                       mlog(ML_NOTICE, "%s: will not send recovery state, "
+                            "recovery master %u died, thread=(dead=%u,mas=%u)"
+                            " current=(dead=%u,mas=%u)\n", dlm->name,
+                            reco_master, dead_node, reco_master,
+                            dlm->reco.dead_node, dlm->reco.new_master);
+               } else {
+                       mlog(ML_NOTICE, "%s: reco state invalid: reco(dead=%u, "
+                            "master=%u), request(dead=%u, master=%u)\n",
+                            dlm->name, dlm->reco.dead_node,
+                            dlm->reco.new_master, dead_node, reco_master);
+               }
+               goto leave;
        }
-       BUG_ON(dead_node != dlm->reco.dead_node);
-       BUG_ON(reco_master != dlm->reco.new_master);
 
        /* lock resources should have already been moved to the
         * dlm->reco.resources list.  now move items from that list
@@ -770,12 +917,20 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data)
        dlm_move_reco_locks_to_list(dlm, &resources, dead_node);
 
        /* now we can begin blasting lockreses without the dlm lock */
+
+       /* any errors returned will be due to the new_master dying,
+        * the dlm_reco_thread should detect this */
        list_for_each(iter, &resources) {
                res = list_entry (iter, struct dlm_lock_resource, recovering);
                ret = dlm_send_one_lockres(dlm, res, mres, reco_master,
                                        DLM_MRES_RECOVERY);
-               if (ret < 0)
-                       mlog_errno(ret);
+               if (ret < 0) {
+                       mlog(ML_ERROR, "%s: node %u went down while sending "
+                            "recovery state for dead node %u, ret=%d\n", dlm->name,
+                            reco_master, dead_node, ret);
+                       skip_all_done = 1;
+                       break;
+               }
        }
 
        /* move the resources back to the list */
@@ -783,10 +938,15 @@ static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data)
        list_splice_init(&resources, &dlm->reco.resources);
        spin_unlock(&dlm->spinlock);
 
-       ret = dlm_send_all_done_msg(dlm, dead_node, reco_master);
-       if (ret < 0)
-               mlog_errno(ret);
-
+       if (!skip_all_done) {
+               ret = dlm_send_all_done_msg(dlm, dead_node, reco_master);
+               if (ret < 0) {
+                       mlog(ML_ERROR, "%s: node %u went down while sending "
+                            "recovery all-done for dead node %u, ret=%d\n",
+                            dlm->name, reco_master, dead_node, ret);
+               }
+       }
+leave:
        free_page((unsigned long)data);
 }
 
@@ -805,8 +965,14 @@ static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, u8 dead_node, u8 send_to)
 
        ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg,
                                 sizeof(done_msg), send_to, &tmpret);
-       /* negative status is ignored by the caller */
-       if (ret >= 0)
+       if (ret < 0) {
+               if (!dlm_is_host_down(ret)) {
+                       mlog_errno(ret);
+                       mlog(ML_ERROR, "%s: unknown error sending data-done "
+                            "to %u\n", dlm->name, send_to);
+                       BUG();
+               }
+       } else
                ret = tmpret;
        return ret;
 }
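
The rewritten error path above encodes the patch's general send policy: a failed send is tolerated only when dlm_is_host_down() classifies the errno as node death; any other error indicates a protocol bug and BUGs. A hedged sketch of what such a predicate looks like (the authoritative list lives in dlmcommon.h and is longer):

#include <linux/errno.h>

static inline int my_is_host_down(int err)
{
        /* illustrative subset of the network-death errnos */
        return (err == -EHOSTDOWN ||
                err == -EHOSTUNREACH ||
                err == -ENOTCONN ||
                err == -ECONNRESET);
}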
@@ -826,7 +992,11 @@ int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data)
        mlog(0, "got DATA DONE: dead_node=%u, reco.dead_node=%u, "
             "node_idx=%u, this node=%u\n", done->dead_node,
             dlm->reco.dead_node, done->node_idx, dlm->node_num);
-       BUG_ON(done->dead_node != dlm->reco.dead_node);
+
+       mlog_bug_on_msg((done->dead_node != dlm->reco.dead_node),
+                       "Got DATA DONE: dead_node=%u, reco.dead_node=%u, "
+                       "node_idx=%u, this node=%u\n", done->dead_node,
+                       dlm->reco.dead_node, done->node_idx, dlm->node_num);
 
        spin_lock(&dlm_reco_state_lock);
        list_for_each(iter, &dlm->reco.node_data) {
@@ -909,13 +1079,11 @@ static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm,
                        mlog(0, "found lockres owned by dead node while "
                                  "doing recovery for node %u. sending it.\n",
                                  dead_node);
-                       list_del_init(&res->recovering);
-                       list_add_tail(&res->recovering, list);
+                       list_move_tail(&res->recovering, list);
                } else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
                        mlog(0, "found UNKNOWN owner while doing recovery "
                                  "for node %u. sending it.\n", dead_node);
-                       list_del_init(&res->recovering);
-                       list_add_tail(&res->recovering, list);
+                       list_move_tail(&res->recovering, list);
                }
        }
        spin_unlock(&dlm->spinlock);
@@ -1027,8 +1195,9 @@ static int dlm_add_lock_to_array(struct dlm_lock *lock,
                    ml->type == LKM_PRMODE) {
                        /* if it is already set, this had better be a PR
                         * and it has to match */
-                       if (mres->lvb[0] && (ml->type == LKM_EXMODE ||
-                           memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) {
+                       if (!dlm_lvb_is_empty(mres->lvb) &&
+                           (ml->type == LKM_EXMODE ||
+                            memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) {
                                mlog(ML_ERROR, "mismatched lvbs!\n");
                                __dlm_print_one_lock_resource(lock->lockres);
                                BUG();
@@ -1087,22 +1256,25 @@ int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
                         * we must send it immediately. */
                        ret = dlm_send_mig_lockres_msg(dlm, mres, send_to,
                                                       res, total_locks);
-                       if (ret < 0) {
-                               // TODO
-                               mlog(ML_ERROR, "dlm_send_mig_lockres_msg "
-                                    "returned %d, TODO\n", ret);
-                               BUG();
-                       }
+                       if (ret < 0)
+                               goto error;
                }
        }
        /* flush any remaining locks */
        ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks);
-       if (ret < 0) {
-               // TODO
-               mlog(ML_ERROR, "dlm_send_mig_lockres_msg returned %d, "
-                    "TODO\n", ret);
+       if (ret < 0)
+               goto error;
+       return ret;
+
+error:
+       mlog(ML_ERROR, "%s: dlm_send_mig_lockres_msg returned %d\n",
+            dlm->name, ret);
+       if (!dlm_is_host_down(ret))
                BUG();
-       }
+       mlog(0, "%s: node %u went down while sending %s "
+            "lockres %.*s\n", dlm->name, send_to,
+            flags & DLM_MRES_RECOVERY ?  "recovery" : "migration",
+            res->lockname.len, res->lockname.name);
        return ret;
 }
 
@@ -1150,8 +1322,8 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
                mlog(0, "all done flag.  all lockres data received!\n");
 
        ret = -ENOMEM;
-       buf = kmalloc(be16_to_cpu(msg->data_len), GFP_KERNEL);
-       item = kcalloc(1, sizeof(*item), GFP_KERNEL);
+       buf = kmalloc(be16_to_cpu(msg->data_len), GFP_NOFS);
+       item = kzalloc(sizeof(*item), GFP_NOFS);
        if (!buf || !item)
                goto leave;
 
@@ -1242,7 +1414,7 @@ int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
        spin_lock(&dlm->work_lock);
        list_add_tail(&item->list, &dlm->work_list);
        spin_unlock(&dlm->work_lock);
-       schedule_work(&dlm->dispatched_work);
+       queue_work(dlm->dlm_worker, &dlm->dispatched_work);
 
 leave:
        dlm_put(dlm);
@@ -1360,8 +1532,10 @@ static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
                ret = dlm_do_master_requery(dlm, res, nodenum, real_master);
                if (ret < 0) {
                        mlog_errno(ret);
-                       BUG();
-                       /* TODO: need to figure a way to restart this */
+                       if (!dlm_is_host_down(ret))
+                               BUG();
+                       /* host is down, so answer for that node would be
+                        * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
                }
                if (*real_master != DLM_LOCK_RES_OWNER_UNKNOWN) {
                        mlog(0, "lock master is %u\n", *real_master);
@@ -1372,9 +1546,8 @@ static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
 }
 
 
-static int dlm_do_master_requery(struct dlm_ctxt *dlm,
-                                struct dlm_lock_resource *res,
-                                u8 nodenum, u8 *real_master)
+int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
+                         u8 nodenum, u8 *real_master)
 {
        int ret = -EINVAL;
        struct dlm_master_requery req;
@@ -1410,6 +1583,7 @@ int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data)
        struct dlm_ctxt *dlm = data;
        struct dlm_master_requery *req = (struct dlm_master_requery *)msg->buf;
        struct dlm_lock_resource *res = NULL;
+       unsigned int hash;
        int master = DLM_LOCK_RES_OWNER_UNKNOWN;
        u32 flags = DLM_ASSERT_MASTER_REQUERY;
 
@@ -1419,8 +1593,10 @@ int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data)
                return master;
        }
 
+       hash = dlm_lockid_hash(req->name, req->namelen);
+
        spin_lock(&dlm->spinlock);
-       res = __dlm_lookup_lockres(dlm, req->name, req->namelen);
+       res = __dlm_lookup_lockres(dlm, req->name, req->namelen, hash);
        if (res) {
                spin_lock(&res->spinlock);
                master = res->owner;
@@ -1487,7 +1663,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
        struct dlm_lock *newlock = NULL;
        struct dlm_lockstatus *lksb = NULL;
        int ret = 0;
-       int i;
+       int i, bad;
        struct list_head *iter;
        struct dlm_lock *lock = NULL;
 
@@ -1519,9 +1695,11 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
                        /* lock is always created locally first, and
                         * destroyed locally last.  it must be on the list */
                        if (!lock) {
+                               u64 c = ml->cookie;
                                mlog(ML_ERROR, "could not find local lock "
-                                              "with cookie %"MLFu64"!\n",
-                                    ml->cookie);
+                                              "with cookie %u:%llu!\n",
+                                              dlm_get_lock_cookie_node(c),
+                                              dlm_get_lock_cookie_seq(c));
                                BUG();
                        }
                        BUG_ON(lock->ml.node != ml->node);
@@ -1531,8 +1709,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
 
                        /* move the lock to its proper place */
                        /* do not alter lock refcount.  switching lists. */
-                       list_del_init(&lock->list);
-                       list_add_tail(&lock->list, queue);
+                       list_move_tail(&lock->list, queue);
                        spin_unlock(&res->spinlock);
 
                        mlog(0, "just reordered a local lock!\n");
@@ -1555,28 +1732,48 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
                }
                lksb->flags |= (ml->flags &
                                (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));
-                       
-               if (mres->lvb[0]) {
+
+               if (ml->type == LKM_NLMODE)
+                       goto skip_lvb;
+
+               if (!dlm_lvb_is_empty(mres->lvb)) {
                        if (lksb->flags & DLM_LKSB_PUT_LVB) {
                                /* other node was trying to update
                                 * lvb when node died.  recreate the
                                 * lksb with the updated lvb. */
                                memcpy(lksb->lvb, mres->lvb, DLM_LVB_LEN);
+                               /* the lock resource lvb update must happen
+                                * NOW, before the spinlock is dropped.
+                                * we no longer wait for the AST to update
+                                * the lvb. */
+                               memcpy(res->lvb, mres->lvb, DLM_LVB_LEN);
                        } else {
                                /* otherwise, the node is sending its 
                                 * most recent valid lvb info */
                                BUG_ON(ml->type != LKM_EXMODE &&
                                       ml->type != LKM_PRMODE);
-                               if (res->lvb[0] && (ml->type == LKM_EXMODE ||
-                                   memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) {
-                                       mlog(ML_ERROR, "received bad lvb!\n");
-                                       __dlm_print_one_lock_resource(res);
-                                       BUG();
+                               if (!dlm_lvb_is_empty(res->lvb) &&
+                                   (ml->type == LKM_EXMODE ||
+                                    memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) {
+                                       int i;
+                                       mlog(ML_ERROR, "%s:%.*s: received bad "
+                                            "lvb! type=%d\n", dlm->name,
+                                            res->lockname.len,
+                                            res->lockname.name, ml->type);
+                                       printk("lockres lvb=[");
+                                       for (i=0; i<DLM_LVB_LEN; i++)
+                                               printk("%02x", res->lvb[i]);
+                                       printk("]\nmigrated lvb=[");
+                                       for (i=0; i<DLM_LVB_LEN; i++)
+                                               printk("%02x", mres->lvb[i]);
+                                       printk("]\n");
+                                       dlm_print_one_lock_resource(res);
+                                       BUG();
                                }
                                memcpy(res->lvb, mres->lvb, DLM_LVB_LEN);
                        }
                }
-
+skip_lvb:
 
                /* NOTE:
                 * wrt lock queue ordering and recovery:
@@ -1594,9 +1791,33 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
                 * relative to each other, but clearly *not*
                 * preserved relative to locks from other nodes.
                 */
+               bad = 0;
                spin_lock(&res->spinlock);
-               dlm_lock_get(newlock);
-               list_add_tail(&newlock->list, queue);
+               list_for_each_entry(lock, queue, list) {
+                       if (lock->ml.cookie == ml->cookie) {
+                               u64 c = lock->ml.cookie;
+                               mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already "
+                                    "exists on this lockres!\n", dlm->name,
+                                    res->lockname.len, res->lockname.name,
+                                    dlm_get_lock_cookie_node(c),
+                                    dlm_get_lock_cookie_seq(c));
+
+                               mlog(ML_NOTICE, "sent lock: type=%d, conv=%d, "
+                                    "node=%u, cookie=%u:%llu, queue=%d\n",
+                                    ml->type, ml->convert_type, ml->node,
+                                    dlm_get_lock_cookie_node(ml->cookie),
+                                    dlm_get_lock_cookie_seq(ml->cookie),
+                                    ml->list);
+
+                               __dlm_print_one_lock_resource(res);
+                               bad = 1;
+                               break;
+                       }
+               }
+               if (!bad) {
+                       dlm_lock_get(newlock);
+                       list_add_tail(&newlock->list, queue);
+               }
                spin_unlock(&res->spinlock);
        }
        mlog(0, "done running all the locks\n");
@@ -1620,8 +1841,14 @@ void dlm_move_lockres_to_recovery_list(struct dlm_ctxt *dlm,
        struct dlm_lock *lock;
 
        res->state |= DLM_LOCK_RES_RECOVERING;
-       if (!list_empty(&res->recovering))
+       if (!list_empty(&res->recovering)) {
+               mlog(0,
+                    "Recovering res %s:%.*s, is already on recovery list!\n",
+                    dlm->name, res->lockname.len, res->lockname.name);
                list_del_init(&res->recovering);
+       }
+       /* We need to hold a reference while on the recovery list */
+       dlm_lockres_get(res);
        list_add_tail(&res->recovering, &dlm->reco.resources);
 
        /* find any pending locks and put them back on proper list */
@@ -1710,9 +1937,11 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
                        spin_lock(&res->spinlock);
                        dlm_change_lockres_owner(dlm, res, new_master);
                        res->state &= ~DLM_LOCK_RES_RECOVERING;
-                       __dlm_dirty_lockres(dlm, res);
+                       if (!__dlm_lockres_unused(res))
+                               __dlm_dirty_lockres(dlm, res);
                        spin_unlock(&res->spinlock);
                        wake_up(&res->wq);
+                       dlm_lockres_put(res);
                }
        }
 
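
The dlm_lockres_put() above pairs with the dlm_lockres_get() taken in dlm_move_lockres_to_recovery_list(): after this patch, the reco.resources list owns one reference per entry. The discipline in outline (a sketch of the pairing, not patch code):

/* enqueue: the list takes a reference */
dlm_lockres_get(res);
list_add_tail(&res->recovering, &dlm->reco.resources);

/* dequeue: drop the list's reference once the lockres is off the list */
list_del_init(&res->recovering);
dlm_lockres_put(res);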
@@ -1721,7 +1950,7 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
         * the RECOVERING state and set the owner
         * if necessary */
        for (i = 0; i < DLM_HASH_BUCKETS; i++) {
-               bucket = &(dlm->lockres_hash[i]);
+               bucket = dlm_lockres_hash(dlm, i);
                hlist_for_each_entry(res, hash_iter, bucket, hash_node) {
                        if (res->state & DLM_LOCK_RES_RECOVERING) {
                                if (res->owner == dead_node) {
@@ -1739,10 +1968,19 @@ static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
                                } else
                                        continue;
 
+                               if (!list_empty(&res->recovering)) {
+                                       mlog(0, "%s:%.*s: lockres was "
+                                            "marked RECOVERING, owner=%u\n",
+                                            dlm->name, res->lockname.len,
+                                            res->lockname.name, res->owner);
+                                       list_del_init(&res->recovering);
+                                       dlm_lockres_put(res);
+                               }
                                spin_lock(&res->spinlock);
                                dlm_change_lockres_owner(dlm, res, new_master);
                                res->state &= ~DLM_LOCK_RES_RECOVERING;
-                               __dlm_dirty_lockres(dlm, res);
+                               if (!__dlm_lockres_unused(res))
+                                       __dlm_dirty_lockres(dlm, res);
                                spin_unlock(&res->spinlock);
                                wake_up(&res->wq);
                        }
@@ -1879,7 +2117,7 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
         *    need to be fired as a result.
         */
        for (i = 0; i < DLM_HASH_BUCKETS; i++) {
-               bucket = &(dlm->lockres_hash[i]);
+               bucket = dlm_lockres_hash(dlm, i);
                hlist_for_each_entry(res, iter, bucket, hash_node) {
                        /* always prune any $RECOVERY entries for dead nodes,
                         * otherwise hangs can occur during later recovery */
@@ -1919,6 +2157,20 @@ static void __dlm_hb_node_down(struct dlm_ctxt *dlm, int idx)
 {
        assert_spin_locked(&dlm->spinlock);
 
+       if (dlm->reco.new_master == idx) {
+               mlog(0, "%s: recovery master %d just died\n",
+                    dlm->name, idx);
+               if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
+                       /* finalize1 was reached, so it is safe to clear
+                        * the new_master and dead_node.  that recovery
+                        * is complete. */
+                       mlog(0, "%s: dead master %d had reached "
+                            "finalize1 state, clearing\n", dlm->name, idx);
+                       dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE;
+                       __dlm_reset_recovery(dlm);
+               }
+       }
+
        /* check to see if the node is already considered dead */
        if (!test_bit(idx, dlm->live_nodes_map)) {
                mlog(0, "for domain %s, node %d is already dead. "
@@ -2034,7 +2286,8 @@ again:
        memset(&lksb, 0, sizeof(lksb));
 
        ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY,
-                     DLM_RECOVERY_LOCK_NAME, dlm_reco_ast, dlm, dlm_reco_bast);
+                     DLM_RECOVERY_LOCK_NAME, DLM_RECOVERY_LOCK_NAME_LEN,
+                     dlm_reco_ast, dlm, dlm_reco_bast);
 
        mlog(0, "%s: dlmlock($RECOVERY) returned %d, lksb=%d\n",
             dlm->name, ret, lksb.status);
@@ -2082,7 +2335,7 @@ again:
 
                        /* set the new_master to this node */
                        spin_lock(&dlm->spinlock);
-                       dlm->reco.new_master = dlm->node_num;
+                       dlm_set_reco_master(dlm, dlm->node_num);
                        spin_unlock(&dlm->spinlock);
                }
 
@@ -2120,6 +2373,10 @@ again:
                mlog(0, "%s: reco master %u is ready to recover %u\n",
                     dlm->name, dlm->reco.new_master, dlm->reco.dead_node);
                status = -EEXIST;
+       } else if (ret == DLM_RECOVERING) {
+               mlog(0, "dlm=%s dlmlock says master node died (this=%u)\n",
+                    dlm->name, dlm->node_num);
+               goto again;
        } else {
                struct dlm_lock_resource *res;
 
@@ -2151,7 +2408,7 @@ static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node)
 
        mlog_entry("%u\n", dead_node);
 
-       mlog(0, "dead node is %u\n", dead_node);
+       mlog(0, "%s: dead node is %u\n", dlm->name, dead_node);
 
        spin_lock(&dlm->spinlock);
        dlm_node_iter_init(dlm->domain_map, &iter);
@@ -2209,6 +2466,14 @@ retry:
                         * another ENOMEM */
                        msleep(100);
                        goto retry;
+               } else if (ret == EAGAIN) {
+                       mlog(0, "%s: trying to start recovery of node "
+                            "%u, but node %u is waiting for last recovery "
+                            "to complete, backoff for a bit\n", dlm->name,
+                            dead_node, nodenum);
+                       /* TODO Look into replacing msleep with cond_resched() */
+                       msleep(100);
+                       goto retry;
                }
        }
 
@@ -2224,8 +2489,20 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
        if (!dlm_grab(dlm))
                return 0;
 
-       mlog(0, "node %u wants to recover node %u\n",
-                 br->node_idx, br->dead_node);
+       spin_lock(&dlm->spinlock);
+       if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
+               mlog(0, "%s: node %u wants to recover node %u (%u:%u) "
+                    "but this node is in finalize state, waiting on finalize2\n",
+                    dlm->name, br->node_idx, br->dead_node,
+                    dlm->reco.dead_node, dlm->reco.new_master);
+               spin_unlock(&dlm->spinlock);
+               return EAGAIN;
+       }
+       spin_unlock(&dlm->spinlock);
+
+       mlog(0, "%s: node %u wants to recover node %u (%u:%u)\n",
+            dlm->name, br->node_idx, br->dead_node,
+            dlm->reco.dead_node, dlm->reco.new_master);
 
        dlm_fire_domain_eviction_callbacks(dlm, br->dead_node);
 
@@ -2247,8 +2524,8 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
                     "node %u changing it to %u\n", dlm->name, 
                     dlm->reco.dead_node, br->node_idx, br->dead_node);
        }
-       dlm->reco.new_master = br->node_idx;
-       dlm->reco.dead_node = br->dead_node;
+       dlm_set_reco_master(dlm, br->node_idx);
+       dlm_set_reco_dead_node(dlm, br->dead_node);
        if (!test_bit(br->dead_node, dlm->recovery_map)) {
                mlog(0, "recovery master %u sees %u as dead, but this "
                     "node has not yet.  marking %u as dead\n",
@@ -2258,16 +2535,25 @@ int dlm_begin_reco_handler(struct o2net_msg *msg, u32 len, void *data)
                        mlog(0, "%u not in domain/live_nodes map "
                             "so setting it in reco map manually\n",
                             br->dead_node);
-               set_bit(br->dead_node, dlm->recovery_map);
+               /* force the recovery cleanup in __dlm_hb_node_down
+                * both of these will be cleared in a moment */
+               set_bit(br->dead_node, dlm->domain_map);
+               set_bit(br->dead_node, dlm->live_nodes_map);
                __dlm_hb_node_down(dlm, br->dead_node);
        }
        spin_unlock(&dlm->spinlock);
 
        dlm_kick_recovery_thread(dlm);
+
+       mlog(0, "%s: recovery started by node %u, for %u (%u:%u)\n",
+            dlm->name, br->node_idx, br->dead_node,
+            dlm->reco.dead_node, dlm->reco.new_master);
+
        dlm_put(dlm);
        return 0;
 }
 
+#define DLM_FINALIZE_STAGE2  0x01
 static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm)
 {
        int ret = 0;
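
DLM_FINALIZE_STAGE2, defined above, turns finalization into a two-pass broadcast. A hedged sketch of the sender restructured as a single loop, equivalent in spirit to the stage1/stage2 goto flow in the hunks below (error handling abbreviated):

static int my_send_finalize_both_stages(struct dlm_ctxt *dlm)
{
        struct dlm_finalize_reco fr;
        struct dlm_node_iter iter;
        int stage, nodenum, status, ret = 0;

        for (stage = 1; stage <= 2; stage++) {
                spin_lock(&dlm->spinlock);
                dlm_node_iter_init(dlm->domain_map, &iter);
                spin_unlock(&dlm->spinlock);

                memset(&fr, 0, sizeof(fr));
                fr.node_idx = dlm->node_num;
                fr.dead_node = dlm->reco.dead_node;
                if (stage == 2)
                        fr.flags |= DLM_FINALIZE_STAGE2;

                while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
                        if (nodenum == dlm->node_num)
                                continue;
                        ret = o2net_send_message(DLM_FINALIZE_RECO_MSG,
                                                 dlm->key, &fr, sizeof(fr),
                                                 nodenum, &status);
                        if (ret >= 0)
                                ret = status;
                        if (ret < 0 && dlm_is_host_down(ret))
                                ret = 0;        /* node died: harmless here */
                        else if (ret < 0)
                                return ret;
                }
        }
        return ret;
}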
@@ -2275,25 +2561,31 @@ static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm)
        struct dlm_node_iter iter;
        int nodenum;
        int status;
+       int stage = 1;
 
-       mlog(0, "finishing recovery for node %s:%u\n",
-            dlm->name, dlm->reco.dead_node);
+       mlog(0, "finishing recovery for node %s:%u, "
+            "stage %d\n", dlm->name, dlm->reco.dead_node, stage);
 
        spin_lock(&dlm->spinlock);
        dlm_node_iter_init(dlm->domain_map, &iter);
        spin_unlock(&dlm->spinlock);
 
+stage2:
        memset(&fr, 0, sizeof(fr));
        fr.node_idx = dlm->node_num;
        fr.dead_node = dlm->reco.dead_node;
+       if (stage == 2)
+               fr.flags |= DLM_FINALIZE_STAGE2;
 
        while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
                if (nodenum == dlm->node_num)
                        continue;
                ret = o2net_send_message(DLM_FINALIZE_RECO_MSG, dlm->key,
                                         &fr, sizeof(fr), nodenum, &status);
-               if (ret >= 0) {
+               if (ret >= 0)
                        ret = status;
+               if (ret < 0) {
+                       mlog_errno(ret);
                        if (dlm_is_host_down(ret)) {
                                /* this has no effect on this recovery 
                                 * session, so set the status to zero to 
@@ -2301,13 +2593,17 @@ static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm)
                                mlog(ML_ERROR, "node %u went down after this "
                                     "node finished recovery.\n", nodenum);
                                ret = 0;
+                               continue;
                        }
-               }
-               if (ret < 0) {
-                       mlog_errno(ret);
                        break;
                }
        }
+       if (stage == 1) {
+               /* reset the node_iter back to the top and send finalize2 */
+               iter.curnode = -1;
+               stage = 2;
+               goto stage2;
+       }
 
        return ret;
 }
@@ -2316,14 +2612,19 @@ int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data)
 {
        struct dlm_ctxt *dlm = data;
        struct dlm_finalize_reco *fr = (struct dlm_finalize_reco *)msg->buf;
+       int stage = 1;
 
        /* ok to return 0, domain has gone away */
        if (!dlm_grab(dlm))
                return 0;
 
-       mlog(0, "node %u finalizing recovery of node %u\n",
-            fr->node_idx, fr->dead_node);
+       if (fr->flags & DLM_FINALIZE_STAGE2)
+               stage = 2;
 
+       mlog(0, "%s: node %u finalizing recovery stage%d of "
+            "node %u (%u:%u)\n", dlm->name, fr->node_idx, stage,
+            fr->dead_node, dlm->reco.dead_node, dlm->reco.new_master);
        spin_lock(&dlm->spinlock);
 
        if (dlm->reco.new_master != fr->node_idx) {
@@ -2339,13 +2640,41 @@ int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data)
                BUG();
        }
 
-       dlm_finish_local_lockres_recovery(dlm, fr->dead_node, fr->node_idx);
-
-       spin_unlock(&dlm->spinlock);
+       switch (stage) {
+               case 1:
+                       dlm_finish_local_lockres_recovery(dlm, fr->dead_node, fr->node_idx);
+                       if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) {
+                               mlog(ML_ERROR, "%s: received finalize1 from "
+                                    "new master %u for dead node %u, but "
+                                    "this node has already received it!\n",
+                                    dlm->name, fr->node_idx, fr->dead_node);
+                               dlm_print_reco_node_status(dlm);
+                               BUG();
+                       }
+                       dlm->reco.state |= DLM_RECO_STATE_FINALIZE;
+                       spin_unlock(&dlm->spinlock);
+                       break;
+               case 2:
+                       if (!(dlm->reco.state & DLM_RECO_STATE_FINALIZE)) {
+                               mlog(ML_ERROR, "%s: received finalize2 from "
+                                    "new master %u for dead node %u, but "
+                                    "this node did not have finalize1!\n",
+                                    dlm->name, fr->node_idx, fr->dead_node);
+                               dlm_print_reco_node_status(dlm);
+                               BUG();
+                       }
+                       dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE;
+                       spin_unlock(&dlm->spinlock);
+                       dlm_reset_recovery(dlm);
+                       dlm_kick_recovery_thread(dlm);
+                       break;
+               default:
+                       BUG();
+       }
 
-       dlm_reset_recovery(dlm);
+       mlog(0, "%s: recovery done, reco master was %u, dead now %u, master now %u\n",
+            dlm->name, fr->node_idx, dlm->reco.dead_node, dlm->reco.new_master);
 
-       dlm_kick_recovery_thread(dlm);
        dlm_put(dlm);
        return 0;
 }
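
Taken together, the finalize changes implement a small two-phase commit between the recovery master and the surviving nodes; the flow, in outline:

/*
 * master                                other nodes
 * ------                                -----------
 * finalize1 (fr.flags == 0)       ->    finish local lockres recovery,
 *                                       set DLM_RECO_STATE_FINALIZE
 * finalize2 (DLM_FINALIZE_STAGE2) ->    clear DLM_RECO_STATE_FINALIZE,
 *                                       reset dead_node/new_master,
 *                                       kick the recovery thread
 *
 * If the master dies between the two messages, the survivors still hold
 * DLM_RECO_STATE_FINALIZE, and __dlm_hb_node_down() (earlier hunk) uses
 * that to safely reset recovery state for the completed session.
 */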