X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=drivers%2Fmd%2Fdm-raid1.c;h=8cfc33b1e68f0fc043717e1419b4295c9e7c8619;hb=16c70f8c1b54b61c3b951b6fb220df250fe09b32;hp=6e3cf7e134515c1540ba4409ee2eedf5cf4ba094;hpb=f7f1b0f1e2fbadeab12d24236000e778aa9b1ead;p=linux-2.6.git

diff --git a/drivers/md/dm-raid1.c b/drivers/md/dm-raid1.c
index 6e3cf7e13..8cfc33b1e 100644
--- a/drivers/md/dm-raid1.c
+++ b/drivers/md/dm-raid1.c
@@ -6,6 +6,7 @@
 #include "dm.h"
 #include "dm-bio-list.h"
+#include "dm-bio-record.h"
 #include "dm-io.h"
 #include "dm-log.h"
 #include "kcopyd.h"
@@ -20,8 +21,13 @@
 #include <linux/vmalloc.h>
 #include <linux/workqueue.h>
 
+#define DM_MSG_PREFIX "raid1"
+
 static struct workqueue_struct *_kmirrord_wq;
 static struct work_struct _kmirrord_work;
+DECLARE_WAIT_QUEUE_HEAD(recovery_stopped_event);
+
+static int dm_mirror_error_on_log_failure = 1;
 
 static inline void wake(void)
 {
@@ -81,10 +87,12 @@ struct region_hash {
 	struct list_head *buckets;
 
 	spinlock_t region_lock;
+	atomic_t recovery_in_flight;
 	struct semaphore recovery_count;
 	struct list_head clean_regions;
 	struct list_head quiesced_regions;
 	struct list_head recovered_regions;
+	struct list_head failed_recovered_regions;
 };
 
 enum {
@@ -106,12 +114,46 @@ struct region {
 	struct bio_list delayed_bios;
 };
 
+
+/*-----------------------------------------------------------------
+ * Mirror set structures.
+ *---------------------------------------------------------------*/
+struct mirror {
+	atomic_t error_count;	/* Error counter to flag mirror failure */
+	struct mirror_set *ms;
+	struct dm_dev *dev;
+	sector_t offset;
+};
+
+struct mirror_set {
+	struct dm_target *ti;
+	struct list_head list;
+	struct region_hash rh;
+	struct kcopyd_client *kcopyd_client;
+
+	spinlock_t lock;	/* protects the lists */
+	struct bio_list reads;
+	struct bio_list writes;
+	struct bio_list failures;
+
+	/* recovery */
+	region_t nr_regions;
+	int in_sync;
+
+	struct mirror *default_mirror;	/* Default mirror */
+
+	unsigned int nr_mirrors;
+	atomic_t read_count;	/* Read counter for read balancing */
+	struct mirror *read_mirror;	/* Last mirror read. */
+	struct mirror mirror[0];
+};
+
 /*
  * Conversion fns
  */
 static inline region_t bio_to_region(struct region_hash *rh, struct bio *bio)
 {
-	return bio->bi_sector >> rh->region_shift;
+	return (bio->bi_sector - rh->ms->ti->begin) >> rh->region_shift;
 }
 
 static inline sector_t region_to_sector(struct region_hash *rh, region_t region)
@@ -122,16 +164,6 @@ static inline sector_t region_to_sector(struct region_hash *rh, region_t region)
 /* FIXME move this */
 static void queue_bio(struct mirror_set *ms, struct bio *bio, int rw);
 
-static void *region_alloc(unsigned int __nocast gfp_mask, void *pool_data)
-{
-	return kmalloc(sizeof(struct region), gfp_mask);
-}
-
-static void region_free(void *element, void *pool_data)
-{
-	kfree(element);
-}
-
 #define MIN_REGIONS 64
 #define MAX_RECOVERY 1
 static int rh_init(struct region_hash *rh, struct mirror_set *ms,
@@ -169,12 +201,14 @@ static int rh_init(struct region_hash *rh, struct mirror_set *ms,
 
 	spin_lock_init(&rh->region_lock);
 	sema_init(&rh->recovery_count, 0);
+	atomic_set(&rh->recovery_in_flight, 0);
 	INIT_LIST_HEAD(&rh->clean_regions);
 	INIT_LIST_HEAD(&rh->quiesced_regions);
 	INIT_LIST_HEAD(&rh->recovered_regions);
+	INIT_LIST_HEAD(&rh->failed_recovered_regions);
 
-	rh->region_pool = mempool_create(MIN_REGIONS, region_alloc,
-					 region_free, NULL);
+	rh->region_pool = mempool_create_kmalloc_pool(MIN_REGIONS,
+						      sizeof(struct region));
 	if (!rh->region_pool) {
 		vfree(rh->buckets);
 		rh->buckets = NULL;
@@ -233,7 +267,9 @@ static struct region *__rh_alloc(struct region_hash *rh, region_t region)
 	struct region *reg, *nreg;
 
 	read_unlock(&rh->hash_lock);
-	nreg = mempool_alloc(rh->region_pool, GFP_NOIO);
+	nreg = mempool_alloc(rh->region_pool, GFP_ATOMIC);
+	if (unlikely(!nreg))
+		nreg = kmalloc(sizeof(struct region), GFP_NOIO);
 	nreg->state = rh->log->type->in_sync(rh->log, region, 1) ?
 		      RH_CLEAN : RH_NOSYNC;
 	nreg->rh = rh;
@@ -317,12 +353,24 @@ static void dispatch_bios(struct mirror_set *ms, struct bio_list *bio_list)
 	}
 }
 
+static void complete_resync_work(struct region *reg, int success)
+{
+	struct region_hash *rh = reg->rh;
+
+	rh->log->type->set_region_sync(rh->log, reg->key, success);
+	if (atomic_dec_and_test(&rh->recovery_in_flight))
+		wake_up_all(&recovery_stopped_event);
+	dispatch_bios(rh->ms, &reg->delayed_bios);
+	up(&rh->recovery_count);
+}
+
 static void rh_update_states(struct region_hash *rh)
 {
 	struct region *reg, *next;
 
 	LIST_HEAD(clean);
 	LIST_HEAD(recovered);
+	LIST_HEAD(failed_recovered);
 
 	/*
	 * Quickly grab the lists.
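[Aside: the bio_to_region() hunk near the top of this diff switches the target to
mirror-relative addressing, i.e. (bio->bi_sector - ti->begin) >> rh->region_shift.
A minimal userspace C sketch of that arithmetic follows; the shift value, the
target offset and the helper names are assumptions made for the sketch, not
values taken from the driver.]

#include <stdint.h>
#include <stdio.h>

typedef uint64_t sector_t;
typedef uint64_t region_t;

/* Mirror-relative region lookup: subtract the target's start sector
 * first, as the patched bio_to_region() does, then shift. */
static region_t sector_to_region(sector_t bio_sector, sector_t target_begin,
				 unsigned int region_shift)
{
	return (bio_sector - target_begin) >> region_shift;
}

/* Inverse mapping, as in region_to_sector(). */
static sector_t region_to_sector(region_t region, unsigned int region_shift)
{
	return (sector_t)region << region_shift;
}

int main(void)
{
	unsigned int shift = 7;		/* assumed: 128-sector regions */
	sector_t begin = 384;		/* assumed target start sector */

	printf("sector 1024 -> region %llu\n",
	       (unsigned long long)sector_to_region(1024, begin, shift));
	printf("region 5 -> sector %llu\n",
	       (unsigned long long)region_to_sector(5, shift));
	return 0;
}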
@@ -346,6 +394,15 @@ static void rh_update_states(struct region_hash *rh)
 		list_for_each_entry (reg, &recovered, list)
 			list_del(&reg->hash_list);
 	}
+
+	if (!list_empty(&rh->failed_recovered_regions)) {
+		list_splice(&rh->failed_recovered_regions, &failed_recovered);
+		INIT_LIST_HEAD(&rh->failed_recovered_regions);
+
+		list_for_each_entry (reg, &failed_recovered, list)
+			list_del(&reg->hash_list);
+	}
+
 	spin_unlock(&rh->region_lock);
 	write_unlock_irq(&rh->hash_lock);
 
@@ -356,9 +413,12 @@ static void rh_update_states(struct region_hash *rh)
 	 */
 	list_for_each_entry_safe (reg, next, &recovered, list) {
 		rh->log->type->clear_region(rh->log, reg->key);
-		rh->log->type->complete_resync_work(rh->log, reg->key, 1);
-		dispatch_bios(rh->ms, &reg->delayed_bios);
-		up(&rh->recovery_count);
+		complete_resync_work(reg, 1);
+		mempool_free(reg, rh->region_pool);
+	}
+
+	list_for_each_entry_safe (reg, next, &failed_recovered, list) {
+		complete_resync_work(reg, 0);
 		mempool_free(reg, rh->region_pool);
 	}
 
@@ -375,16 +435,20 @@ static void rh_inc(struct region_hash *rh, region_t region)
 
 	read_lock(&rh->hash_lock);
 	reg = __rh_find(rh, region);
-	if (reg->state == RH_CLEAN) {
-		rh->log->type->mark_region(rh->log, reg->key);
 
-		spin_lock_irq(&rh->region_lock);
+	spin_lock_irq(&rh->region_lock);
+	atomic_inc(&reg->pending);
+
+	if (reg->state == RH_CLEAN) {
 		reg->state = RH_DIRTY;
 		list_del_init(&reg->list);	/* take off the clean list */
 		spin_unlock_irq(&rh->region_lock);
-	}
 
-	atomic_inc(&reg->pending);
+		rh->log->type->mark_region(rh->log, reg->key);
+	} else
+		spin_unlock_irq(&rh->region_lock);
+
 
 	read_unlock(&rh->hash_lock);
 }
@@ -406,17 +470,29 @@ static void rh_dec(struct region_hash *rh, region_t region)
 	reg = __rh_lookup(rh, region);
 	read_unlock(&rh->hash_lock);
 
+	spin_lock_irqsave(&rh->region_lock, flags);
 	if (atomic_dec_and_test(&reg->pending)) {
-		spin_lock_irqsave(&rh->region_lock, flags);
+		/*
+		 * There is no pending I/O for this region.
+		 * We can move the region to the corresponding list
+		 * for the next action.
+		 * At this point, the region is not yet connected to any list.
+		 *
+		 * If the state is RH_NOSYNC, the region should be kept off
+		 * the clean list.
+		 * The hash entry for RH_NOSYNC will remain in memory
+		 * until the region is recovered or the map is reloaded.
+		 */
+
+		/* do nothing for RH_NOSYNC */
 		if (reg->state == RH_RECOVERING) {
 			list_add_tail(&reg->list, &rh->quiesced_regions);
-		} else {
+		} else if (reg->state == RH_DIRTY) {
 			reg->state = RH_CLEAN;
 			list_add(&reg->list, &rh->clean_regions);
 		}
-		spin_unlock_irqrestore(&rh->region_lock, flags);
 		should_wake = 1;
 	}
+	spin_unlock_irqrestore(&rh->region_lock, flags);
 
 	if (should_wake)
 		wake();
@@ -452,11 +528,9 @@ static int __rh_recovery_prepare(struct region_hash *rh)
 
 	/* Already quiesced ? */
 	if (atomic_read(&reg->pending))
 		list_del_init(&reg->list);
+	else
+		list_move(&reg->list, &rh->quiesced_regions);
 
-	else {
-		list_del_init(&reg->list);
-		list_add(&reg->list, &rh->quiesced_regions);
-	}
 	spin_unlock_irq(&rh->region_lock);
 
 	return 1;
@@ -464,11 +538,21 @@ static int __rh_recovery_prepare(struct region_hash *rh)
 
 static void rh_recovery_prepare(struct region_hash *rh)
 {
-	while (!down_trylock(&rh->recovery_count))
+	/* Extra reference to avoid race with rh_stop_recovery */
+	atomic_inc(&rh->recovery_in_flight);
+
+	while (!down_trylock(&rh->recovery_count)) {
+		atomic_inc(&rh->recovery_in_flight);
 		if (__rh_recovery_prepare(rh) <= 0) {
+			atomic_dec(&rh->recovery_in_flight);
 			up(&rh->recovery_count);
 			break;
 		}
+	}
+
+	/* Drop the extra reference */
+	if (atomic_dec_and_test(&rh->recovery_in_flight))
+		wake_up_all(&recovery_stopped_event);
 }
 
 /*
@@ -489,21 +573,26 @@ static struct region *rh_recovery_start(struct region_hash *rh)
 	return reg;
 }
 
-/* FIXME: success ignored for now */
 static void rh_recovery_end(struct region *reg, int success)
 {
 	struct region_hash *rh = reg->rh;
 
 	spin_lock_irq(&rh->region_lock);
-	list_add(&reg->list, &reg->rh->recovered_regions);
+	if (success ||
+	    (rh->log->type->get_failure_response(rh->log) == DMLOG_IOERR_IGNORE))
+		list_add(&reg->list, &reg->rh->recovered_regions);
+	else {
+		reg->state = RH_NOSYNC;
+		list_add(&reg->list, &reg->rh->failed_recovered_regions);
+	}
 	spin_unlock_irq(&rh->region_lock);
 
 	wake();
 }
 
-static void rh_flush(struct region_hash *rh)
+static int rh_flush(struct region_hash *rh)
 {
-	rh->log->type->flush(rh->log);
+	return rh->log->type->flush(rh->log);
 }
 
 static void rh_delay(struct region_hash *rh, struct bio *bio)
@@ -535,32 +624,20 @@ static void rh_start_recovery(struct region_hash *rh)
 	wake();
 }
 
-/*-----------------------------------------------------------------
- * Mirror set structures.
- *---------------------------------------------------------------*/
-struct mirror {
-	atomic_t error_count;
-	struct dm_dev *dev;
-	sector_t offset;
+struct bio_map_info {
+	struct mirror *bmi_m;
+	struct dm_bio_details bmi_bd;
 };
 
-struct mirror_set {
-	struct dm_target *ti;
-	struct list_head list;
-	struct region_hash rh;
-	struct kcopyd_client *kcopyd_client;
-
-	spinlock_t lock;	/* protects the next two lists */
-	struct bio_list reads;
-	struct bio_list writes;
+static mempool_t *bio_map_info_pool = NULL;
 
-	/* recovery */
-	region_t nr_regions;
-	int in_sync;
+static void *bio_map_info_alloc(unsigned int gfp_mask, void *pool_data){
+	return kmalloc(sizeof(struct bio_map_info), gfp_mask);
+}
 
-	unsigned int nr_mirrors;
-	struct mirror mirror[0];
-};
+static void bio_map_info_free(void *element, void *pool_data){
+	kfree(element);
+}
 
 /*
  * Every mirror should look like this one.
@@ -568,18 +645,18 @@ struct mirror_set {
 #define DEFAULT_MIRROR 0
 
 /*
- * This is yucky.  We squirrel the mirror_set struct away inside
- * bi_next for write buffers.  This is safe since the bh
+ * This is yucky.  We squirrel the mirror struct away inside
+ * bi_next for read/write buffers.  This is safe since the bh
  * doesn't get submitted to the lower levels of block layer.
*/ -static struct mirror_set *bio_get_ms(struct bio *bio) +static struct mirror *bio_get_m(struct bio *bio) { - return (struct mirror_set *) bio->bi_next; + return (struct mirror *) bio->bi_next; } -static void bio_set_ms(struct bio *bio, struct mirror_set *ms) +static void bio_set_m(struct bio *bio, struct mirror *m) { - bio->bi_next = (struct bio *) ms; + bio->bi_next = (struct bio *) m; } /*----------------------------------------------------------------- @@ -589,13 +666,38 @@ static void bio_set_ms(struct bio *bio, struct mirror_set *ms) * are in the no-sync state. We have to recover these by * recopying from the default mirror to all the others. *---------------------------------------------------------------*/ +static void fail_mirror(struct mirror *m); static void recovery_complete(int read_err, unsigned int write_err, void *context) { struct region *reg = (struct region *) context; + struct mirror_set *ms = reg->rh->ms; + unsigned long write_error = write_err; + int m, bit = 0; + + if (read_err) { + /* Read error means the failure of default mirror. */ + DMERR("Unable to read from primary mirror during recovery"); + fail_mirror(ms->default_mirror); + } + + if (write_error) { + DMERR("Write error during recovery (error = %#lx)", + write_error); + /* + * Bits correspond to devices (excluding default mirror). + * The default mirror cannot change during recovery. + */ + for (m = 0; m < ms->nr_mirrors; m++) { + if (&ms->mirror[m] == ms->default_mirror) + continue; + if (test_bit(bit, &write_error)) + fail_mirror(ms->mirror + m); + bit++; + } + } - /* FIXME: better error handling */ - rh_recovery_end(reg, read_err || write_err); + rh_recovery_end(reg, !(read_err || write_err)); } static int recover(struct mirror_set *ms, struct region *reg) @@ -607,7 +709,7 @@ static int recover(struct mirror_set *ms, struct region *reg) unsigned long flags = 0; /* fill in the source */ - m = ms->mirror + DEFAULT_MIRROR; + m = ms->default_mirror; from.bdev = m->dev->bdev; from.sector = m->offset + region_to_sector(reg->rh, reg->key); if (reg->key == (ms->nr_regions - 1)) { @@ -623,7 +725,7 @@ static int recover(struct mirror_set *ms, struct region *reg) /* fill in the destinations */ for (i = 0, dest = to; i < ms->nr_mirrors; i++) { - if (i == DEFAULT_MIRROR) + if (&ms->mirror[i] == ms->default_mirror) continue; m = ms->mirror + i; @@ -634,7 +736,9 @@ static int recover(struct mirror_set *ms, struct region *reg) } /* hand to kcopyd */ - set_bit(KCOPYD_IGNORE_ERROR, &flags); + if (ms->rh.log->type->get_failure_response(ms->rh.log) == DMLOG_IOERR_IGNORE) + set_bit(KCOPYD_IGNORE_ERROR, &flags); + r = kcopyd_copy(ms->kcopyd_client, &from, ms->nr_mirrors - 1, to, flags, recovery_complete, reg); @@ -662,53 +766,212 @@ static void do_recovery(struct mirror_set *ms) } /* - * Update the in sync flag. + * Update the in sync flag if necessary. + * Raise an event when the mirror becomes in-sync. + * + * After recovery completes, the mirror becomes in_sync. + * Only an I/O failure can then take it back out-of-sync. 
*/ - if (!ms->in_sync && - (log->type->get_sync_count(log) == ms->nr_regions)) { - /* the sync is complete */ - dm_table_event(ms->ti->table); - ms->in_sync = 1; - } + if (log->type->get_sync_count(log) == ms->nr_regions) { + if (!ms->in_sync) { + dm_table_event(ms->ti->table); + ms->in_sync = 1; + } + } else if (ms->in_sync) + ms->in_sync = 0; } /*----------------------------------------------------------------- * Reads *---------------------------------------------------------------*/ -static struct mirror *choose_mirror(struct mirror_set *ms, sector_t sector) +/* Switch to next dev, via round-robin, after MIN_READS reads */ +#define MIN_READS 128 + +/* choose_mirror + * @ms: the mirror set + * + * This function is used for read balancing. + * + * Returns: chosen mirror, or NULL on failure + */ +static struct mirror *choose_mirror(struct mirror_set *ms) +{ + struct mirror *start_mirror = ms->read_mirror; + + /* + * Perform MIN_READS on each working mirror then + * advance to the next one. start_mirror stores + * the first we tried, so we know when we're done. + */ + do { + if (likely(!atomic_read(&ms->read_mirror->error_count)) && + !atomic_dec_and_test(&ms->read_count)) + goto use_mirror; + + atomic_set(&ms->read_count, MIN_READS); + + if (ms->read_mirror-- == ms->mirror) + ms->read_mirror += ms->nr_mirrors; + } while (ms->read_mirror != start_mirror); + + /* + * We've rejected every mirror. + * Confirm the start_mirror can be used. + */ + if (unlikely(atomic_read(&ms->read_mirror->error_count))) + return NULL; + +use_mirror: + return ms->read_mirror; +} + +/* fail_mirror + * @m: mirror device to fail + * + * If the device is valid, mark it invalid. Also, + * if this is the default mirror device (i.e. the primary + * device) and the mirror set is in-sync, choose an + * alternate primary device. + * + * This function cannot block. + */ +static void fail_mirror(struct mirror *m) +{ + struct mirror_set *ms = m->ms; + struct mirror *new; + + atomic_inc(&m->error_count); + + if (atomic_read(&m->error_count) > 1) + return; + + if (m != ms->default_mirror) + return; + + /* + * If the default mirror fails, change it. + * In the case of cluster mirroring, the default + * is changed in rh_update_states. + */ + if (!ms->in_sync) { + /* + * Can not switch primary. Better to issue requests + * to same failing device than to risk returning + * corrupt data. + */ + DMERR("Primary mirror device has failed while mirror is not in-sync"); + DMERR("Unable to choose alternative primary device"); + return; + } + + for (new = ms->mirror; new < ms->mirror + ms->nr_mirrors; new++) + if (!atomic_read(&new->error_count)) { + ms->default_mirror = new; + break; + } + + if (unlikely(new == ms->mirror + ms->nr_mirrors)) + DMWARN("All sides of mirror have failed."); +} + +static int default_ok(struct mirror *m) +{ + return !atomic_read(&m->ms->default_mirror->error_count); +} + +static int mirror_available(struct mirror_set *ms, struct bio *bio) { - /* FIXME: add read balancing */ - return ms->mirror + DEFAULT_MIRROR; + region_t region = bio_to_region(&ms->rh, bio); + + if (ms->rh.log->type->in_sync(ms->rh.log, region, 0) > 0) + return choose_mirror(ms) ? 1 : 0; + + return 0; } /* * remap a buffer to a particular mirror. 
*/ -static void map_bio(struct mirror_set *ms, struct mirror *m, struct bio *bio) +static sector_t map_sector(struct mirror *m, struct bio *bio) +{ + return m->offset + (bio->bi_sector - m->ms->ti->begin); +} + +static void map_bio(struct mirror *m, struct bio *bio) { bio->bi_bdev = m->dev->bdev; - bio->bi_sector = m->offset + (bio->bi_sector - ms->ti->begin); + bio->bi_sector = map_sector(m, bio); +} + +static void map_region(struct io_region *io, struct mirror *m, + struct bio *bio) +{ + io->bdev = m->dev->bdev; + io->sector = map_sector(m, bio); + io->count = bio->bi_size >> 9; +} + +/*----------------------------------------------------------------- + * Reads + *---------------------------------------------------------------*/ +static void read_callback(unsigned long error, void *context) +{ + struct bio *bio = (struct bio *)context; + struct mirror *m; + + m = bio_get_m(bio); + bio_set_m(bio, NULL); + + if (unlikely(error)) { + DMWARN("A read failure occurred on a mirror device."); + fail_mirror(m); + if (likely(default_ok(m)) || mirror_available(m->ms, bio)) { + DMWARN("Trying different device."); + queue_bio(m->ms, bio, bio_rw(bio)); + } else { + DMERR("No other device available, failing I/O."); + bio_endio(bio, bio->bi_size, -EIO); + } + } else + bio_endio(bio, bio->bi_size, 0); +} + +/* Asynchronous read. */ +static void read_async_bio(struct mirror *m, struct bio *bio) +{ + struct io_region io; + + map_region(&io, m, bio); + bio_set_m(bio, m); + dm_io_async_bvec(1, &io, READ, + bio->bi_io_vec + bio->bi_idx, + read_callback, bio); } static void do_reads(struct mirror_set *ms, struct bio_list *reads) { - region_t region; struct bio *bio; struct mirror *m; while ((bio = bio_list_pop(reads))) { - region = bio_to_region(&ms->rh, bio); - /* * We can only read balance if the region is in sync. */ - if (rh_in_sync(&ms->rh, region, 0)) - m = choose_mirror(ms, bio->bi_sector); - else - m = ms->mirror + DEFAULT_MIRROR; + if (likely(rh_in_sync(&ms->rh, + bio_to_region(&ms->rh, bio), 0))) + m = choose_mirror(ms); + else { + m = ms->default_mirror; + + /* If default has failed, we give up. */ + if (unlikely(m && atomic_read(&m->error_count))) + m = NULL; + } - map_bio(ms, m, bio); - generic_make_request(bio); + if (likely(m)) + read_async_bio(m, bio); + else + bio_endio(bio, bio->bi_size, -EIO); } } @@ -722,15 +985,70 @@ static void do_reads(struct mirror_set *ms, struct bio_list *reads) * RECOVERING: delay the io until recovery completes * NOSYNC: increment pending, just write to the default mirror *---------------------------------------------------------------*/ -static void write_callback(unsigned long error, void *context) + +/* __bio_mark_nosync + * @ms + * @bio + * @done + * @error + * + * The bio was written on some mirror(s) but failed on other mirror(s). + * We can successfully endio the bio but should avoid the region being + * marked clean by setting the state RH_NOSYNC. + * + * This function is _not_ interrupt safe! + */ +static void __bio_mark_nosync(struct mirror_set *ms, + struct bio *bio, unsigned int done, int error) { - unsigned int i; - int uptodate = 1; + unsigned long flags; + struct region_hash *rh = &ms->rh; + struct dirty_log *log = ms->rh.log; + struct region *reg; + region_t region = bio_to_region(rh, bio); + int recovering = 0; + + ms->in_sync = 0; + + /* We must inform the log that the sync count has changed. 
*/
+	log->type->set_region_sync(log, region, 0);
+
+	read_lock(&rh->hash_lock);
+	reg = __rh_find(rh, region);
+	read_unlock(&rh->hash_lock);
+
+	/* region hash entry should exist because write was in-flight */
+	BUG_ON(!reg);
+	BUG_ON(!list_empty(&reg->list));
+
+	spin_lock_irqsave(&rh->region_lock, flags);
+	/*
+	 * Possible cases:
+	 *   1) RH_DIRTY
+	 *   2) RH_NOSYNC: was dirty, other preceding writes failed
+	 *   3) RH_RECOVERING: flushing pending writes
+	 * In either case, the region should not have been connected
+	 * to any list.
+	 */
+	recovering = (reg->state == RH_RECOVERING);
+	reg->state = RH_NOSYNC;
+	BUG_ON(!list_empty(&reg->list));
+	spin_unlock_irqrestore(&rh->region_lock, flags);
+
+	bio_endio(bio, done, error);
+	if (recovering)
+		complete_resync_work(reg, 0);
+}
+
+static void write_callback(unsigned long error, void *context, int log_failure)
+{
+	unsigned int i, ret = 0;
 	struct bio *bio = (struct bio *) context;
 	struct mirror_set *ms;
+	int uptodate = 0;
+	int should_wake = 0;
 
-	ms = bio_get_ms(bio);
-	bio_set_ms(bio, NULL);
+	ms = (bio_get_m(bio))->ms;
+	bio_set_m(bio, NULL);
 
 	/*
 	 * NOTE: We don't decrement the pending count here,
@@ -738,47 +1056,95 @@ static void write_callback(unsigned long error, void *context)
 	 * This way we handle both writes to SYNC and NOSYNC
 	 * regions with the same code.
 	 */
+	if (unlikely(error)) {
+		DMERR("Error during write occurred.");
 
-	if (error) {
 		/*
-		 * only error the io if all mirrors failed.
-		 * FIXME: bogus
+		 * If the log is intact, we can play around with trying
+		 * to handle the failure.  Otherwise, we have to report
+		 * the I/O as failed.
 		 */
-		uptodate = 0;
-		for (i = 0; i < ms->nr_mirrors; i++)
-			if (!test_bit(i, &error)) {
-				uptodate = 1;
-				break;
+		if (!log_failure) {
+			for (i = 0; i < ms->nr_mirrors; i++) {
+				if (test_bit(i, &error))
+					fail_mirror(ms->mirror + i);
+				else
+					uptodate = 1;
 			}
+		}
+
+		if (likely(uptodate)) {
+			/*
+			 * Need to raise event.  Since raising
+			 * events can block, we need to do it in
+			 * the main thread.
+			 */
+			spin_lock(&ms->lock);
+			if (!ms->failures.head)
+				should_wake = 1;
+			bio_list_add(&ms->failures, bio);
+			spin_unlock(&ms->lock);
+			if (should_wake)
+				wake();
+			return;
+		} else {
+			DMERR("All replicated volumes dead, failing I/O");
+			/* None of the writes succeeded, fail the I/O.
*/ + ret = -EIO; + } } - bio_endio(bio, bio->bi_size, 0); + + bio_endio(bio, bio->bi_size, ret); +} + +static void write_callback_good_log(unsigned long error, void *context) +{ + write_callback(error, context, 0); } -static void do_write(struct mirror_set *ms, struct bio *bio) +static void write_callback_bad_log(unsigned long error, void *context) +{ + write_callback(error, context, 1); +} + +static void do_write(struct mirror_set *ms, struct bio *bio, int log_failure) { unsigned int i; - struct io_region io[KCOPYD_MAX_REGIONS+1]; + struct io_region io[ms->nr_mirrors], *dest = io; struct mirror *m; - for (i = 0; i < ms->nr_mirrors; i++) { - m = ms->mirror + i; - - io[i].bdev = m->dev->bdev; - io[i].sector = m->offset + (bio->bi_sector - ms->ti->begin); - io[i].count = bio->bi_size >> 9; + if (log_failure && dm_mirror_error_on_log_failure) { + bio_endio(bio, bio->bi_size, -EIO); + return; } - bio_set_ms(bio, ms); - dm_io_async_bvec(ms->nr_mirrors, io, WRITE, - bio->bi_io_vec + bio->bi_idx, - write_callback, bio); + for (i = 0, m = ms->mirror; i < ms->nr_mirrors; i++, m++) + map_region(dest++, m, bio); + + /* + * We can use the default mirror here, because we + * only need it in order to retrieve the reference + * to the mirror set in write_callback(). + */ + bio_set_m(bio, ms->default_mirror); + if (log_failure) + dm_io_async_bvec(ms->nr_mirrors, io, WRITE, + bio->bi_io_vec + bio->bi_idx, + write_callback_bad_log, bio); + else + dm_io_async_bvec(ms->nr_mirrors, io, WRITE, + bio->bi_io_vec + bio->bi_idx, + write_callback_good_log, bio); } static void do_writes(struct mirror_set *ms, struct bio_list *writes) { - int state; + int state, r; struct bio *bio; struct bio_list sync, nosync, recover, *this_list = NULL; + struct bio_list requeue; + struct dirty_log *log = ms->rh.log; + region_t region; if (!writes->head) return; @@ -789,9 +1155,18 @@ static void do_writes(struct mirror_set *ms, struct bio_list *writes) bio_list_init(&sync); bio_list_init(&nosync); bio_list_init(&recover); + bio_list_init(&requeue); while ((bio = bio_list_pop(writes))) { - state = rh_state(&ms->rh, bio_to_region(&ms->rh, bio), 1); + region = bio_to_region(&ms->rh, bio); + + if (log->type->is_remote_recovering && + log->type->is_remote_recovering(log, region)) { + bio_list_add(&requeue, bio); + continue; + } + + state = rh_state(&ms->rh, region, 1); switch (state) { case RH_CLEAN: case RH_DIRTY: @@ -810,6 +1185,14 @@ static void do_writes(struct mirror_set *ms, struct bio_list *writes) bio_list_add(this_list, bio); } + /* + * Add bios that are delayed due to remote recovery + * back on to the write queue + */ + spin_lock_irq(&ms->lock); + bio_list_merge(&ms->writes, &requeue); + spin_unlock_irq(&ms->lock); + /* * Increment the pending counts for any regions that will * be written to (writes to recover regions are going to @@ -817,54 +1200,86 @@ static void do_writes(struct mirror_set *ms, struct bio_list *writes) */ rh_inc_pending(&ms->rh, &sync); rh_inc_pending(&ms->rh, &nosync); - rh_flush(&ms->rh); + + r = rh_flush(&ms->rh); /* * Dispatch io. */ while ((bio = bio_list_pop(&sync))) - do_write(ms, bio); + do_write(ms, bio, r ? 
1 : 0); while ((bio = bio_list_pop(&recover))) rh_delay(&ms->rh, bio); while ((bio = bio_list_pop(&nosync))) { - map_bio(ms, ms->mirror + DEFAULT_MIRROR, bio); + map_bio(ms->default_mirror, bio); generic_make_request(bio); } } +static void do_failures(struct mirror_set *ms, struct bio_list *failures) +{ + struct bio *bio; + struct dirty_log *log = ms->rh.log; + + if (!failures->head) + return; + + if (log->type->get_failure_response(log) == DMLOG_IOERR_BLOCK) + dm_table_event(ms->ti->table); + + while ((bio = bio_list_pop(failures))) + __bio_mark_nosync(ms, bio, bio->bi_size, 0); +} + /*----------------------------------------------------------------- * kmirrord *---------------------------------------------------------------*/ static LIST_HEAD(_mirror_sets); static DECLARE_RWSEM(_mirror_sets_lock); -static void do_mirror(struct mirror_set *ms) +static int do_mirror(struct mirror_set *ms) { - struct bio_list reads, writes; + struct bio_list reads, writes, failures; - spin_lock(&ms->lock); + spin_lock_irq(&ms->lock); reads = ms->reads; writes = ms->writes; + failures = ms->failures; bio_list_init(&ms->reads); bio_list_init(&ms->writes); - spin_unlock(&ms->lock); + bio_list_init(&ms->failures); + spin_unlock_irq(&ms->lock); rh_update_states(&ms->rh); do_recovery(ms); do_reads(ms, &reads); do_writes(ms, &writes); + do_failures(ms, &failures); + + return (ms->writes.head) ? 1 : 0; } -static void do_work(void *ignored) +static int _do_work(void) { + int more_work = 0; struct mirror_set *ms; down_read(&_mirror_sets_lock); list_for_each_entry (ms, &_mirror_sets, list) - do_mirror(ms); + more_work += do_mirror(ms); up_read(&_mirror_sets_lock); + + return more_work; +} + +static void do_work(void *ignored) +{ + while (_do_work()) { + set_current_state(TASK_INTERRUPTIBLE); + schedule_timeout(HZ/5); + } } /*----------------------------------------------------------------- @@ -885,7 +1300,7 @@ static struct mirror_set *alloc_context(unsigned int nr_mirrors, ms = kmalloc(len, GFP_KERNEL); if (!ms) { - ti->error = "dm-mirror: Cannot allocate mirror context"; + ti->error = "Cannot allocate mirror context"; return NULL; } @@ -896,13 +1311,19 @@ static struct mirror_set *alloc_context(unsigned int nr_mirrors, ms->nr_mirrors = nr_mirrors; ms->nr_regions = dm_sector_div_up(ti->len, region_size); ms->in_sync = 0; + ms->read_mirror = &ms->mirror[DEFAULT_MIRROR]; + ms->default_mirror = &ms->mirror[DEFAULT_MIRROR]; if (rh_init(&ms->rh, ms, dl, region_size, ms->nr_regions)) { - ti->error = "dm-mirror: Error creating dirty region hash"; + ti->error = "Error creating dirty region hash"; kfree(ms); return NULL; } + atomic_set(&ms->read_count, MIN_READS); + + bio_list_init(&ms->failures); + return ms; } @@ -925,21 +1346,23 @@ static inline int _check_region_size(struct dm_target *ti, uint32_t size) static int get_mirror(struct mirror_set *ms, struct dm_target *ti, unsigned int mirror, char **argv) { - sector_t offset; + unsigned long long offset; - if (sscanf(argv[1], SECTOR_FORMAT, &offset) != 1) { - ti->error = "dm-mirror: Invalid offset"; + if (sscanf(argv[1], "%llu", &offset) != 1) { + ti->error = "Invalid offset"; return -EINVAL; } if (dm_get_device(ti, argv[0], offset, ti->len, dm_table_get_mode(ti->table), &ms->mirror[mirror].dev)) { - ti->error = "dm-mirror: Device lookup failure"; + ti->error = "Device lookup failure"; return -ENXIO; } ms->mirror[mirror].offset = offset; + atomic_set(&(ms->mirror[mirror].error_count), 0); + ms->mirror[mirror].ms = ms; return 0; } @@ -972,30 +1395,30 @@ static struct 
dirty_log *create_dirty_log(struct dm_target *ti,
 	struct dirty_log *dl;
 
 	if (argc < 2) {
-		ti->error = "dm-mirror: Insufficient mirror log arguments";
+		ti->error = "Insufficient mirror log arguments";
 		return NULL;
 	}
 
 	if (sscanf(argv[1], "%u", &param_count) != 1) {
-		ti->error = "dm-mirror: Invalid mirror log argument count";
+		ti->error = "Invalid mirror log argument count";
 		return NULL;
 	}
 
 	*args_used = 2 + param_count;
 
 	if (argc < *args_used) {
-		ti->error = "dm-mirror: Insufficient mirror log arguments";
+		ti->error = "Insufficient mirror log arguments";
 		return NULL;
 	}
 
 	dl = dm_create_dirty_log(argv[0], ti, param_count, argv + 2);
 	if (!dl) {
-		ti->error = "dm-mirror: Error creating mirror dirty log";
+		ti->error = "Error creating mirror dirty log";
 		return NULL;
 	}
 
 	if (!_check_region_size(ti, dl->type->get_region_size(dl))) {
-		ti->error = "dm-mirror: Invalid region size";
+		ti->error = "Invalid region size";
 		dm_destroy_dirty_log(dl);
 		return NULL;
 	}
@@ -1029,7 +1452,7 @@ static int mirror_ctr(struct dm_target *ti, unsigned int argc, char **argv)
 
 	if (!argc || sscanf(argv[0], "%u", &nr_mirrors) != 1 ||
 	    nr_mirrors < 2 || nr_mirrors > KCOPYD_MAX_REGIONS + 1) {
-		ti->error = "dm-mirror: Invalid number of mirrors";
+		ti->error = "Invalid number of mirrors";
 		dm_destroy_dirty_log(dl);
 		return -EINVAL;
 	}
@@ -1037,7 +1460,7 @@ static int mirror_ctr(struct dm_target *ti, unsigned int argc, char **argv)
 
 	argv++, argc--;
 
 	if (argc != nr_mirrors * 2) {
-		ti->error = "dm-mirror: Wrong number of mirror arguments";
+		ti->error = "Wrong number of mirror arguments";
 		dm_destroy_dirty_log(dl);
 		return -EINVAL;
 	}
@@ -1060,6 +1483,7 @@ static int mirror_ctr(struct dm_target *ti, unsigned int argc, char **argv)
 	}
 
 	ti->private = ms;
+	ti->split_io = ms->rh.region_size;
 
 	r = kcopyd_client_create(DM_IO_PAGES, &ms->kcopyd_client);
 	if (r) {
@@ -1082,14 +1506,15 @@ static void mirror_dtr(struct dm_target *ti)
 
 static void queue_bio(struct mirror_set *ms, struct bio *bio, int rw)
 {
+	unsigned long flags;
 	int should_wake = 0;
 	struct bio_list *bl;
 
 	bl = (rw == WRITE) ? &ms->writes : &ms->reads;
-	spin_lock(&ms->lock);
+	spin_lock_irqsave(&ms->lock, flags);
 	should_wake = !(bl->head);
 	bio_list_add(bl, bio);
-	spin_unlock(&ms->lock);
+	spin_unlock_irqrestore(&ms->lock, flags);
 
 	if (should_wake)
 		wake();
@@ -1104,42 +1529,64 @@ static int mirror_map(struct dm_target *ti, struct bio *bio,
 	int r, rw = bio_rw(bio);
 	struct mirror *m;
 	struct mirror_set *ms = ti->private;
-
-	map_context->ll = bio->bi_sector >> ms->rh.region_shift;
+	struct bio_map_info *bmi = NULL;
+	struct dm_bio_details *bd = NULL;
 
 	if (rw == WRITE) {
+		/* Save region for mirror_end_io() handler */
+		map_context->ll = bio_to_region(&ms->rh, bio);
 		queue_bio(ms, bio, rw);
 		return 0;
 	}
 
+	/* All about the reads now */
+
 	r = ms->rh.log->type->in_sync(ms->rh.log,
 				      bio_to_region(&ms->rh, bio), 0);
 	if (r < 0 && r != -EWOULDBLOCK)
 		return r;
 
-	if (r == -EWOULDBLOCK)	/* FIXME: ugly */
+	if (r == -EWOULDBLOCK)
 		r = 0;
 
-	/*
-	 * We don't want to fast track a recovery just for a read
-	 * ahead.  So we just let it silently fail.
-	 * FIXME: get rid of this.
-	 */
-	if (!r && rw == READA)
-		return -EIO;
+	if (likely(r)) {
+		/*
+		 * Optimize reads by avoiding handing them to the daemon.
+		 *
+		 * In case they fail, queue them for another shot
+		 * in the mirror_end_io() function.
+		 */
+ */ + m = choose_mirror(ms); + if (likely(m)) { + bmi = mempool_alloc(bio_map_info_pool, GFP_NOIO); + + if (likely(bmi)) { + /* without this, a read is not retryable */ + bd = &bmi->bmi_bd; + dm_bio_record(bd, bio); + map_context->ptr = bmi; + bmi->bmi_m = m; + } else { + /* we could fail now, but we can at least ** + ** give it a shot. The bd is only used to ** + ** retry in the event of a failure anyway. ** + ** If we fail, we can fail the I/O then. */ + map_context->ptr = NULL; + } + + map_bio(m, bio); + return 1; /* Mapped -> queue request. */ + } else + return -EIO; + } else { + /* Either not clean, or -EWOULDBLOCK */ + if (rw == READA) + return -EWOULDBLOCK; - if (!r) { - /* Pass this io over to the daemon */ queue_bio(ms, bio, rw); - return 0; } - m = choose_mirror(ms, bio->bi_sector); - if (!m) - return -EIO; - - map_bio(ms, m, bio); - return 1; + return 0; } static int mirror_end_io(struct dm_target *ti, struct bio *bio, @@ -1147,15 +1594,71 @@ static int mirror_end_io(struct dm_target *ti, struct bio *bio, { int rw = bio_rw(bio); struct mirror_set *ms = (struct mirror_set *) ti->private; - region_t region = map_context->ll; + struct mirror *m = NULL; + struct dm_bio_details *bd = NULL; /* * We need to dec pending if this was a write. */ - if (rw == WRITE) - rh_dec(&ms->rh, region); + if (rw == WRITE) { + rh_dec(&ms->rh, map_context->ll); + return error; + } - return 0; + if (error == -EOPNOTSUPP) + goto out; + + if ((error == -EWOULDBLOCK) && bio_rw_ahead(bio)) + goto out; + + if (unlikely(error)) { + DMERR("A read failure occurred on a mirror device."); + if (!map_context->ptr) { + /* + * There wasn't enough memory to record necessary + * information for a retry or there was no other + * mirror in-sync. + */ + DMERR("Unable to retry read."); + return -EIO; + } + m = ((struct bio_map_info *)map_context->ptr)->bmi_m; + fail_mirror(m); /* Flag error on mirror. */ + + /* + * A failed read needs to get queued + * to the daemon for another shot to + * one (if any) intact mirrors. + */ + if (default_ok(m) || mirror_available(ms, bio)) { + bd = &(((struct bio_map_info *)map_context->ptr)->bmi_bd + ); + + DMWARN("Trying different device."); + dm_bio_restore(bd, bio); + mempool_free(map_context->ptr, bio_map_info_pool); + map_context->ptr = NULL; + queue_bio(ms, bio, rw); + return 1; /* We want another shot on the bio. 
*/ + } + DMERR("All replicated volumes dead, failing I/O"); + } + +out: + if (map_context->ptr) + mempool_free(map_context->ptr, bio_map_info_pool); + + return error; +} + +static void mirror_presuspend(struct dm_target *ti) +{ + struct mirror_set *ms = (struct mirror_set *) ti->private; + struct dirty_log *log = ms->rh.log; + + if (log->type->presuspend && log->type->presuspend(log)) + /* FIXME: need better error handling */ + DMWARN("log presuspend failed"); } static void mirror_postsuspend(struct dm_target *ti) @@ -1164,9 +1667,14 @@ static void mirror_postsuspend(struct dm_target *ti) struct dirty_log *log = ms->rh.log; rh_stop_recovery(&ms->rh); - if (log->type->suspend && log->type->suspend(log)) + + /* Wait for all I/O we generated to complete */ + wait_event(recovery_stopped_event, + !atomic_read(&ms->rh.recovery_in_flight)); + + if (log->type->postsuspend && log->type->postsuspend(log)) /* FIXME: need better error handling */ - DMWARN("log suspend failed"); + DMWARN("log postsuspend failed"); } static void mirror_resume(struct dm_target *ti) @@ -1182,27 +1690,32 @@ static void mirror_resume(struct dm_target *ti) static int mirror_status(struct dm_target *ti, status_type_t type, char *result, unsigned int maxlen) { - unsigned int m, sz; + unsigned int m, sz = 0; struct mirror_set *ms = (struct mirror_set *) ti->private; - - sz = ms->rh.log->type->status(ms->rh.log, type, result, maxlen); + char buffer[ms->nr_mirrors + 1]; switch (type) { case STATUSTYPE_INFO: DMEMIT("%d ", ms->nr_mirrors); - for (m = 0; m < ms->nr_mirrors; m++) + for (m = 0; m < ms->nr_mirrors; m++) { DMEMIT("%s ", ms->mirror[m].dev->name); + buffer[m] = atomic_read(&(ms->mirror[m].error_count)) ? + 'D' : 'A'; + } + buffer[m] = '\0'; - DMEMIT(SECTOR_FORMAT "/" SECTOR_FORMAT, + DMEMIT("%llu/%llu 1 %s ", ms->rh.log->type->get_sync_count(ms->rh.log), - ms->nr_regions); + ms->nr_regions, buffer); + ms->rh.log->type->status(ms->rh.log, type, result+sz, maxlen-sz); break; case STATUSTYPE_TABLE: + sz = ms->rh.log->type->status(ms->rh.log, type, result, maxlen); DMEMIT("%d ", ms->nr_mirrors); for (m = 0; m < ms->nr_mirrors; m++) - DMEMIT("%s " SECTOR_FORMAT " ", - ms->mirror[m].dev->name, ms->mirror[m].offset); + DMEMIT("%s %llu ", ms->mirror[m].dev->name, + (unsigned long long)ms->mirror[m].offset); } return 0; @@ -1210,12 +1723,13 @@ static int mirror_status(struct dm_target *ti, status_type_t type, static struct target_type mirror_target = { .name = "mirror", - .version = {1, 0, 1}, + .version = {1, 2, 0}, .module = THIS_MODULE, .ctr = mirror_ctr, .dtr = mirror_dtr, .map = mirror_map, .end_io = mirror_end_io, + .presuspend = mirror_presuspend, .postsuspend = mirror_postsuspend, .resume = mirror_resume, .status = mirror_status, @@ -1225,15 +1739,20 @@ static int __init dm_mirror_init(void) { int r; + bio_map_info_pool = mempool_create(100, bio_map_info_alloc, + bio_map_info_free, NULL); + if (!bio_map_info_pool) + return -ENOMEM; + r = dm_dirty_log_init(); if (r) return r; - _kmirrord_wq = create_workqueue("kmirrord"); + _kmirrord_wq = create_singlethread_workqueue("kmirrord"); if (!_kmirrord_wq) { DMERR("couldn't start kmirrord"); dm_dirty_log_exit(); - return r; + return -ENOMEM; } INIT_WORK(&_kmirrord_work, do_work, NULL); @@ -1243,6 +1762,15 @@ static int __init dm_mirror_init(void) mirror_target.name); dm_dirty_log_exit(); destroy_workqueue(_kmirrord_wq); + } else if (!dm_mirror_error_on_log_failure) { + DMWARN("Warning: dm_mirror_error_on_log_failure = 0"); + DMWARN("In this mode, the following fault sequence could 
cause corruption:"); + DMWARN(" 1) Log device failure"); + DMWARN(" 2) Write I/O issued"); + DMWARN(" 3) Machine failure"); + DMWARN(" 4) Log device restored"); + DMWARN(" 5) Machine reboots"); + DMWARN("If this happens, you must resync your mirror."); } return r; @@ -1264,6 +1792,8 @@ static void __exit dm_mirror_exit(void) module_init(dm_mirror_init); module_exit(dm_mirror_exit); +module_param(dm_mirror_error_on_log_failure, int, 1); +MODULE_PARM_DESC(dm_mirror_error_on_log_failure, "Set to '0' if you want writes to succeed on log device failure"); MODULE_DESCRIPTION(DM_NAME " mirror target"); MODULE_AUTHOR("Joe Thornber"); MODULE_LICENSE("GPL");