fedora core 6 1.2949 + vserver 2.2.0
[linux-2.6.git] / fs / ocfs2 / cluster / heartbeat.c
index d08971d..277ca67 100644 (file)
@@ -54,7 +54,7 @@ static DECLARE_RWSEM(o2hb_callback_sem);
  * multiple hb threads are watching multiple regions.  A node is live
  * whenever any of the threads sees activity from the node in its region.
  */
-static spinlock_t o2hb_live_lock = SPIN_LOCK_UNLOCKED;
+static DEFINE_SPINLOCK(o2hb_live_lock);
 static struct list_head o2hb_live_slots[O2NM_MAX_NODES];
 static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
 static LIST_HEAD(o2hb_node_events);
@@ -141,7 +141,7 @@ struct o2hb_region {
         * recognizes a node going up and down in one iteration */
        u64                     hr_generation;
 
-       struct work_struct      hr_write_timeout_work;
+       struct delayed_work     hr_write_timeout_work;
        unsigned long           hr_last_timeout_start;
 
        /* Used during o2hb_check_slot to hold a copy of the block
@@ -153,11 +153,14 @@ struct o2hb_region {
 struct o2hb_bio_wait_ctxt {
        atomic_t          wc_num_reqs;
        struct completion wc_io_complete;
+       int               wc_error;
 };
 
-static void o2hb_write_timeout(void *arg)
+static void o2hb_write_timeout(struct work_struct *work)
 {
-       struct o2hb_region *reg = arg;
+       struct o2hb_region *reg =
+               container_of(work, struct o2hb_region,
+                            hr_write_timeout_work.work);
 
        mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u "
             "milliseconds\n", reg->hr_dev_name,
@@ -186,6 +189,7 @@ static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc,
 {
        atomic_set(&wc->wc_num_reqs, num_ios);
        init_completion(&wc->wc_io_complete);
+       wc->wc_error = 0;
 }
 
 /* Used in error paths too */
@@ -218,8 +222,10 @@ static int o2hb_bio_end_io(struct bio *bio,
 {
        struct o2hb_bio_wait_ctxt *wc = bio->bi_private;
 
-       if (error)
+       if (error) {
                mlog(ML_ERROR, "IO Error %d\n", error);
+               wc->wc_error = error;
+       }
 
        if (bio->bi_size)
                return 1;
@@ -316,8 +322,12 @@ static int compute_max_sectors(struct block_device *bdev)
                max_pages = q->max_hw_segments;
        max_pages--; /* Handle I/Os that straddle a page */
 
-       max_sectors = max_pages << (PAGE_SHIFT - 9);
-
+       if (max_pages) {
+               max_sectors = max_pages << (PAGE_SHIFT - 9);
+       } else {
+               /* If BIO contains 1 or less than 1 page. */
+               max_sectors = q->max_sectors;
+       }
        /* Why is fls() 1-based???? */
        pow_two_sectors = 1 << (fls(max_sectors) - 1);
 
@@ -390,6 +400,8 @@ static int o2hb_read_slots(struct o2hb_region *reg,
 
 bail_and_wait:
        o2hb_wait_on_io(reg, &wc);
+       if (wc.wc_error && !status)
+               status = wc.wc_error;
 
        if (bios) {
                for(i = 0; i < num_bios; i++)
@@ -449,11 +461,11 @@ static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg,
 
 static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block)
 {
-       mlog(ML_ERROR, "Dump slot information: seq = 0x%"MLFx64", node = %u, "
-            "cksum = 0x%x, generation 0x%"MLFx64"\n",
-            le64_to_cpu(hb_block->hb_seq), hb_block->hb_node,
-            le32_to_cpu(hb_block->hb_cksum),
-            le64_to_cpu(hb_block->hb_generation));
+       mlog(ML_ERROR, "Dump slot information: seq = 0x%llx, node = %u, "
+            "cksum = 0x%x, generation 0x%llx\n",
+            (long long)le64_to_cpu(hb_block->hb_seq),
+            hb_block->hb_node, le32_to_cpu(hb_block->hb_cksum),
+            (long long)le64_to_cpu(hb_block->hb_generation));
 }
 
 static int o2hb_verify_crc(struct o2hb_region *reg,
@@ -511,13 +523,15 @@ static inline void o2hb_prepare_block(struct o2hb_region *reg,
        hb_block->hb_seq = cpu_to_le64(cputime);
        hb_block->hb_node = node_num;
        hb_block->hb_generation = cpu_to_le64(generation);
+       hb_block->hb_dead_ms = cpu_to_le32(o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS);
 
        /* This step must always happen last! */
        hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg,
                                                                   hb_block));
 
-       mlog(ML_HB_BIO, "our node generation = 0x%"MLFx64", cksum = 0x%x\n",
-            cpu_to_le64(generation), le32_to_cpu(hb_block->hb_cksum));
+       mlog(ML_HB_BIO, "our node generation = 0x%llx, cksum = 0x%x\n",
+            (long long)cpu_to_le64(generation),
+            le32_to_cpu(hb_block->hb_cksum));
 }
 
 static void o2hb_fire_callbacks(struct o2hb_callback *hbcall,
@@ -638,6 +652,8 @@ static int o2hb_check_slot(struct o2hb_region *reg,
        struct o2nm_node *node;
        struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block;
        u64 cputime;
+       unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS;
+       unsigned int slot_dead_ms;
 
        memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes);
 
@@ -686,19 +702,20 @@ static int o2hb_check_slot(struct o2hb_region *reg,
        if (slot->ds_last_generation != le64_to_cpu(hb_block->hb_generation)) {
                gen_changed = 1;
                slot->ds_equal_samples = 0;
-               mlog(ML_HEARTBEAT, "Node %d changed generation (0x%"MLFx64" "
-                    "to 0x%"MLFx64")\n", slot->ds_node_num,
-                    slot->ds_last_generation,
-                    le64_to_cpu(hb_block->hb_generation));
+               mlog(ML_HEARTBEAT, "Node %d changed generation (0x%llx "
+                    "to 0x%llx)\n", slot->ds_node_num,
+                    (long long)slot->ds_last_generation,
+                    (long long)le64_to_cpu(hb_block->hb_generation));
        }
 
        slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
 
-       mlog(ML_HEARTBEAT, "Slot %d gen 0x%"MLFx64" cksum 0x%x "
-            "seq %"MLFu64" last %"MLFu64" changed %u equal %u\n",
-            slot->ds_node_num, slot->ds_last_generation,
-            le32_to_cpu(hb_block->hb_cksum), le64_to_cpu(hb_block->hb_seq), 
-            slot->ds_last_time, slot->ds_changed_samples,
+       mlog(ML_HEARTBEAT, "Slot %d gen 0x%llx cksum 0x%x "
+            "seq %llu last %llu changed %u equal %u\n",
+            slot->ds_node_num, (long long)slot->ds_last_generation,
+            le32_to_cpu(hb_block->hb_cksum),
+            (unsigned long long)le64_to_cpu(hb_block->hb_seq), 
+            (unsigned long long)slot->ds_last_time, slot->ds_changed_samples,
             slot->ds_equal_samples);
 
        spin_lock(&o2hb_live_lock);
@@ -708,8 +725,8 @@ fire_callbacks:
         * changes at any time during their dead time */
        if (list_empty(&slot->ds_live_item) &&
            slot->ds_changed_samples >= O2HB_LIVE_THRESHOLD) {
-               mlog(ML_HEARTBEAT, "Node %d (id 0x%"MLFx64") joined my "
-                    "region\n", slot->ds_node_num, slot->ds_last_generation);
+               mlog(ML_HEARTBEAT, "Node %d (id 0x%llx) joined my region\n",
+                    slot->ds_node_num, (long long)slot->ds_last_generation);
 
                /* first on the list generates a callback */
                if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
@@ -725,6 +742,23 @@ fire_callbacks:
                              &o2hb_live_slots[slot->ds_node_num]);
 
                slot->ds_equal_samples = 0;
+
+               /* We want to be sure that all nodes agree on the
+                * number of milliseconds before a node will be
+                * considered dead. The self-fencing timeout is
+                * computed from this value, and a discrepancy might
+                * result in heartbeat calling a node dead when it
+                * hasn't self-fenced yet. */
+               slot_dead_ms = le32_to_cpu(hb_block->hb_dead_ms);
+               if (slot_dead_ms && slot_dead_ms != dead_ms) {
+                       /* TODO: Perhaps we can fail the region here. */
+                       mlog(ML_ERROR, "Node %d on device %s has a dead count "
+                            "of %u ms, but our count is %u ms.\n"
+                            "Please double check your configuration values "
+                            "for 'O2CB_HEARTBEAT_THRESHOLD'\n",
+                            slot->ds_node_num, reg->hr_dev_name, slot_dead_ms,
+                            dead_ms);
+               }
                goto out;
        }
 
@@ -788,20 +822,24 @@ static int o2hb_highest_node(unsigned long *nodes,
        return highest;
 }
 
-static void o2hb_do_disk_heartbeat(struct o2hb_region *reg)
+static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
 {
        int i, ret, highest_node, change = 0;
        unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
        struct bio *write_bio;
        struct o2hb_bio_wait_ctxt write_wc;
 
-       if (o2nm_configured_node_map(configured_nodes, sizeof(configured_nodes)))
-               return;
+       ret = o2nm_configured_node_map(configured_nodes,
+                                      sizeof(configured_nodes));
+       if (ret) {
+               mlog_errno(ret);
+               return ret;
+       }
 
        highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES);
        if (highest_node >= O2NM_MAX_NODES) {
                mlog(ML_NOTICE, "ocfs2_heartbeat: no configured nodes found!\n");
-               return;
+               return -EINVAL;
        }
 
        /* No sense in reading the slots of nodes that don't exist
@@ -811,7 +849,7 @@ static void o2hb_do_disk_heartbeat(struct o2hb_region *reg)
        ret = o2hb_read_slots(reg, highest_node + 1);
        if (ret < 0) {
                mlog_errno(ret);
-               return;
+               return ret;
        }
 
        /* With an up to date view of the slots, we can check that no
@@ -829,7 +867,7 @@ static void o2hb_do_disk_heartbeat(struct o2hb_region *reg)
        ret = o2hb_issue_node_write(reg, &write_bio, &write_wc);
        if (ret < 0) {
                mlog_errno(ret);
-               return;
+               return ret;
        }
 
        i = -1;
@@ -845,6 +883,15 @@ static void o2hb_do_disk_heartbeat(struct o2hb_region *reg)
         */
        o2hb_wait_on_io(reg, &write_wc);
        bio_put(write_bio);
+       if (write_wc.wc_error) {
+               /* Do not re-arm the write timeout on I/O error - we
+                * can't be sure that the new block ever made it to
+                * disk */
+               mlog(ML_ERROR, "Write error %d on device \"%s\"\n",
+                    write_wc.wc_error, reg->hr_dev_name);
+               return write_wc.wc_error;
+       }
+
        o2hb_arm_write_timeout(reg);
 
        /* let the person who launched us know when things are steady */
@@ -852,6 +899,8 @@ static void o2hb_do_disk_heartbeat(struct o2hb_region *reg)
                if (atomic_dec_and_test(&reg->hr_steady_iterations))
                        wake_up(&o2hb_steady_queue);
        }
+
+       return 0;
 }
 
 /* Subtract b from a, storing the result in a. a *must* have a larger
@@ -911,7 +960,10 @@ static int o2hb_thread(void *data)
                 * likely to time itself out. */
                do_gettimeofday(&before_hb);
 
-               o2hb_do_disk_heartbeat(reg);
+               i = 0;
+               do {
+                       ret = o2hb_do_disk_heartbeat(reg);
+               } while (ret && ++i < 2);
 
                do_gettimeofday(&after_hb);
                elapsed_msec = o2hb_elapsed_msecs(&before_hb, &after_hb);
@@ -1354,7 +1406,7 @@ static ssize_t o2hb_region_dev_write(struct o2hb_region *reg,
                goto out;
        }
 
-       INIT_WORK(&reg->hr_write_timeout_work, o2hb_write_timeout, reg);
+       INIT_DELAYED_WORK(&reg->hr_write_timeout_work, o2hb_write_timeout);
 
        /*
         * A node is considered live after it has beat LIVE_THRESHOLD
@@ -1395,6 +1447,15 @@ out:
        return ret;
 }
 
+static ssize_t o2hb_region_pid_read(struct o2hb_region *reg,
+                                      char *page)
+{
+       if (!reg->hr_task)
+               return 0;
+
+       return sprintf(page, "%u\n", reg->hr_task->pid);
+}
+
 struct o2hb_region_attribute {
        struct configfs_attribute attr;
        ssize_t (*show)(struct o2hb_region *, char *);
@@ -1433,11 +1494,19 @@ static struct o2hb_region_attribute o2hb_region_attr_dev = {
        .store  = o2hb_region_dev_write,
 };
 
+static struct o2hb_region_attribute o2hb_region_attr_pid = {
+       .attr   = { .ca_owner = THIS_MODULE,
+                   .ca_name = "pid",
+                   .ca_mode = S_IRUGO | S_IRUSR },
+       .show   = o2hb_region_pid_read,
+};
+
 static struct configfs_attribute *o2hb_region_attrs[] = {
        &o2hb_region_attr_block_bytes.attr,
        &o2hb_region_attr_start_block.attr,
        &o2hb_region_attr_blocks.attr,
        &o2hb_region_attr_dev.attr,
+       &o2hb_region_attr_pid.attr,
        NULL,
 };
 
@@ -1501,7 +1570,7 @@ static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *g
        struct o2hb_region *reg = NULL;
        struct config_item *ret = NULL;
 
-       reg = kcalloc(1, sizeof(struct o2hb_region), GFP_KERNEL);
+       reg = kzalloc(sizeof(struct o2hb_region), GFP_KERNEL);
        if (reg == NULL)
                goto out; /* ENOMEM */
 
@@ -1627,7 +1696,7 @@ struct config_group *o2hb_alloc_hb_set(void)
        struct o2hb_heartbeat_group *hs = NULL;
        struct config_group *ret = NULL;
 
-       hs = kcalloc(1, sizeof(struct o2hb_heartbeat_group), GFP_KERNEL);
+       hs = kzalloc(sizeof(struct o2hb_heartbeat_group), GFP_KERNEL);
        if (hs == NULL)
                goto out;