Merge to Fedora kernel-2.6.18-1.2224_FC5 patched with stable patch-2.6.18.1-vs2.0...
[linux-2.6.git] / drivers / md / dm-raid1.c
index 2cab46e..8cfc33b 100644 (file)
@@ -6,6 +6,7 @@
 
 #include "dm.h"
 #include "dm-bio-list.h"
+#include "dm-bio-record.h"
 #include "dm-io.h"
 #include "dm-log.h"
 #include "kcopyd.h"
 #include <linux/vmalloc.h>
 #include <linux/workqueue.h>
 
+#define DM_MSG_PREFIX "raid1"
+
 static struct workqueue_struct *_kmirrord_wq;
 static struct work_struct _kmirrord_work;
+DECLARE_WAIT_QUEUE_HEAD(recovery_stopped_event);
+
+static int dm_mirror_error_on_log_failure = 1;
 
 static inline void wake(void)
 {
@@ -81,10 +87,12 @@ struct region_hash {
        struct list_head *buckets;
 
        spinlock_t region_lock;
+       atomic_t recovery_in_flight;
        struct semaphore recovery_count;
        struct list_head clean_regions;
        struct list_head quiesced_regions;
        struct list_head recovered_regions;
+       struct list_head failed_recovered_regions;
 };
 
 enum {
@@ -111,7 +119,8 @@ struct region {
  * Mirror set structures.
  *---------------------------------------------------------------*/
 struct mirror {
-       atomic_t error_count;
+       atomic_t error_count;  /* Error counter to flag mirror failure */
+       struct mirror_set *ms;
        struct dm_dev *dev;
        sector_t offset;
 };
@@ -122,9 +131,10 @@ struct mirror_set {
        struct region_hash rh;
        struct kcopyd_client *kcopyd_client;
 
-       spinlock_t lock;        /* protects the next two lists */
+       spinlock_t lock;        /* protects the lists */
        struct bio_list reads;
        struct bio_list writes;
+       struct bio_list failures;
 
        /* recovery */
        region_t nr_regions;
@@ -133,6 +143,8 @@ struct mirror_set {
        struct mirror *default_mirror;  /* Default mirror */
 
        unsigned int nr_mirrors;
+       atomic_t read_count;      /* Read counter for read balancing */
+       struct mirror *read_mirror; /* Last mirror read. */
        struct mirror mirror[0];
 };
 
@@ -189,9 +201,11 @@ static int rh_init(struct region_hash *rh, struct mirror_set *ms,
 
        spin_lock_init(&rh->region_lock);
        sema_init(&rh->recovery_count, 0);
+       atomic_set(&rh->recovery_in_flight, 0);
        INIT_LIST_HEAD(&rh->clean_regions);
        INIT_LIST_HEAD(&rh->quiesced_regions);
        INIT_LIST_HEAD(&rh->recovered_regions);
+       INIT_LIST_HEAD(&rh->failed_recovered_regions);
 
        rh->region_pool = mempool_create_kmalloc_pool(MIN_REGIONS,
                                                      sizeof(struct region));
@@ -339,12 +353,24 @@ static void dispatch_bios(struct mirror_set *ms, struct bio_list *bio_list)
        }
 }
 
+static void complete_resync_work(struct region *reg, int success)
+{
+       struct region_hash *rh = reg->rh;
+
+       rh->log->type->set_region_sync(rh->log, reg->key, success);
+       if (atomic_dec_and_test(&rh->recovery_in_flight))
+               wake_up_all(&recovery_stopped_event);
+       dispatch_bios(rh->ms, &reg->delayed_bios);
+       up(&rh->recovery_count);
+}
+
 static void rh_update_states(struct region_hash *rh)
 {
        struct region *reg, *next;
 
        LIST_HEAD(clean);
        LIST_HEAD(recovered);
+       LIST_HEAD(failed_recovered);
 
        /*
         * Quickly grab the lists.
@@ -368,6 +394,15 @@ static void rh_update_states(struct region_hash *rh)
                list_for_each_entry (reg, &recovered, list)
                        list_del(&reg->hash_list);
        }
+
+       if (!list_empty(&rh->failed_recovered_regions)) {
+               list_splice(&rh->failed_recovered_regions, &failed_recovered);
+               INIT_LIST_HEAD(&rh->failed_recovered_regions);
+
+               list_for_each_entry (reg, &failed_recovered, list)
+                       list_del(&reg->hash_list);
+       }
+
        spin_unlock(&rh->region_lock);
        write_unlock_irq(&rh->hash_lock);
 
@@ -378,9 +413,12 @@ static void rh_update_states(struct region_hash *rh)
         */
        list_for_each_entry_safe (reg, next, &recovered, list) {
                rh->log->type->clear_region(rh->log, reg->key);
-               rh->log->type->complete_resync_work(rh->log, reg->key, 1);
-               dispatch_bios(rh->ms, &reg->delayed_bios);
-               up(&rh->recovery_count);
+               complete_resync_work(reg, 1);
+               mempool_free(reg, rh->region_pool);
+       }
+
+       list_for_each_entry_safe (reg, next, &failed_recovered, list) {
+               complete_resync_work(reg, 0);
                mempool_free(reg, rh->region_pool);
        }
 
@@ -490,11 +528,9 @@ static int __rh_recovery_prepare(struct region_hash *rh)
        /* Already quiesced ? */
        if (atomic_read(&reg->pending))
                list_del_init(&reg->list);
+       else
+               list_move(&reg->list, &rh->quiesced_regions);
 
-       else {
-               list_del_init(&reg->list);
-               list_add(&reg->list, &rh->quiesced_regions);
-       }
        spin_unlock_irq(&rh->region_lock);
 
        return 1;
@@ -502,11 +538,21 @@ static int __rh_recovery_prepare(struct region_hash *rh)
 
 static void rh_recovery_prepare(struct region_hash *rh)
 {
-       while (!down_trylock(&rh->recovery_count))
+       /* Extra reference to avoid race with rh_stop_recovery */
+       atomic_inc(&rh->recovery_in_flight);
+
+       while (!down_trylock(&rh->recovery_count)) {
+               atomic_inc(&rh->recovery_in_flight);
                if (__rh_recovery_prepare(rh) <= 0) {
+                       atomic_dec(&rh->recovery_in_flight);
                        up(&rh->recovery_count);
                        break;
                }
+       }
+
+       /* Drop the extra reference */
+       if (atomic_dec_and_test(&rh->recovery_in_flight))
+               wake_up_all(&recovery_stopped_event);
 }
 
 /*
@@ -527,21 +573,26 @@ static struct region *rh_recovery_start(struct region_hash *rh)
        return reg;
 }
 
-/* FIXME: success ignored for now */
 static void rh_recovery_end(struct region *reg, int success)
 {
        struct region_hash *rh = reg->rh;
 
        spin_lock_irq(&rh->region_lock);
-       list_add(&reg->list, &reg->rh->recovered_regions);
+       if (success ||
+           (rh->log->type->get_failure_response(rh->log) == DMLOG_IOERR_IGNORE))
+               list_add(&reg->list, &reg->rh->recovered_regions);
+       else {
+               reg->state = RH_NOSYNC;
+               list_add(&reg->list, &reg->rh->failed_recovered_regions);
+       }
        spin_unlock_irq(&rh->region_lock);
 
        wake();
 }
 
-static void rh_flush(struct region_hash *rh)
+static int rh_flush(struct region_hash *rh)
 {
-       rh->log->type->flush(rh->log);
+       return rh->log->type->flush(rh->log);
 }
 
 static void rh_delay(struct region_hash *rh, struct bio *bio)
@@ -573,24 +624,39 @@ static void rh_start_recovery(struct region_hash *rh)
        wake();
 }
 
+struct bio_map_info {
+       struct mirror *bmi_m;
+       struct dm_bio_details bmi_bd;
+};
+
+static mempool_t *bio_map_info_pool = NULL;
+
+static void *bio_map_info_alloc(unsigned int gfp_mask, void *pool_data){
+       return kmalloc(sizeof(struct bio_map_info), gfp_mask);
+}
+
+static void bio_map_info_free(void *element, void *pool_data){
+       kfree(element);
+}
+
 /*
  * Every mirror should look like this one.
  */
 #define DEFAULT_MIRROR 0
 
 /*
- * This is yucky.  We squirrel the mirror_set struct away inside
- * bi_next for write buffers.  This is safe since the bh
+ * This is yucky.  We squirrel the mirror struct away inside
+ * bi_next for read/write buffers.  This is safe since the bh
  * doesn't get submitted to the lower levels of block layer.
  */
-static struct mirror_set *bio_get_ms(struct bio *bio)
+static struct mirror *bio_get_m(struct bio *bio)
 {
-       return (struct mirror_set *) bio->bi_next;
+       return (struct mirror *) bio->bi_next;
 }
 
-static void bio_set_ms(struct bio *bio, struct mirror_set *ms)
+static void bio_set_m(struct bio *bio, struct mirror *m)
 {
-       bio->bi_next = (struct bio *) ms;
+       bio->bi_next = (struct bio *) m;
 }
 
 /*-----------------------------------------------------------------
@@ -600,13 +666,38 @@ static void bio_set_ms(struct bio *bio, struct mirror_set *ms)
  * are in the no-sync state.  We have to recover these by
  * recopying from the default mirror to all the others.
  *---------------------------------------------------------------*/
+static void fail_mirror(struct mirror *m);
 static void recovery_complete(int read_err, unsigned int write_err,
                              void *context)
 {
        struct region *reg = (struct region *) context;
+       struct mirror_set *ms = reg->rh->ms;
+       unsigned long write_error = write_err;
+       int m, bit = 0;
+
+       if (read_err) {
+               /* Read error means the failure of default mirror. */
+               DMERR("Unable to read from primary mirror during recovery");
+               fail_mirror(ms->default_mirror);
+       }
 
-       /* FIXME: better error handling */
-       rh_recovery_end(reg, read_err || write_err);
+       if (write_error) {
+               DMERR("Write error during recovery (error = %#lx)",
+                     write_error);
+               /*
+                * Bits correspond to devices (excluding default mirror).
+                * The default mirror cannot change during recovery.
+                */
+               for (m = 0; m < ms->nr_mirrors; m++) {
+                       if (&ms->mirror[m] == ms->default_mirror)
+                               continue;
+                       if (test_bit(bit, &write_error))
+                               fail_mirror(ms->mirror + m);
+                       bit++;
+               }
+       }
+
+       rh_recovery_end(reg, !(read_err || write_err));
 }
 
 static int recover(struct mirror_set *ms, struct region *reg)
@@ -645,7 +736,9 @@ static int recover(struct mirror_set *ms, struct region *reg)
        }
 
        /* hand to kcopyd */
-       set_bit(KCOPYD_IGNORE_ERROR, &flags);
+       if (ms->rh.log->type->get_failure_response(ms->rh.log) == DMLOG_IOERR_IGNORE)
+               set_bit(KCOPYD_IGNORE_ERROR, &flags);
+
        r = kcopyd_copy(ms->kcopyd_client, &from, ms->nr_mirrors - 1, to, flags,
                        recovery_complete, reg);
 
@@ -673,53 +766,212 @@ static void do_recovery(struct mirror_set *ms)
        }
 
        /*
-        * Update the in sync flag.
+        * Update the in sync flag if necessary.
+        * Raise an event when the mirror becomes in-sync.
+        *
+        * After recovery completes, the mirror becomes in_sync.
+        * Only an I/O failure can then take it back out-of-sync.
         */
-       if (!ms->in_sync &&
-           (log->type->get_sync_count(log) == ms->nr_regions)) {
-               /* the sync is complete */
-               dm_table_event(ms->ti->table);
-               ms->in_sync = 1;
-       }
+       if (log->type->get_sync_count(log) == ms->nr_regions) {
+               if (!ms->in_sync) {
+                       dm_table_event(ms->ti->table);
+                       ms->in_sync = 1;
+               }
+       } else if (ms->in_sync)
+               ms->in_sync = 0;
 }
 
 /*-----------------------------------------------------------------
  * Reads
  *---------------------------------------------------------------*/
