linux 2.6.16.38 w/ vs2.0.3-rc1
[linux-2.6.git] / drivers / md / dm-raid1.c
index 8cfc33b..0242cc3 100644 (file)
@@ -6,7 +6,6 @@
 
 #include "dm.h"
 #include "dm-bio-list.h"
-#include "dm-bio-record.h"
 #include "dm-io.h"
 #include "dm-log.h"
 #include "kcopyd.h"
 #include <linux/vmalloc.h>
 #include <linux/workqueue.h>
 
-#define DM_MSG_PREFIX "raid1"
-
 static struct workqueue_struct *_kmirrord_wq;
 static struct work_struct _kmirrord_work;
-DECLARE_WAIT_QUEUE_HEAD(recovery_stopped_event);
-
-static int dm_mirror_error_on_log_failure = 1;
 
 static inline void wake(void)
 {
@@ -87,12 +81,10 @@ struct region_hash {
        struct list_head *buckets;
 
        spinlock_t region_lock;
-       atomic_t recovery_in_flight;
        struct semaphore recovery_count;
        struct list_head clean_regions;
        struct list_head quiesced_regions;
        struct list_head recovered_regions;
-       struct list_head failed_recovered_regions;
 };
 
 enum {
@@ -119,8 +111,7 @@ struct region {
  * Mirror set structures.
  *---------------------------------------------------------------*/
 struct mirror {
-       atomic_t error_count;  /* Error counter to flag mirror failure */
-       struct mirror_set *ms;
+       atomic_t error_count;
        struct dm_dev *dev;
        sector_t offset;
 };
@@ -131,10 +122,9 @@ struct mirror_set {
        struct region_hash rh;
        struct kcopyd_client *kcopyd_client;
 
-       spinlock_t lock;        /* protects the lists */
+       spinlock_t lock;        /* protects the next two lists */
        struct bio_list reads;
        struct bio_list writes;
-       struct bio_list failures;
 
        /* recovery */
        region_t nr_regions;
@@ -143,8 +133,6 @@ struct mirror_set {
        struct mirror *default_mirror;  /* Default mirror */
 
        unsigned int nr_mirrors;
-       atomic_t read_count;      /* Read counter for read balancing */
-       struct mirror *read_mirror; /* Last mirror read. */
        struct mirror mirror[0];
 };
 
@@ -164,6 +152,16 @@ static inline sector_t region_to_sector(struct region_hash *rh, region_t region)
 /* FIXME move this */
 static void queue_bio(struct mirror_set *ms, struct bio *bio, int rw);
 
+static void *region_alloc(gfp_t gfp_mask, void *pool_data)
+{
+       return kmalloc(sizeof(struct region), gfp_mask);
+}
+
+static void region_free(void *element, void *pool_data)
+{
+       kfree(element);
+}
+
 #define MIN_REGIONS 64
 #define MAX_RECOVERY 1
 static int rh_init(struct region_hash *rh, struct mirror_set *ms,
@@ -201,14 +199,12 @@ static int rh_init(struct region_hash *rh, struct mirror_set *ms,
 
        spin_lock_init(&rh->region_lock);
        sema_init(&rh->recovery_count, 0);
-       atomic_set(&rh->recovery_in_flight, 0);
        INIT_LIST_HEAD(&rh->clean_regions);
        INIT_LIST_HEAD(&rh->quiesced_regions);
        INIT_LIST_HEAD(&rh->recovered_regions);
-       INIT_LIST_HEAD(&rh->failed_recovered_regions);
 
-       rh->region_pool = mempool_create_kmalloc_pool(MIN_REGIONS,
-                                                     sizeof(struct region));
+       rh->region_pool = mempool_create(MIN_REGIONS, region_alloc,
+                                        region_free, NULL);
        if (!rh->region_pool) {
                vfree(rh->buckets);
                rh->buckets = NULL;
@@ -353,24 +349,12 @@ static void dispatch_bios(struct mirror_set *ms, struct bio_list *bio_list)
        }
 }
 
-static void complete_resync_work(struct region *reg, int success)
-{
-       struct region_hash *rh = reg->rh;
-
-       rh->log->type->set_region_sync(rh->log, reg->key, success);
-       if (atomic_dec_and_test(&rh->recovery_in_flight))
-               wake_up_all(&recovery_stopped_event);
-       dispatch_bios(rh->ms, &reg->delayed_bios);
-       up(&rh->recovery_count);
-}
-
 static void rh_update_states(struct region_hash *rh)
 {
        struct region *reg, *next;
 
        LIST_HEAD(clean);
        LIST_HEAD(recovered);
-       LIST_HEAD(failed_recovered);
 
        /*
         * Quickly grab the lists.
@@ -394,15 +378,6 @@ static void rh_update_states(struct region_hash *rh)
                list_for_each_entry (reg, &recovered, list)
                        list_del(&reg->hash_list);
        }
-
-       if (!list_empty(&rh->failed_recovered_regions)) {
-               list_splice(&rh->failed_recovered_regions, &failed_recovered);
-               INIT_LIST_HEAD(&rh->failed_recovered_regions);
-
-               list_for_each_entry (reg, &failed_recovered, list)
-                       list_del(&reg->hash_list);
-       }
-
        spin_unlock(&rh->region_lock);
        write_unlock_irq(&rh->hash_lock);
 
@@ -413,12 +388,9 @@ static void rh_update_states(struct region_hash *rh)
         */
        list_for_each_entry_safe (reg, next, &recovered, list) {
                rh->log->type->clear_region(rh->log, reg->key);
-               complete_resync_work(reg, 1);
-               mempool_free(reg, rh->region_pool);
-       }
-
-       list_for_each_entry_safe (reg, next, &failed_recovered, list) {
-               complete_resync_work(reg, 0);
+               rh->log->type->complete_resync_work(rh->log, reg->key, 1);
+               dispatch_bios(rh->ms, &reg->delayed_bios);
+               up(&rh->recovery_count);
                mempool_free(reg, rh->region_pool);
        }
 
@@ -472,21 +444,9 @@ static void rh_dec(struct region_hash *rh, region_t region)
 
        spin_lock_irqsave(&rh->region_lock, flags);
        if (atomic_dec_and_test(&reg->pending)) {
-               /*
-                * There is no pending I/O for this region.
-                * We can move the region to corresponding list for next action.
-                * At this point, the region is not yet connected to any list.
-                *
-                * If the state is RH_NOSYNC, the region should be kept off
-                * from clean list.
-                * The hash entry for RH_NOSYNC will remain in memory
-                * until the region is recovered or the map is reloaded.
-                */
-
-               /* do nothing for RH_NOSYNC */
                if (reg->state == RH_RECOVERING) {
                        list_add_tail(&reg->list, &rh->quiesced_regions);
-               } else if (reg->state == RH_DIRTY) {
+               } else {
                        reg->state = RH_CLEAN;
                        list_add(&reg->list, &rh->clean_regions);
                }
@@ -528,9 +488,11 @@ static int __rh_recovery_prepare(struct region_hash *rh)
        /* Already quiesced ? */
        if (atomic_read(&reg->pending))
                list_del_init(&reg->list);
-       else
-               list_move(&reg->list, &rh->quiesced_regions);
 
+       else {
+               list_del_init(&reg->list);
+               list_add(&reg->list, &rh->quiesced_regions);
+       }
        spin_unlock_irq(&rh->region_lock);
 
        return 1;
@@ -538,21 +500,11 @@ static int __rh_recovery_prepare(struct region_hash *rh)
 
 static void rh_recovery_prepare(struct region_hash *rh)
 {
-       /* Extra reference to avoid race with rh_stop_recovery */
-       atomic_inc(&rh->recovery_in_flight);
-
-       while (!down_trylock(&rh->recovery_count)) {
-               atomic_inc(&rh->recovery_in_flight);
+       while (!down_trylock(&rh->recovery_count))
                if (__rh_recovery_prepare(rh) <= 0) {
-                       atomic_dec(&rh->recovery_in_flight);
                        up(&rh->recovery_count);
                        break;
                }
-       }
-
-       /* Drop the extra reference */
-       if (atomic_dec_and_test(&rh->recovery_in_flight))
-               wake_up_all(&recovery_stopped_event);
 }
 
 /*
@@ -573,26 +525,21 @@ static struct region *rh_recovery_start(struct region_hash *rh)
        return reg;
 }
 
+/* FIXME: success ignored for now */
 static void rh_recovery_end(struct region *reg, int success)
 {
        struct region_hash *rh = reg->rh;
 
        spin_lock_irq(&rh->region_lock);
-       if (success ||
-           (rh->log->type->get_failure_response(rh->log) == DMLOG_IOERR_IGNORE))
-               list_add(&reg->list, &reg->rh->recovered_regions);
-       else {
-               reg->state = RH_NOSYNC;
-               list_add(&reg->list, &reg->rh->failed_recovered_regions);
-       }
+       list_add(&reg->list, &reg->rh->recovered_regions);
        spin_unlock_irq(&rh->region_lock);
 
        wake();
 }
 
-static int rh_flush(struct region_hash *rh)
+static void rh_flush(struct region_hash *rh)
 {
-       return rh->log->type->flush(rh->log);
+       rh->log->type->flush(rh->log);
 }
 
 static void rh_delay(struct region_hash *rh, struct bio *bio)
@@ -624,39 +571,24 @@ static void rh_start_recovery(struct region_hash *rh)
        wake();
 }
 
-struct bio_map_info {
-       struct mirror *bmi_m;
-       struct dm_bio_details bmi_bd;
-};
-
-static mempool_t *bio_map_info_pool = NULL;
-
-static void *bio_map_info_alloc(unsigned int gfp_mask, void *pool_data){
-       return kmalloc(sizeof(struct bio_map_info), gfp_mask);
-}
-
-static void bio_map_info_free(void *element, void *pool_data){
-       kfree(element);
-}
-
 /*
  * Every mirror should look like this one.
  */
 #define DEFAULT_MIRROR 0
 
 /*
- * This is yucky.  We squirrel the mirror struct away inside
- * bi_next for read/write buffers.  This is safe since the bh
+ * This is yucky.  We squirrel the mirror_set struct away inside
+ * bi_next for write buffers.  This is safe since the bh
  * doesn't get submitted to the lower levels of block layer.
  */
-static struct mirror *bio_get_m(struct bio *bio)
+static struct mirror_set *bio_get_ms(struct bio *bio)
 {
-       return (struct mirror *) bio->bi_next;
+       return (struct mirror_set *) bio->bi_next;
 }
 
-static void bio_set_m(struct bio *bio, struct mirror *m)
+static void bio_set_ms(struct bio *bio, struct mirror_set *ms)
 {
-       bio->bi_next = (struct bio *) m;
+       bio->bi_next = (struct bio *) ms;
 }
 
 /*-----------------------------------------------------------------
@@ -666,38 +598,13 @@ static void bio_set_m(struct bio *bio, struct mirror *m)
  * are in the no-sync state.  We have to recover these by
  * recopying from the default mirror to all the others.
  *---------------------------------------------------------------*/
-static void fail_mirror(struct mirror *m);
 static void recovery_complete(int read_err, unsigned int write_err,
                              void *context)
 {
        struct region *reg = (struct region *) context;
-       struct mirror_set *ms = reg->rh->ms;
-       unsigned long write_error = write_err;
-       int m, bit = 0;
-
-       if (read_err) {
-               /* Read error means the failure of default mirror. */
-               DMERR("Unable to read from primary mirror during recovery");
-               fail_mirror(ms->default_mirror);
-       }
 
-       if (write_error) {
-               DMERR("Write error during recovery (error = %#lx)",
-                     write_error);
-               /*
-                * Bits correspond to devices (excluding default mirror).
-                * The default mirror cannot change during recovery.
-                */
-               for (m = 0; m < ms->nr_mirrors; m++) {
-                       if (&ms->mirror[m] == ms->default_mirror)
-                               continue;
-                       if (test_bit(bit, &write_error))
-                               fail_mirror(ms->mirror + m);
-                       bit++;
-               }
-       }
-
-       rh_recovery_end(reg, !(read_err || write_err));
+       /* FIXME: better error handling */
+       rh_recovery_end(reg, read_err || write_err);
 }
 
 static int recover(struct mirror_set *ms, struct region *reg)
@@ -736,9 +643,7 @@ static int recover(struct mirror_set *ms, struct region *reg)
        }
 
        /* hand to kcopyd */
-       if (ms->rh.log->type->get_failure_response(ms->rh.log) == DMLOG_IOERR_IGNORE)
-               set_bit(KCOPYD_IGNORE_ERROR, &flags);
-
+       set_bit(KCOPYD_IGNORE_ERROR, &flags);
        r = kcopyd_copy(ms->kcopyd_client, &from, ms->nr_mirrors - 1, to, flags,
                        recovery_complete, reg);
 
@@ -766,212 +671,53 @@ static void do_recovery(struct mirror_set *ms)
        }
 
        /*
-        * Update the in sync flag if necessary.
-        * Raise an event when the mirror becomes in-sync.
-        *
-        * After recovery completes, the mirror becomes in_sync.
-        * Only an I/O failure can then take it back out-of-sync.
+        * Update the in sync flag.
         */
-       if (log->type->get_sync_count(log) == ms->nr_regions) {
-               if (!ms->in_sync) {
-                       dm_table_event(ms->ti->table);
-                       ms->in_sync = 1;
-               }
-       } else if (ms->in_sync)
-               ms->in_sync = 0;
+       if (!ms->in_sync &&
+           (log->type->get_sync_count(log) == ms->nr_regions)) {
+               /* the sync is complete */
+               dm_table_event(ms->ti->table);
+               ms->in_sync = 1;
+       }
 }
 
 /*-----------------------------------------------------------------
  * Reads
  *---------------------------------------------------------------*/
-/* Switch to next dev, via round-robin, after MIN_READS reads */
-#define MIN_READS 128
-
-/* choose_mirror
- * @ms: the mirror set
- *
- * This function is used for read balancing.
- *
- * Returns: chosen mirror, or NULL on failure
- */
-static struct mirror *choose_mirror(struct mirror_set *ms)
-{
-       struct mirror *start_mirror = ms->read_mirror;
-
-       /*
-        * Perform MIN_READS on each working mirror then
-        * advance to the next one.  start_mirror stores
-        * the first we tried, so we know when we're done.
-        */
-       do {
-               if (likely(!atomic_read(&ms->read_mirror->error_count)) &&
-                   !atomic_dec_and_test(&ms->read_count))
-                       goto use_mirror;
-
-               atomic_set(&ms->read_count, MIN_READS);
-
-               if (ms->read_mirror-- == ms->mirror)
-                       ms->read_mirror += ms->nr_mirrors;
-       } while (ms->read_mirror != start_mirror);
-
-       /*
-        * We've rejected every mirror.
-        * Confirm the start_mirror can be used.
-        */
-       if (unlikely(atomic_read(&ms->read_mirror->error_count)))
-               return NULL;
-
-use_mirror:
-       return ms->read_mirror;
-}
-
-/* fail_mirror
- * @m: mirror device to fail
- *
- * If the device is valid, mark it invalid.  Also,
- * if this is the default mirror device (i.e. the primary
- * device) and the mirror set is in-sync, choose an
- * alternate primary device.
- *
- * This function cannot block.
- */
-static void fail_mirror(struct mirror *m)
-{
-       struct mirror_set *ms = m->ms;
-       struct mirror *new;
-
-       atomic_inc(&m->error_count);
-
-       if (atomic_read(&m->error_count) > 1)
-               return;
-
-       if (m != ms->default_mirror)
-               return;
-
-       /*
-        * If the default mirror fails, change it.
-        * In the case of cluster mirroring, the default
-        * is changed in rh_update_states.
-        */
-       if (!ms->in_sync) {
-               /*
-                * Can not switch primary.  Better to issue requests
-                * to same failing device than to risk returning
-                * corrupt data.
-                */
-               DMERR("Primary mirror device has failed while mirror is not in-sync");
-               DMERR("Unable to choose alternative primary device");
-               return;
-       }
-
-       for (new = ms->mirror; new < ms->mirror + ms->nr_mirrors; new++)
-               if (!atomic_read(&new->error_count)) {
-                       ms->default_mirror = new;
-                       break;
-               }
-
-       if (unlikely(new == ms->mirror + ms->nr_mirrors))
-               DMWARN("All sides of mirror have failed.");
-}
-
-static int default_ok(struct mirror *m)
+static struct mirror *choose_mirror(struct mirror_set *ms, sector_t sector)
 {
-       return !atomic_read(&m->ms->default_mirror->error_count);
-}
-
-static int mirror_available(struct mirror_set *ms, struct bio *bio)
-{
-       region_t region = bio_to_region(&ms->rh, bio);
-
-       if (ms->rh.log->type->in_sync(ms->rh.log, region, 0) > 0)
-               return choose_mirror(ms) ? 1 : 0;
-
-       return 0;
+       /* FIXME: add read balancing */
+       return ms->default_mirror;
 }
 
 /*
  * remap a buffer to a particular mirror.
  */
-static sector_t map_sector(struct mirror *m, struct bio *bio)
-{
-       return m->offset + (bio->bi_sector - m->ms->ti->begin);
-}
-
-static void map_bio(struct mirror *m, struct bio *bio)
+static void map_bio(struct mirror_set *ms, struct mirror *m, struct bio *bio)
 {
        bio->bi_bdev = m->dev->bdev;
-       bio->bi_sector = map_sector(m, bio);
-}
-
-static void map_region(struct io_region *io, struct mirror *m,
-                      struct bio *bio)
-{
-       io->bdev = m->dev->bdev;
-       io->sector = map_sector(m, bio);
-       io->count = bio->bi_size >> 9;
-}
-
-/*-----------------------------------------------------------------
- * Reads
- *---------------------------------------------------------------*/
-static void read_callback(unsigned long error, void *context)
-{
-       struct bio *bio = (struct bio *)context;
-       struct mirror *m;
-
-       m = bio_get_m(bio);
-       bio_set_m(bio, NULL);
-
-       if (unlikely(error)) {
-               DMWARN("A read failure occurred on a mirror device.");
-               fail_mirror(m);
-               if (likely(default_ok(m)) || mirror_available(m->ms, bio)) {
-                       DMWARN("Trying different device.");
-                       queue_bio(m->ms, bio, bio_rw(bio));
-               } else {
-                       DMERR("No other device available, failing I/O.");
-                       bio_endio(bio, bio->bi_size, -EIO);
-               }
-       } else
-               bio_endio(bio, bio->bi_size, 0);
-}
-
-/* Asynchronous read. */
-static void read_async_bio(struct mirror *m, struct bio *bio)
-{
-       struct io_region io;
-
-       map_region(&io, m, bio);
-       bio_set_m(bio, m);
-       dm_io_async_bvec(1, &io, READ,
-                        bio->bi_io_vec + bio->bi_idx,
-                        read_callback, bio);
+       bio->bi_sector = m->offset + (bio->bi_sector - ms->ti->begin);
 }
 
 static void do_reads(struct mirror_set *ms, struct bio_list *reads)
 {
+       region_t region;
        struct bio *bio;
        struct mirror *m;
 
        while ((bio = bio_list_pop(reads))) {
+               region = bio_to_region(&ms->rh, bio);
+
                /*
                 * We can only read balance if the region is in sync.
                 */
-               if (likely(rh_in_sync(&ms->rh,
-                                     bio_to_region(&ms->rh, bio), 0)))
-                       m = choose_mirror(ms);
-               else {
+               if (rh_in_sync(&ms->rh, region, 0))
+                       m = choose_mirror(ms, bio->bi_sector);
+               else
                        m = ms->default_mirror;
 
-                       /* If default has failed, we give up. */
-                       if (unlikely(m && atomic_read(&m->error_count)))
-                               m = NULL;
-               }
-
-               if (likely(m))
-                       read_async_bio(m, bio);
-               else
-                       bio_endio(bio, bio->bi_size, -EIO);
+               map_bio(ms, m, bio);
+               generic_make_request(bio);
        }
 }
 
@@ -985,70 +731,15 @@ static void do_reads(struct mirror_set *ms, struct bio_list *reads)
  * RECOVERING: delay the io until recovery completes
  * NOSYNC:     increment pending, just write to the default mirror
  *---------------------------------------------------------------*/
-
-/* __bio_mark_nosync
- * @ms
- * @bio
- * @done
- * @error
- *
- * The bio was written on some mirror(s) but failed on other mirror(s).
- * We can successfully endio the bio but should avoid the region being
- * marked clean by setting the state RH_NOSYNC.
- *
- * This function is _not_ interrupt safe!
- */
-static void __bio_mark_nosync(struct mirror_set *ms,
-                             struct bio *bio, unsigned int done, int error)
+static void write_callback(unsigned long error, void *context)
 {
-       unsigned long flags;
-       struct region_hash *rh = &ms->rh;
-       struct dirty_log *log = ms->rh.log;
-       struct region *reg;
-       region_t region = bio_to_region(rh, bio);
-       int recovering = 0;
-
-       ms->in_sync = 0;
-
-       /* We must inform the log that the sync count has changed. */
-       log->type->set_region_sync(log, region, 0);
-
-       read_lock(&rh->hash_lock);
-       reg = __rh_find(rh, region);
-       read_unlock(&rh->hash_lock);
-
-       /* region hash entry should exist because write was in-flight */
-       BUG_ON(!reg);
-       BUG_ON(!list_empty(&reg->list));
-
-       spin_lock_irqsave(&rh->region_lock, flags);
-       /*
-        * Possible cases:
-        *   1) RH_DIRTY
-        *   2) RH_NOSYNC: was dirty, other preceeding writes failed
-        *   3) RH_RECOVERING: flushing pending writes
-        * Either case, the region should have not been connected to list.
-        */
-       recovering = (reg->state == RH_RECOVERING);
-       reg->state = RH_NOSYNC;
-       BUG_ON(!list_empty(&reg->list));
-       spin_unlock_irqrestore(&rh->region_lock, flags);
-
-       bio_endio(bio, done, error);
-       if (recovering)
-               complete_resync_work(reg, 0);
-}
-
-static void write_callback(unsigned long error, void *context, int log_failure)
-{
-       unsigned int i, ret = 0;
+       unsigned int i;
+       int uptodate = 1;
        struct bio *bio = (struct bio *) context;
        struct mirror_set *ms;
-       int uptodate = 0;
-       int should_wake = 0;
 
-       ms = (bio_get_m(bio))->ms;
-       bio_set_m(bio, NULL);
+       ms = bio_get_ms(bio);
+       bio_set_ms(bio, NULL);
 
        /*
         * NOTE: We don't decrement the pending count here,
@@ -1056,95 +747,47 @@ static void write_callback(unsigned long error, void *context, int log_failure)
         * This way we handle both writes to SYNC and NOSYNC
         * regions with the same code.
         */
-       if (unlikely(error)) {
-               DMERR("Error during write occurred.");
 
+       if (error) {
                /*
-                * If the log is intact, we can play around with trying
-                * to handle the failure.  Otherwise, we have to report
-                * the I/O as failed.
+                * only error the io if all mirrors failed.
+                * FIXME: bogus
                 */
-               if (!log_failure) {
-                       for (i = 0; i < ms->nr_mirrors; i++) {
-                               if (test_bit(i, &error))
-                                       fail_mirror(ms->mirror + i);
-                               else
-                                       uptodate = 1;
+               uptodate = 0;
+               for (i = 0; i < ms->nr_mirrors; i++)
+                       if (!test_bit(i, &error)) {
+                               uptodate = 1;
+                               break;
                        }
-               }
-
-               if (likely(uptodate)) {
-                       /*
-                        * Need to raise event.  Since raising
-                        * events can block, we need to do it in
-                        * the main thread.
-                        */
-                       spin_lock(&ms->lock);
-                       if (!ms->failures.head)
-                               should_wake = 1;
-                       bio_list_add(&ms->failures, bio);
-                       spin_unlock(&ms->lock);
-                       if (should_wake)
-                               wake();
-                       return;
-               } else {
-                       DMERR("All replicated volumes dead, failing I/O");
-                       /* None of the writes succeeded, fail the I/O. */
-                       ret = -EIO;
-               }
        }
-
-       bio_endio(bio, bio->bi_size, ret);
-}
-
-static void write_callback_good_log(unsigned long error, void *context)
-{
-       write_callback(error, context, 0);
-}
-
-static void write_callback_bad_log(unsigned long error, void *context)
-{
-       write_callback(error, context, 1);
+       bio_endio(bio, bio->bi_size, 0);
 }
 
-static void do_write(struct mirror_set *ms, struct bio *bio, int log_failure)
+static void do_write(struct mirror_set *ms, struct bio *bio)
 {
        unsigned int i;
-       struct io_region io[ms->nr_mirrors], *dest = io;
+       struct io_region io[KCOPYD_MAX_REGIONS+1];
        struct mirror *m;
 
-       if (log_failure && dm_mirror_error_on_log_failure) {
-               bio_endio(bio, bio->bi_size, -EIO);
-               return;
-       }
+       for (i = 0; i < ms->nr_mirrors; i++) {
+               m = ms->mirror + i;
 
-       for (i = 0, m = ms->mirror; i < ms->nr_mirrors; i++, m++)
-               map_region(dest++, m, bio);
+               io[i].bdev = m->dev->bdev;
+               io[i].sector = m->offset + (bio->bi_sector - ms->ti->begin);
+               io[i].count = bio->bi_size >> 9;
+       }
 
-       /*
-        * We can use the default mirror here, because we
-        * only need it in order to retrieve the reference
-        * to the mirror set in write_callback().
-        */
-       bio_set_m(bio, ms->default_mirror);
-       if (log_failure)
-               dm_io_async_bvec(ms->nr_mirrors, io, WRITE,
-                                bio->bi_io_vec + bio->bi_idx,
-                                write_callback_bad_log, bio);
-       else
-               dm_io_async_bvec(ms->nr_mirrors, io, WRITE,
-                                bio->bi_io_vec + bio->bi_idx,
-                                write_callback_good_log, bio);
+       bio_set_ms(bio, ms);
+       dm_io_async_bvec(ms->nr_mirrors, io, WRITE,
+                        bio->bi_io_vec + bio->bi_idx,
+                        write_callback, bio);
 }
 
 static void do_writes(struct mirror_set *ms, struct bio_list *writes)
 {
-       int state, r;
+       int state;
        struct bio *bio;
        struct bio_list sync, nosync, recover, *this_list = NULL;
-       struct bio_list requeue;
-       struct dirty_log *log = ms->rh.log;
-       region_t region;
 
        if (!writes->head)
                return;
@@ -1155,18 +798,9 @@ static void do_writes(struct mirror_set *ms, struct bio_list *writes)
        bio_list_init(&sync);
        bio_list_init(&nosync);
        bio_list_init(&recover);
-       bio_list_init(&requeue);
 
        while ((bio = bio_list_pop(writes))) {
-               region = bio_to_region(&ms->rh, bio);
-
-               if (log->type->is_remote_recovering &&
-                   log->type->is_remote_recovering(log, region)) {
-                       bio_list_add(&requeue, bio);
-                       continue;
-               }
-
-               state = rh_state(&ms->rh, region, 1);
+               state = rh_state(&ms->rh, bio_to_region(&ms->rh, bio), 1);
                switch (state) {
                case RH_CLEAN:
                case RH_DIRTY:
@@ -1185,14 +819,6 @@ static void do_writes(struct mirror_set *ms, struct bio_list *writes)
                bio_list_add(this_list, bio);
        }
 
-       /*
-        * Add bios that are delayed due to remote recovery
-        * back on to the write queue
-        */
-       spin_lock_irq(&ms->lock);
-       bio_list_merge(&ms->writes, &requeue);
-       spin_unlock_irq(&ms->lock);
-
        /*
         * Increment the pending counts for any regions that will
         * be written to (writes to recover regions are going to
@@ -1200,86 +826,54 @@ static void do_writes(struct mirror_set *ms, struct bio_list *writes)
         */
        rh_inc_pending(&ms->rh, &sync);
        rh_inc_pending(&ms->rh, &nosync);
-
-       r = rh_flush(&ms->rh);
+       rh_flush(&ms->rh);
 
        /*
         * Dispatch io.
         */
        while ((bio = bio_list_pop(&sync)))
-               do_write(ms, bio, r ? 1 : 0);
+               do_write(ms, bio);
 
        while ((bio = bio_list_pop(&recover)))
                rh_delay(&ms->rh, bio);
 
        while ((bio = bio_list_pop(&nosync))) {
-               map_bio(ms->default_mirror, bio);
+               map_bio(ms, ms->default_mirror, bio);
                generic_make_request(bio);
        }
 }
 
-static void do_failures(struct mirror_set *ms, struct bio_list *failures)
-{
-       struct bio *bio;
-       struct dirty_log *log = ms->rh.log;
-
-       if (!failures->head)
-               return;
-
-       if (log->type->get_failure_response(log) == DMLOG_IOERR_BLOCK)
-               dm_table_event(ms->ti->table);
-
-       while ((bio = bio_list_pop(failures)))
-               __bio_mark_nosync(ms, bio, bio->bi_size, 0);
-}
-
 /*-----------------------------------------------------------------
  * kmirrord
  *---------------------------------------------------------------*/
 static LIST_HEAD(_mirror_sets);
 static DECLARE_RWSEM(_mirror_sets_lock);
 
-static int do_mirror(struct mirror_set *ms)
+static void do_mirror(struct mirror_set *ms)
 {
-       struct bio_list reads, writes, failures;
+       struct bio_list reads, writes;
 
-       spin_lock_irq(&ms->lock);
+       spin_lock(&ms->lock);
        reads = ms->reads;
        writes = ms->writes;
-       failures = ms->failures;
        bio_list_init(&ms->reads);
        bio_list_init(&ms->writes);
-       bio_list_init(&ms->failures);
-       spin_unlock_irq(&ms->lock);
+       spin_unlock(&ms->lock);
 
        rh_update_states(&ms->rh);
        do_recovery(ms);
        do_reads(ms, &reads);
        do_writes(ms, &writes);
-       do_failures(ms, &failures);
-
-       return (ms->writes.head) ? 1 : 0;
 }
 
-static int _do_work(void)
+static void do_work(void *ignored)
 {
-       int more_work = 0;
        struct mirror_set *ms;
 
        down_read(&_mirror_sets_lock);
        list_for_each_entry (ms, &_mirror_sets, list)
-               more_work += do_mirror(ms);
+               do_mirror(ms);
        up_read(&_mirror_sets_lock);
-
-       return more_work;
-}
-
-static void do_work(void *ignored)
-{
-       while (_do_work()) {
-               set_current_state(TASK_INTERRUPTIBLE);
-               schedule_timeout(HZ/5);
-       }
 }
 
 /*-----------------------------------------------------------------
@@ -1300,7 +894,7 @@ static struct mirror_set *alloc_context(unsigned int nr_mirrors,
 
        ms = kmalloc(len, GFP_KERNEL);
        if (!ms) {
-               ti->error = "Cannot allocate mirror context";
+               ti->error = "dm-mirror: Cannot allocate mirror context";
                return NULL;
        }
 
@@ -1311,19 +905,14 @@ static struct mirror_set *alloc_context(unsigned int nr_mirrors,
        ms->nr_mirrors = nr_mirrors;
        ms->nr_regions = dm_sector_div_up(ti->len, region_size);
        ms->in_sync = 0;
-       ms->read_mirror = &ms->mirror[DEFAULT_MIRROR];
        ms->default_mirror = &ms->mirror[DEFAULT_MIRROR];
 
        if (rh_init(&ms->rh, ms, dl, region_size, ms->nr_regions)) {
-               ti->error = "Error creating dirty region hash";
+               ti->error = "dm-mirror: Error creating dirty region hash";
                kfree(ms);
                return NULL;
        }
 
-       atomic_set(&ms->read_count, MIN_READS);
-
-       bio_list_init(&ms->failures);
-
        return ms;
 }
 
@@ -1346,23 +935,21 @@ static inline int _check_region_size(struct dm_target *ti, uint32_t size)
 static int get_mirror(struct mirror_set *ms, struct dm_target *ti,
                      unsigned int mirror, char **argv)
 {
-       unsigned long long offset;
+       sector_t offset;
 
-       if (sscanf(argv[1], "%llu", &offset) != 1) {
-               ti->error = "Invalid offset";
+       if (sscanf(argv[1], SECTOR_FORMAT, &offset) != 1) {
+               ti->error = "dm-mirror: Invalid offset";
                return -EINVAL;
        }
 
        if (dm_get_device(ti, argv[0], offset, ti->len,
                          dm_table_get_mode(ti->table),
                          &ms->mirror[mirror].dev)) {
-               ti->error = "Device lookup failure";
+               ti->error = "dm-mirror: Device lookup failure";
                return -ENXIO;
        }
 
        ms->mirror[mirror].offset = offset;
-       atomic_set(&(ms->mirror[mirror].error_count), 0);
-       ms->mirror[mirror].ms = ms;
 
        return 0;
 }
@@ -1395,30 +982,30 @@ static struct dirty_log *create_dirty_log(struct dm_target *ti,
        struct dirty_log *dl;
 
        if (argc < 2) {
-               ti->error = "Insufficient mirror log arguments";
+               ti->error = "dm-mirror: Insufficient mirror log arguments";
                return NULL;
        }
 
        if (sscanf(argv[1], "%u", &param_count) != 1) {
-               ti->error = "Invalid mirror log argument count";
+               ti->error = "dm-mirror: Invalid mirror log argument count";
                return NULL;
        }
 
        *args_used = 2 + param_count;
 
        if (argc < *args_used) {
-               ti->error = "Insufficient mirror log arguments";
+               ti->error = "dm-mirror: Insufficient mirror log arguments";
                return NULL;
        }
 
        dl = dm_create_dirty_log(argv[0], ti, param_count, argv + 2);
        if (!dl) {
-               ti->error = "Error creating mirror dirty log";
+               ti->error = "dm-mirror: Error creating mirror dirty log";
                return NULL;
        }
 
        if (!_check_region_size(ti, dl->type->get_region_size(dl))) {
-               ti->error = "Invalid region size";
+               ti->error = "dm-mirror: Invalid region size";
                dm_destroy_dirty_log(dl);
                return NULL;
        }
@@ -1452,7 +1039,7 @@ static int mirror_ctr(struct dm_target *ti, unsigned int argc, char **argv)
 
        if (!argc || sscanf(argv[0], "%u", &nr_mirrors) != 1 ||
            nr_mirrors < 2 || nr_mirrors > KCOPYD_MAX_REGIONS + 1) {
-               ti->error = "Invalid number of mirrors";
+               ti->error = "dm-mirror: Invalid number of mirrors";
                dm_destroy_dirty_log(dl);
                return -EINVAL;
        }
@@ -1460,7 +1047,7 @@ static int mirror_ctr(struct dm_target *ti, unsigned int argc, char **argv)
        argv++, argc--;
 
        if (argc != nr_mirrors * 2) {
-               ti->error = "Wrong number of mirror arguments";
+               ti->error = "dm-mirror: Wrong number of mirror arguments";
                dm_destroy_dirty_log(dl);
                return -EINVAL;
        }
@@ -1506,15 +1093,14 @@ static void mirror_dtr(struct dm_target *ti)
 
 static void queue_bio(struct mirror_set *ms, struct bio *bio, int rw)
 {
-       unsigned long flags;
        int should_wake = 0;
        struct bio_list *bl;
 
        bl = (rw == WRITE) ? &ms->writes : &ms->reads;
-       spin_lock_irqsave(&ms->lock, flags);
+       spin_lock(&ms->lock);
        should_wake = !(bl->head);
        bio_list_add(bl, bio);
-       spin_unlock_irqrestore(&ms->lock, flags);
+       spin_unlock(&ms->lock);
 
        if (should_wake)
                wake();
@@ -1529,64 +1115,42 @@ static int mirror_map(struct dm_target *ti, struct bio *bio,
        int r, rw = bio_rw(bio);
        struct mirror *m;
        struct mirror_set *ms = ti->private;
-       struct bio_map_info *bmi = NULL;
-       struct dm_bio_details *bd = NULL;
+
+       map_context->ll = bio_to_region(&ms->rh, bio);
 
        if (rw == WRITE) {
-               /* Save region for mirror_end_io() handler */
-               map_context->ll = bio_to_region(&ms->rh, bio);
                queue_bio(ms, bio, rw);
                return 0;
        }
 
-       /* All about the reads now */
-
        r = ms->rh.log->type->in_sync(ms->rh.log,
                                      bio_to_region(&ms->rh, bio), 0);
        if (r < 0 && r != -EWOULDBLOCK)
                return r;
 
-       if (r == -EWOULDBLOCK)
+       if (r == -EWOULDBLOCK)  /* FIXME: ugly */
                r = 0;
 
-       if (likely(r)) {
-               /*
-                * Optimize reads by avoiding to hand them to daemon.
-                *
-                * In case they fail, queue them for another shot
-                * in the mirror_end_io() function.
-                */
-               m = choose_mirror(ms);
-               if (likely(m)) {
-                       bmi = mempool_alloc(bio_map_info_pool, GFP_NOIO);
-
-                       if (likely(bmi)) {
-                               /* without this, a read is not retryable */
-                               bd = &bmi->bmi_bd;
-                               dm_bio_record(bd, bio);
-                               map_context->ptr = bmi;
-                               bmi->bmi_m = m;
-                       } else {
-                               /* we could fail now, but we can at least  **
-                               ** give it a shot.  The bd is only used to **
-                               ** retry in the event of a failure anyway. **
-                               ** If we fail, we can fail the I/O then.   */
-                               map_context->ptr = NULL;
-                       }
-
-                       map_bio(m, bio);
-                       return 1; /* Mapped -> queue request. */
-               } else
-                       return -EIO;
-       } else {
-               /* Either not clean, or -EWOULDBLOCK */
-               if (rw == READA)
-                       return -EWOULDBLOCK;
+       /*
+        * We don't want to fast track a recovery just for a read
+        * ahead.  So we just let it silently fail.
+        * FIXME: get rid of this.
+        */
+       if (!r && rw == READA)
+               return -EIO;
 
+       if (!r) {
+               /* Pass this io over to the daemon */
                queue_bio(ms, bio, rw);
+               return 0;
        }
 
-       return 0;
+       m = choose_mirror(ms, bio->bi_sector);
+       if (!m)
+               return -EIO;
+
+       map_bio(ms, m, bio);
+       return 1;
 }
 
 static int mirror_end_io(struct dm_target *ti, struct bio *bio,
@@ -1594,71 +1158,15 @@ static int mirror_end_io(struct dm_target *ti, struct bio *bio,
 {
        int rw = bio_rw(bio);
        struct mirror_set *ms = (struct mirror_set *) ti->private;
-       struct mirror *m = NULL;
-       struct dm_bio_details *bd = NULL;
+       region_t region = map_context->ll;
 
        /*
         * We need to dec pending if this was a write.
         */
-       if (rw == WRITE) {
-               rh_dec(&ms->rh, map_context->ll);
-               return error;
-       }
-
-       if (error == -EOPNOTSUPP)
-               goto out;
-
-       if ((error == -EWOULDBLOCK) && bio_rw_ahead(bio))
-               goto out;
-
-       if (unlikely(error)) {
-               DMERR("A read failure occurred on a mirror device.");
-               if (!map_context->ptr) {
-                       /*
-                        * There wasn't enough memory to record necessary
-                        * information for a retry or there was no other
-                        * mirror in-sync.
-                        */
-                       DMERR("Unable to retry read.");
-                       return -EIO;
-               }
-               m = ((struct bio_map_info *)map_context->ptr)->bmi_m;
-               fail_mirror(m); /* Flag error on mirror. */
-
-               /*
-                * A failed read needs to get queued
-                * to the daemon for another shot to
-                * one (if any) intact mirrors.
-                */
-               if (default_ok(m) || mirror_available(ms, bio)) {
-                       bd = &(((struct bio_map_info *)map_context->ptr)->bmi_bd
-                               );
-
-                       DMWARN("Trying different device.");
-                       dm_bio_restore(bd, bio);
-                       mempool_free(map_context->ptr, bio_map_info_pool);
-                       map_context->ptr = NULL;
-                       queue_bio(ms, bio, rw);
-                       return 1; /* We want another shot on the bio. */
-               }
-               DMERR("All replicated volumes dead, failing I/O");
-       }
-
-out:
-       if (map_context->ptr)
-               mempool_free(map_context->ptr, bio_map_info_pool);
-
-       return error;
-}
+       if (rw == WRITE)
+               rh_dec(&ms->rh, region);
 
-static void mirror_presuspend(struct dm_target *ti)
-{
-       struct mirror_set *ms = (struct mirror_set *) ti->private;
-       struct dirty_log *log = ms->rh.log;
-
-       if (log->type->presuspend && log->type->presuspend(log))
-               /* FIXME: need better error handling */
-               DMWARN("log presuspend failed");
+       return 0;
 }
 
 static void mirror_postsuspend(struct dm_target *ti)
@@ -1667,14 +1175,9 @@ static void mirror_postsuspend(struct dm_target *ti)
        struct dirty_log *log = ms->rh.log;
 
        rh_stop_recovery(&ms->rh);
-
-       /* Wait for all I/O we generated to complete */
-       wait_event(recovery_stopped_event,
-                  !atomic_read(&ms->rh.recovery_in_flight));
-
-       if (log->type->postsuspend && log->type->postsuspend(log))
+       if (log->type->suspend && log->type->suspend(log))
                /* FIXME: need better error handling */
-               DMWARN("log postsuspend failed");
+               DMWARN("log suspend failed");
 }
 
 static void mirror_resume(struct dm_target *ti)
@@ -1690,32 +1193,27 @@ static void mirror_resume(struct dm_target *ti)
 static int mirror_status(struct dm_target *ti, status_type_t type,
                         char *result, unsigned int maxlen)
 {
-       unsigned int m, sz = 0;
+       unsigned int m, sz;
        struct mirror_set *ms = (struct mirror_set *) ti->private;
-       char buffer[ms->nr_mirrors + 1];
+
+       sz = ms->rh.log->type->status(ms->rh.log, type, result, maxlen);
 
        switch (type) {
        case STATUSTYPE_INFO:
                DMEMIT("%d ", ms->nr_mirrors);
-               for (m = 0; m < ms->nr_mirrors; m++) {
+               for (m = 0; m < ms->nr_mirrors; m++)
                        DMEMIT("%s ", ms->mirror[m].dev->name);
-                       buffer[m] = atomic_read(&(ms->mirror[m].error_count)) ?
-                               'D' : 'A';
-               }
-               buffer[m] = '\0';
 
-               DMEMIT("%llu/%llu 1 %s ",
+               DMEMIT(SECTOR_FORMAT "/" SECTOR_FORMAT,
                       ms->rh.log->type->get_sync_count(ms->rh.log),
-                      ms->nr_regions, buffer);
-               ms->rh.log->type->status(ms->rh.log, type, result+sz, maxlen-sz);
+                      ms->nr_regions);
                break;
 
        case STATUSTYPE_TABLE:
-               sz = ms->rh.log->type->status(ms->rh.log, type, result, maxlen);
                DMEMIT("%d ", ms->nr_mirrors);
                for (m = 0; m < ms->nr_mirrors; m++)
-                       DMEMIT("%s %llu ", ms->mirror[m].dev->name,
-                               (unsigned long long)ms->mirror[m].offset);
+                       DMEMIT("%s " SECTOR_FORMAT " ",
+                              ms->mirror[m].dev->name, ms->mirror[m].offset);
        }
 
        return 0;
@@ -1723,13 +1221,12 @@ static int mirror_status(struct dm_target *ti, status_type_t type,
 
 static struct target_type mirror_target = {
        .name    = "mirror",
-       .version = {1, 2, 0},
+       .version = {1, 0, 1},
        .module  = THIS_MODULE,
        .ctr     = mirror_ctr,
        .dtr     = mirror_dtr,
        .map     = mirror_map,
        .end_io  = mirror_end_io,
-       .presuspend = mirror_presuspend,
        .postsuspend = mirror_postsuspend,
        .resume  = mirror_resume,
        .status  = mirror_status,
@@ -1739,11 +1236,6 @@ static int __init dm_mirror_init(void)
 {
        int r;
 
-       bio_map_info_pool = mempool_create(100, bio_map_info_alloc,
-                                          bio_map_info_free, NULL);
-       if (!bio_map_info_pool)
-               return -ENOMEM;
-
        r = dm_dirty_log_init();
        if (r)
                return r;
@@ -1752,7 +1244,7 @@ static int __init dm_mirror_init(void)
        if (!_kmirrord_wq) {
                DMERR("couldn't start kmirrord");
                dm_dirty_log_exit();
-               return -ENOMEM;
+               return r;
        }
        INIT_WORK(&_kmirrord_work, do_work, NULL);
 
@@ -1762,15 +1254,6 @@ static int __init dm_mirror_init(void)
                      mirror_target.name);
                dm_dirty_log_exit();
                destroy_workqueue(_kmirrord_wq);
-       } else if (!dm_mirror_error_on_log_failure) {
-               DMWARN("Warning: dm_mirror_error_on_log_failure = 0");
-               DMWARN("In this mode, the following fault sequence could cause corruption:");
-               DMWARN("  1) Log device failure");
-               DMWARN("  2) Write I/O issued");
-               DMWARN("  3) Machine failure");
-               DMWARN("  4) Log device restored");
-               DMWARN("  5) Machine reboots");
-               DMWARN("If this happens, you must resync your mirror.");
        }
 
        return r;
@@ -1792,8 +1275,6 @@ static void __exit dm_mirror_exit(void)
 module_init(dm_mirror_init);
 module_exit(dm_mirror_exit);
 
-module_param(dm_mirror_error_on_log_failure, int, 1);
-MODULE_PARM_DESC(dm_mirror_error_on_log_failure, "Set to '0' if you want writes to succeed on log device failure");
 MODULE_DESCRIPTION(DM_NAME " mirror target");
 MODULE_AUTHOR("Joe Thornber");
 MODULE_LICENSE("GPL");