-static struct mirror *choose_mirror(struct mirror_set *ms, sector_t sector)
+/* Switch to next dev, via round-robin, after MIN_READS reads */
+#define MIN_READS 128
+
+/* choose_mirror
+ * @ms: the mirror set
+ *
+ * This function is used for read balancing.
+ *
+ * Returns: chosen mirror, or NULL on failure
+ */
+static struct mirror *choose_mirror(struct mirror_set *ms)
+{
+       struct mirror *start_mirror = ms->read_mirror;
+
+       /*
+        * Perform MIN_READS on each working mirror then
+        * advance to the next one.  start_mirror stores
+        * the first we tried, so we know when we're done.
+        */
+       do {
+               if (likely(!atomic_read(&ms->read_mirror->error_count)) &&
+                   !atomic_dec_and_test(&ms->read_count))
+                       goto use_mirror;
+
+               atomic_set(&ms->read_count, MIN_READS);
+
+               if (ms->read_mirror-- == ms->mirror)
+                       ms->read_mirror += ms->nr_mirrors;
+       } while (ms->read_mirror != start_mirror);
+
+       /*
+        * We've rejected every mirror.
+        * Confirm the start_mirror can be used.
+        */
+       if (unlikely(atomic_read(&ms->read_mirror->error_count)))
+               return NULL;
+
+use_mirror:
+       return ms->read_mirror;
+}
+
+/* fail_mirror
+ * @m: mirror device to fail
+ *
+ * If the device is valid, mark it invalid.  Also,
+ * if this is the default mirror device (i.e. the primary
+ * device) and the mirror set is in-sync, choose an
+ * alternate primary device.
+ *
+ * This function cannot block.
+ */
+static void fail_mirror(struct mirror *m)
 {
-       /* FIXME: add read balancing */
-       return ms->default_mirror;
+       struct mirror_set *ms = m->ms;
+       struct mirror *new;
+
+       atomic_inc(&m->error_count);
+
+       if (atomic_read(&m->error_count) > 1)
+               return;
+
+       if (m != ms->default_mirror)
+               return;
+
+       /*
+        * If the default mirror fails, change it.
+        * In the case of cluster mirroring, the default
+        * is changed in rh_update_states.
+        */
+       if (!ms->in_sync) {
+               /*
+                * Can not switch primary.  Better to issue requests
+                * to same failing device than to risk returning
+                * corrupt data.
+                */
+               DMERR("Primary mirror device has failed while mirror is not in-sync");
+               DMERR("Unable to choose alternative primary device");
+               return;
+       }
+
+       for (new = ms->mirror; new < ms->mirror + ms->nr_mirrors; new++)
+               if (!atomic_read(&new->error_count)) {
+                       ms->default_mirror = new;
+                       break;
+               }
+
+       if (unlikely(new == ms->mirror + ms->nr_mirrors))
+               DMWARN("All sides of mirror have failed.");
+}
+
+static int default_ok(struct mirror *m)
+{
+       return !atomic_read(&m->ms->default_mirror->error_count);
+}
+
+static int mirror_available(struct mirror_set *ms, struct bio *bio)
+{
+       region_t region = bio_to_region(&ms->rh, bio);
+
+       if (ms->rh.log->type->in_sync(ms->rh.log, region, 0) > 0)
+               return choose_mirror(ms) ? 1 : 0;
+
+       return 0;
 }
 
 /*
  * remap a buffer to a particular mirror.
  */
-static void map_bio(struct mirror_set *ms, struct mirror *m, struct bio *bio)
+static sector_t map_sector(struct mirror *m, struct bio *bio)
+{
+       return m->offset + (bio->bi_sector - m->ms->ti->begin);
+}
+
+static void map_bio(struct mirror *m, struct bio *bio)
 {
        bio->bi_bdev = m->dev->bdev;
-       bio->bi_sector = m->offset + (bio->bi_sector - ms->ti->begin);
+       bio->bi_sector = map_sector(m, bio);
+}
+
+static void map_region(struct io_region *io, struct mirror *m,
+                      struct bio *bio)
+{
+       io->bdev = m->dev->bdev;
+       io->sector = map_sector(m, bio);
+       io->count = bio->bi_size >> 9;
+}
+
+/*-----------------------------------------------------------------
+ * Reads
+ *---------------------------------------------------------------*/
+static void read_callback(unsigned long error, void *context)
+{
+       struct bio *bio = (struct bio *)context;
+       struct mirror *m;
+
+       m = bio_get_m(bio);
+       bio_set_m(bio, NULL);
+
+       if (unlikely(error)) {
+               DMWARN("A read failure occurred on a mirror device.");
+               fail_mirror(m);
+               if (likely(default_ok(m)) || mirror_available(m->ms, bio)) {
+                       DMWARN("Trying different device.");
+                       queue_bio(m->ms, bio, bio_rw(bio));
+               } else {
+                       DMERR("No other device available, failing I/O.");
+                       bio_endio(bio, bio->bi_size, -EIO);
+               }
+       } else
+               bio_endio(bio, bio->bi_size, 0);
+}
+
+/* Asynchronous read. */
+static void read_async_bio(struct mirror *m, struct bio *bio)
+{
+       struct io_region io;
+
+       map_region(&io, m, bio);
+       bio_set_m(bio, m);
+       dm_io_async_bvec(1, &io, READ,
+                        bio->bi_io_vec + bio->bi_idx,
+                        read_callback, bio);
 }
 
 static void do_reads(struct mirror_set *ms, struct bio_list *reads)
 {
-       region_t region;
        struct bio *bio;
        struct mirror *m;
 
        while ((bio = bio_list_pop(reads))) {
-               region = bio_to_region(&ms->rh, bio);
-
                /*
                 * We can only read balance if the region is in sync.
                 */
-               if (rh_in_sync(&ms->rh, region, 0))
-                       m = choose_mirror(ms, bio->bi_sector);
-               else
+               if (likely(rh_in_sync(&ms->rh,
+                                     bio_to_region(&ms->rh, bio), 0)))
+                       m = choose_mirror(ms);
+               else {
                        m = ms->default_mirror;
 
-               map_bio(ms, m, bio);
-               generic_make_request(bio);
+                       /* If default has failed, we give up. */
+                       if (unlikely(m && atomic_read(&m->error_count)))
+                               m = NULL;
+               }
+
+               if (likely(m))
+                       read_async_bio(m, bio);
+               else
+                       bio_endio(bio, bio->bi_size, -EIO);
        }
 }
 
@@ -733,15 +985,70 @@ static void do_reads(struct mirror_set *ms, struct bio_list *reads)
  * RECOVERING: delay the io until recovery completes
  * NOSYNC:     increment pending, just write to the default mirror
  *---------------------------------------------------------------*/
-static void write_callback(unsigned long error, void *context)
+
+/* __bio_mark_nosync
+ * @ms
+ * @bio
+ * @done
+ * @error
+ *
+ * The bio was written on some mirror(s) but failed on other mirror(s).
+ * We can successfully endio the bio but should avoid the region being
+ * marked clean by setting the state RH_NOSYNC.
+ *
+ * This function is _not_ interrupt safe!
+ */
+static void __bio_mark_nosync(struct mirror_set *ms,
+                             struct bio *bio, unsigned int done, int error)
 {
-       unsigned int i;
-       int uptodate = 1;
+       unsigned long flags;
+       struct region_hash *rh = &ms->rh;
+       struct dirty_log *log = ms->rh.log;
+       struct region *reg;
+       region_t region = bio_to_region(rh, bio);
+       int recovering = 0;
+
+       ms->in_sync = 0;
+
+       /* We must inform the log that the sync count has changed. */
+       log->type->set_region_sync(log, region, 0);
+
+       read_lock(&rh->hash_lock);
+       reg = __rh_find(rh, region);
+       read_unlock(&rh->hash_lock);
+
+       /* region hash entry should exist because write was in-flight */
+       BUG_ON(!reg);
+       BUG_ON(!list_empty(&reg->list));
+
+       spin_lock_irqsave(&rh->region_lock, flags);
+       /*
+        * Possible cases:
+        *   1) RH_DIRTY
+        *   2) RH_NOSYNC: was dirty, other preceeding writes failed
+        *   3) RH_RECOVERING: flushing pending writes
+        * Either case, the region should have not been connected to list.
+        */
+       recovering = (reg->state == RH_RECOVERING);
+       reg->state = RH_NOSYNC;
+       BUG_ON(!list_empty(&reg->list));
+       spin_unlock_irqrestore(&rh->region_lock, flags);
+
+       bio_endio(bio, done, error);
+       if (recovering)
+               complete_resync_work(reg, 0);
+}
+
+static void write_callback(unsigned long error, void *context, int log_failure)
+{
+       unsigned int i, ret = 0;
        struct bio *bio = (struct bio *) context;
        struct mirror_set *ms;
+       int uptodate = 0;
+       int should_wake = 0;
 
-       ms = bio_get_ms(bio);
-       bio_set_ms(bio, NULL);
+       ms = (bio_get_m(bio))->ms;
+       bio_set_m(bio, NULL);
 
        /*
         * NOTE: We don't decrement the pending count here,
@@ -749,47 +1056,95 @@ static void write_callback(unsigned long error, void *context)
         * This way we handle both writes to SYNC and NOSYNC
         * regions with the same code.
         */
+       if (unlikely(error)) {
+               DMERR("Error during write occurred.");
 
-       if (error) {
                /*
-                * only error the io if all mirrors failed.
-                * FIXME: bogus
+                * If the log is intact, we can play around with trying
+                * to handle the failure.  Otherwise, we have to report
+                * the I/O as failed.
                 */
-               uptodate = 0;
-               for (i = 0; i < ms->nr_mirrors; i++)
-                       if (!test_bit(i, &error)) {
-                               uptodate = 1;
-                               break;
+               if (!log_failure) {
+                       for (i = 0; i < ms->nr_mirrors; i++) {
+                               if (test_bit(i, &error))
+                                       fail_mirror(ms->mirror + i);
+                               else
+                                       uptodate = 1;
                        }
+               }
+
+               if (likely(uptodate)) {
+                       /*
+                        * Need to raise event.  Since raising
+                        * events can block, we need to do it in
+                        * the main thread.
+                        */
+                       spin_lock(&ms->lock);
+                       if (!ms->failures.head)
+                               should_wake = 1;
+                       bio_list_add(&ms->failures, bio);
+                       spin_unlock(&ms->lock);
+                       if (should_wake)
+                               wake();
+                       return;
+               } else {
+                       DMERR("All replicated volumes dead, failing I/O");
+                       /* None of the writes succeeded, fail the I/O. */
+                       ret = -EIO;
+               }
        }
-       bio_endio(bio, bio->bi_size, 0);
+
+       bio_endio(bio, bio->bi_size, ret);
+}
+
+static void write_callback_good_log(unsigned long error, void *context)
+{
+       write_callback(error, context, 0);
 }
 
-static void do_write(struct mirror_set *ms, struct bio *bio)
+static void write_callback_bad_log(unsigned long error, void *context)
+{
+       write_callback(error, context, 1);
+}
+
+static void do_write(struct mirror_set *ms, struct bio *bio, int log_failure)
 {
        unsigned int i;
-       struct io_region io[KCOPYD_MAX_REGIONS+1];
+       struct io_region io[ms->nr_mirrors], *dest = io;
        struct mirror *m;
 
-       for (i = 0; i < ms->nr_mirrors; i++) {
-               m = ms->mirror + i;
-
-               io[i].bdev = m->dev->bdev;
-               io[i].sector = m->offset + (bio->bi_sector - ms->ti->begin);
-               io[i].count = bio->bi_size >> 9;
+       if (log_failure && dm_mirror_error_on_log_failure) {
+               bio_endio(bio, bio->bi_size, -EIO);
+               return;
        }
 
-       bio_set_ms(bio, ms);
-       dm_io_async_bvec(ms->nr_mirrors, io, WRITE,
-                        bio->bi_io_vec + bio->bi_idx,
-                        write_callback, bio);
+       for (i = 0, m = ms->mirror; i < ms->nr_mirrors; i++, m++)
+               map_region(dest++, m, bio);
+
+       /*
+        * We can use the default mirror here, because we
+        * only need it in order to retrieve the reference
+        * to the mirror set in write_callback().
+        */
+       bio_set_m(bio, ms->default_mirror);
+       if (log_failure)
+               dm_io_async_bvec(ms->nr_mirrors, io, WRITE,
+                                bio->bi_io_vec + bio->bi_idx,
+                                write_callback_bad_log, bio);
+       else
+               dm_io_async_bvec(ms->nr_mirrors, io, WRITE,
+                                bio->bi_io_vec + bio->bi_idx,
+                                write_callback_good_log, bio);
 }
 
 static void do_writes(struct mirror_set *ms, struct bio_list *writes)
 {
-       int state;
+       int state, r;
        struct bio *bio;
        struct bio_list sync, nosync, recover, *this_list = NULL;
+       struct bio_list requeue;
+       struct dirty_log *log = ms->rh.log;
+       region_t region;
 
        if (!writes->head)
                return;
@@ -800,9 +1155,18 @@ static void do_writes(struct mirror_set *ms, struct bio_list *writes)
        bio_list_init(&sync);
        bio_list_init(&nosync);
        bio_list_init(&recover);
+       bio_list_init(&requeue);
 
        while ((bio = bio_list_pop(writes))) {
-               state = rh_state(&ms->rh, bio_to_region(&ms->rh, bio), 1);
+               region = bio_to_region(&ms->rh, bio);
+
+               if (log->type->is_remote_recovering &&
+                   log->type->is_remote_recovering(log, region)) {
+                       bio_list_add(&requeue, bio);
+                       continue;
+               }
+
+               state = rh_state(&ms->rh, region, 1);
                switch (state) {
                case RH_CLEAN:
                case RH_DIRTY:
@@ -821,6 +1185,14 @@ static void do_writes(struct mirror_set *ms, struct bio_list *writes)
                bio_list_add(this_list, bio);
        }
 
+       /*
+        * Add bios that are delayed due to remote recovery
+        * back on to the write queue
+        */
+       spin_lock_irq(&ms->lock);
+       bio_list_merge(&ms->writes, &requeue);
+       spin_unlock_irq(&ms->lock);
+
        /*
         * Increment the pending counts for any regions that will
         * be written to (writes to recover regions are going to
@@ -828,54 +1200,86 @@ static void do_writes(struct mirror_set *ms, struct bio_list *writes)
         */
        rh_inc_pending(&ms->rh, &sync);
        rh_inc_pending(&ms->rh, &nosync);
-       rh_flush(&ms->rh);
+
+       r = rh_flush(&ms->rh);
 
        /*
         * Dispatch io.
         */
        while ((bio = bio_list_pop(&sync)))
-               do_write(ms, bio);
+               do_write(ms, bio, r ? 1 : 0);
 
        while ((bio = bio_list_pop(&recover)))
                rh_delay(&ms->rh, bio);
 
        while ((bio = bio_list_pop(&nosync))) {
-               map_bio(ms, ms->default_mirror, bio);
+               map_bio(ms->default_mirror, bio);
                generic_make_request(bio);
        }
 }
 
+static void do_failures(struct mirror_set *ms, struct bio_list *failures)
+{
+       struct bio *bio;
+       struct dirty_log *log = ms->rh.log;
+
+       if (!failures->head)
+               return;
+
+       if (log->type->get_failure_response(log) == DMLOG_IOERR_BLOCK)
+               dm_table_event(ms->ti->table);
+
+       while ((bio = bio_list_pop(failures)))
+               __bio_mark_nosync(ms, bio, bio->bi_size, 0);
+}
+
 /*-----------------------------------------------------------------
  * kmirrord
  *---------------------------------------------------------------*/
 static LIST_HEAD(_mirror_sets);
 static DECLARE_RWSEM(_mirror_sets_lock);
 
-static void do_mirror(struct mirror_set *ms)
+static int do_mirror(struct mirror_set *ms)
 {
-       struct bio_list reads, writes;
+       struct bio_list reads, writes, failures;
 
-       spin_lock(&ms->lock);
+       spin_lock_irq(&ms->lock);
        reads = ms->reads;
        writes = ms->writes;
+       failures = ms->failures;
        bio_list_init(&ms->reads);
        bio_list_init(&ms->writes);
-       spin_unlock(&ms->lock);
+       bio_list_init(&ms->failures);
+       spin_unlock_irq(&ms->lock);
 
        rh_update_states(&ms->rh);
        do_recovery(ms);
        do_reads(ms, &reads);
        do_writes(ms, &writes);
+       do_failures(ms, &failures);
+
+       return (ms->writes.head) ? 1 : 0;
 }
 
-static void do_work(void *ignored)
+static int _do_work(void)
 {
+       int more_work = 0;
        struct mirror_set *ms;
 
        down_read(&_mirror_sets_lock);
        list_for_each_entry (ms, &_mirror_sets, list)
-               do_mirror(ms);
+               more_work += do_mirror(ms);
        up_read(&_mirror_sets_lock);
+
+       return more_work;
+}
+
+static void do_work(void *ignored)
+{
+       while (_do_work()) {
+               set_current_state(TASK_INTERRUPTIBLE);
+               schedule_timeout(HZ/5);
+       }
 }
 
 /*-----------------------------------------------------------------
@@ -896,7 +1300,7 @@ static struct mirror_set *alloc_context(unsigned int nr_mirrors,
 
        ms = kmalloc(len, GFP_KERNEL);
        if (!ms) {
-               ti->error = "dm-mirror: Cannot allocate mirror context";
+               ti->error = "Cannot allocate mirror context";
                return NULL;
        }
 
@@ -907,14 +1311,19 @@ static struct mirror_set *alloc_context(unsigned int nr_mirrors,
        ms->nr_mirrors = nr_mirrors;
        ms->nr_regions = dm_sector_div_up(ti->len, region_size);
        ms->in_sync = 0;
+       ms->read_mirror = &ms->mirror[DEFAULT_MIRROR];
        ms->default_mirror = &ms->mirror[DEFAULT_MIRROR];
 
        if (rh_init(&ms->rh, ms, dl, region_size, ms->nr_regions)) {
-               ti->error = "dm-mirror: Error creating dirty region hash";
+               ti->error = "Error creating dirty region hash";
                kfree(ms);
                return NULL;
        }
 
+       atomic_set(&ms->read_count, MIN_READS);
+
+       bio_list_init(&ms->failures);
+
        return ms;
 }
 
@@ -940,18 +1349,20 @@ static int get_mirror(struct mirror_set *ms, struct dm_target *ti,
        unsigned long long offset;
 
        if (sscanf(argv[1], "%llu", &offset) != 1) {
-               ti->error = "dm-mirror: Invalid offset";
+               ti->error = "Invalid offset";
                return -EINVAL;
        }
 
        if (dm_get_device(ti, argv[0], offset, ti->len,
                          dm_table_get_mode(ti->table),
                          &ms->mirror[mirror].dev)) {
-               ti->error = "dm-mirror: Device lookup failure";
+               ti->error = "Device lookup failure";
                return -ENXIO;
        }
 
        ms->mirror[mirror].offset = offset;
+       atomic_set(&(ms->mirror[mirror].error_count), 0);
+       ms->mirror[mirror].ms = ms;
 
        return 0;
 }
@@ -984,30 +1395,30 @@ static struct dirty_log *create_dirty_log(struct dm_target *ti,
        struct dirty_log *dl;
 
        if (argc < 2) {
-               ti->error = "dm-mirror: Insufficient mirror log arguments";
+               ti->error = "Insufficient mirror log arguments";
                return NULL;
        }
 
        if (sscanf(argv[1], "%u", &param_count) != 1) {
-               ti->error = "dm-mirror: Invalid mirror log argument count";
+               ti->error = "Invalid mirror log argument count";
                return NULL;
        }
 
        *args_used = 2 + param_count;
 
        if (argc < *args_used) {
-               ti->error = "dm-mirror: Insufficient mirror log arguments";
+               ti->error = "Insufficient mirror log arguments";
                return NULL;
        }
 
        dl = dm_create_dirty_log(argv[0], ti, param_count, argv + 2);
        if (!dl) {
-               ti->error = "dm-mirror: Error creating mirror dirty log";
+               ti->error = "Error creating mirror dirty log";
                return NULL;
        }
 
        if (!_check_region_size(ti, dl->type->get_region_size(dl))) {
-               ti->error = "dm-mirror: Invalid region size";
+               ti->error = "Invalid region size";
                dm_destroy_dirty_log(dl);
                return NULL;
        }
@@ -1041,7 +1452,7 @@ static int mirror_ctr(struct dm_target *ti, unsigned int argc, char **argv)
 
        if (!argc || sscanf(argv[0], "%u", &nr_mirrors) != 1 ||
            nr_mirrors < 2 || nr_mirrors > KCOPYD_MAX_REGIONS + 1) {
-               ti->error = "dm-mirror: Invalid number of mirrors";
+               ti->error = "Invalid number of mirrors";
                dm_destroy_dirty_log(dl);
                return -EINVAL;
        }
@@ -1049,7 +1460,7 @@ static int mirror_ctr(struct dm_target *ti, unsigned int argc, char **argv)
        argv++, argc--;
 
        if (argc != nr_mirrors * 2) {
-               ti->error = "dm-mirror: Wrong number of mirror arguments";
+               ti->error = "Wrong number of mirror arguments";
                dm_destroy_dirty_log(dl);
                return -EINVAL;
        }
@@ -1095,14 +1506,15 @@ static void mirror_dtr(struct dm_target *ti)
 
 static void queue_bio(struct mirror_set *ms, struct bio *bio, int rw)
 {
+       unsigned long flags;
        int should_wake = 0;
        struct bio_list *bl;
 
        bl = (rw == WRITE) ? &ms->writes : &ms->reads;
-       spin_lock(&ms->lock);
+       spin_lock_irqsave(&ms->lock, flags);
        should_wake = !(bl->head);
        bio_list_add(bl, bio);
-       spin_unlock(&ms->lock);
+       spin_unlock_irqrestore(&ms->lock, flags);
 
        if (should_wake)
                wake();
@@ -1117,42 +1529,64 @@ static int mirror_map(struct dm_target *ti, struct bio *bio,
        int r, rw = bio_rw(bio);
        struct mirror *m;
        struct mirror_set *ms = ti->private;
-
-       map_context->ll = bio_to_region(&ms->rh, bio);
+       struct bio_map_info *bmi = NULL;
+       struct dm_bio_details *bd = NULL;
 
        if (rw == WRITE) {
+               /* Save region for mirror_end_io() handler */
+               map_context->ll = bio_to_region(&ms->rh, bio);
                queue_bio(ms, bio, rw);
                return 0;
        }
 
+       /* All about the reads now */
+
        r = ms->rh.log->type->in_sync(ms->rh.log,
                                      bio_to_region(&ms->rh, bio), 0);
        if (r < 0 && r != -EWOULDBLOCK)
                return r;
 
-       if (r == -EWOULDBLOCK)  /* FIXME: ugly */
+       if (r == -EWOULDBLOCK)
                r = 0;
 
-       /*
-        * We don't want to fast track a recovery just for a read
-        * ahead.  So we just let it silently fail.
-        * FIXME: get rid of this.
-        */
-       if (!r && rw == READA)
-               return -EIO;
+       if (likely(r)) {
+               /*
+                * Optimize reads by avoiding to hand them to daemon.
+                *
+                * In case they fail, queue them for another shot
+                * in the mirror_end_io() function.
+                */
+               m = choose_mirror(ms);
+               if (likely(m)) {
+                       bmi = mempool_alloc(bio_map_info_pool, GFP_NOIO);
+
+                       if (likely(bmi)) {
+                               /* without this, a read is not retryable */
+                               bd = &bmi->bmi_bd;
+                               dm_bio_record(bd, bio);
+                               map_context->ptr = bmi;
+                               bmi->bmi_m = m;
+                       } else {
+                               /* we could fail now, but we can at least  **
+                               ** give it a shot.  The bd is only used to **
+                               ** retry in the event of a failure anyway. **
+                               ** If we fail, we can fail the I/O then.   */
+                               map_context->ptr = NULL;
+                       }
+
+                       map_bio(m, bio);
+                       return 1; /* Mapped -> queue request. */
+               } else
+                       return -EIO;
+       } else {
+               /* Either not clean, or -EWOULDBLOCK */
+               if (rw == READA)
+                       return -EWOULDBLOCK;
 
-       if (!r) {
-               /* Pass this io over to the daemon */
                queue_bio(ms, bio, rw);
-               return 0;
        }
 
-       m = choose_mirror(ms, bio->bi_sector);
-       if (!m)
-               return -EIO;
-
-       map_bio(ms, m, bio);
-       return 1;
+       return 0;
 }
 
 static int mirror_end_io(struct dm_target *ti, struct bio *bio,
@@ -1160,15 +1594,71 @@ static int mirror_end_io(struct dm_target *ti, struct bio *bio,
 {
        int rw = bio_rw(bio);
        struct mirror_set *ms = (struct mirror_set *) ti->private;
-       region_t region = map_context->ll;
+       struct mirror *m = NULL;
+       struct dm_bio_details *bd = NULL;
 
        /*
         * We need to dec pending if this was a write.
         */
-       if (rw == WRITE)
-               rh_dec(&ms->rh, region);
+       if (rw == WRITE) {
+               rh_dec(&ms->rh, map_context->ll);
+               return error;
+       }
 
-       return 0;
+       if (error == -EOPNOTSUPP)
+               goto out;
+
+       if ((error == -EWOULDBLOCK) && bio_rw_ahead(bio))
+               goto out;
+
+       if (unlikely(error)) {
+               DMERR("A read failure occurred on a mirror device.");
+               if (!map_context->ptr) {
+                       /*
+                        * There wasn't enough memory to record necessary
+                        * information for a retry or there was no other
+                        * mirror in-sync.
+                        */
+                       DMERR("Unable to retry read.");
+                       return -EIO;
+               }
+               m = ((struct bio_map_info *)map_context->ptr)->bmi_m;
+               fail_mirror(m); /* Flag error on mirror. */
+
+               /*
+                * A failed read needs to get queued
+                * to the daemon for another shot to
+                * one (if any) intact mirrors.
+                */
+               if (default_ok(m) || mirror_available(ms, bio)) {
+                       bd = &(((struct bio_map_info *)map_context->ptr)->bmi_bd
+                               );
+
+                       DMWARN("Trying different device.");
+                       dm_bio_restore(bd, bio);
+                       mempool_free(map_context->ptr, bio_map_info_pool);
+                       map_context->ptr = NULL;
+                       queue_bio(ms, bio, rw);
+                       return 1; /* We want another shot on the bio. */
+               }
+               DMERR("All replicated volumes dead, failing I/O");
+       }
+
+out:
+       if (map_context->ptr)
+               mempool_free(map_context->ptr, bio_map_info_pool);
+
+       return error;
+}
+
+static void mirror_presuspend(struct dm_target *ti)
+{
+       struct mirror_set *ms = (struct mirror_set *) ti->private;
+       struct dirty_log *log = ms->rh.log;
+
+       if (log->type->presuspend && log->type->presuspend(log))
+               /* FIXME: need better error handling */
+               DMWARN("log presuspend failed");
 }
 
 static void mirror_postsuspend(struct dm_target *ti)
@@ -1177,9 +1667,14 @@ static void mirror_postsuspend(struct dm_target *ti)
        struct dirty_log *log = ms->rh.log;
 
        rh_stop_recovery(&ms->rh);
-       if (log->type->suspend && log->type->suspend(log))
+
+       /* Wait for all I/O we generated to complete */
+       wait_event(recovery_stopped_event,
+                  !atomic_read(&ms->rh.recovery_in_flight));
+
+       if (log->type->postsuspend && log->type->postsuspend(log))
                /* FIXME: need better error handling */
-               DMWARN("log suspend failed");
+               DMWARN("log postsuspend failed");
 }
 
 static void mirror_resume(struct dm_target *ti)
@@ -1195,24 +1690,28 @@ static void mirror_resume(struct dm_target *ti)
 static int mirror_status(struct dm_target *ti, status_type_t type,
                         char *result, unsigned int maxlen)
 {
-       unsigned int m, sz;
+       unsigned int m, sz = 0;
        struct mirror_set *ms = (struct mirror_set *) ti->private;
-
-       sz = ms->rh.log->type->status(ms->rh.log, type, result, maxlen);
+       char buffer[ms->nr_mirrors + 1];
 
        switch (type) {
        case STATUSTYPE_INFO:
                DMEMIT("%d ", ms->nr_mirrors);
-               for (m = 0; m < ms->nr_mirrors; m++)
+               for (m = 0; m < ms->nr_mirrors; m++) {
                        DMEMIT("%s ", ms->mirror[m].dev->name);
+                       buffer[m] = atomic_read(&(ms->mirror[m].error_count)) ?
+                               'D' : 'A';
+               }
+               buffer[m] = '\0';
 
-               DMEMIT("%llu/%llu",
-                       (unsigned long long)ms->rh.log->type->
-                               get_sync_count(ms->rh.log),
-                       (unsigned long long)ms->nr_regions);
+               DMEMIT("%llu/%llu 1 %s ",
+                      ms->rh.log->type->get_sync_count(ms->rh.log),
+                      ms->nr_regions, buffer);
+               ms->rh.log->type->status(ms->rh.log, type, result+sz, maxlen-sz);
                break;
 
        case STATUSTYPE_TABLE:
+               sz = ms->rh.log->type->status(ms->rh.log, type, result, maxlen);
                DMEMIT("%d ", ms->nr_mirrors);
                for (m = 0; m < ms->nr_mirrors; m++)
                        DMEMIT("%s %llu ", ms->mirror[m].dev->name,
@@ -1224,12 +1723,13 @@ static int mirror_status(struct dm_target *ti, status_type_t type,
 
 static struct target_type mirror_target = {
        .name    = "mirror",
-       .version = {1, 0, 1},
+       .version = {1, 2, 0},
        .module  = THIS_MODULE,
        .ctr     = mirror_ctr,
        .dtr     = mirror_dtr,
        .map     = mirror_map,
        .end_io  = mirror_end_io,
+       .presuspend = mirror_presuspend,
        .postsuspend = mirror_postsuspend,
        .resume  = mirror_resume,
        .status  = mirror_status,
@@ -1239,6 +1739,11 @@ static int __init dm_mirror_init(void)
 {
        int r;
 
+       bio_map_info_pool = mempool_create(100, bio_map_info_alloc,
+                                          bio_map_info_free, NULL);
+       if (!bio_map_info_pool)
+               return -ENOMEM;
+
        r = dm_dirty_log_init();
        if (r)
                return r;
@@ -1247,7 +1752,7 @@ static int __init dm_mirror_init(void)
        if (!_kmirrord_wq) {
                DMERR("couldn't start kmirrord");
                dm_dirty_log_exit();
-               return r;
+               return -ENOMEM;
        }
        INIT_WORK(&_kmirrord_work, do_work, NULL);
 
@@ -1257,6 +1762,15 @@ static int __init dm_mirror_init(void)
                      mirror_target.name);
                dm_dirty_log_exit();
                destroy_workqueue(_kmirrord_wq);
+       } else if (!dm_mirror_error_on_log_failure) {
+               DMWARN("Warning: dm_mirror_error_on_log_failure = 0");
+               DMWARN("In this mode, the following fault sequence could cause corruption:");
+               DMWARN("  1) Log device failure");
+               DMWARN("  2) Write I/O issued");
+               DMWARN("  3) Machine failure");
+               DMWARN("  4) Log device restored");
+               DMWARN("  5) Machine reboots");
+               DMWARN("If this happens, you must resync your mirror.");
        }
 
        return r;
@@ -1278,6 +1792,8 @@ static void __exit dm_mirror_exit(void)
 module_init(dm_mirror_init);
 module_exit(dm_mirror_exit);
 
+module_param(dm_mirror_error_on_log_failure, int, 1);
+MODULE_PARM_DESC(dm_mirror_error_on_log_failure, "Set to '0' if you want writes to succeed on log device failure");
 MODULE_DESCRIPTION(DM_NAME " mirror target");
 MODULE_AUTHOR("Joe Thornber");
 MODULE_LICENSE("GPL");