 * Copyright (C) 2003 Sistina Software Limited.
 * This file is released under the GPL.
#include "dm-bio-list.h"
#include "dm-bio-record.h"
#include <linux/ctype.h>
#include <linux/init.h>
#include <linux/mempool.h>
#include <linux/module.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/time.h>
#include <linux/vmalloc.h>
#include <linux/workqueue.h>
#define DM_MSG_PREFIX "raid1"
static struct workqueue_struct *_kmirrord_wq;
static struct work_struct _kmirrord_work;
DECLARE_WAIT_QUEUE_HEAD(recovery_stopped_event);
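/*
 * Module parameter (see module_param() at the bottom of this file):
 * when set (the default), writes are failed with -EIO if the dirty
 * log device has failed; when cleared, writes are still sent to the
 * remaining mirrors, at the cost of the resync hazard spelled out in
 * the warnings printed by dm_mirror_init().
 */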
static int dm_mirror_error_on_log_failure = 1;
static inline void wake(void)
        queue_work(_kmirrord_wq, &_kmirrord_work);
/*-----------------------------------------------------------------
 * The mirror splits itself up into discrete regions.  Each
 * region can be in one of three states: clean, dirty,
 * nosync.  There is no need to put clean regions in the hash.
 * In addition to being present in the hash table a region _may_
 * be present on one of three lists.
 *   clean_regions: Regions on this list have no io pending to
 *   them, they are in sync, we are no longer interested in them,
 *   they are dull.  rh_update_states() will remove them from the
 *   quiesced_regions: These regions have been spun down, ready
 *   for recovery.  rh_recovery_start() will remove regions from
 *   this list and hand them to kmirrord, which will schedule the
 *   recovery io with kcopyd.
 *   recovered_regions: Regions that kcopyd has successfully
 *   recovered.  rh_update_states() will now schedule any delayed
 *   io, up the recovery_count, and remove the region from the
 * A rw spin lock 'hash_lock' protects just the hash table,
 * this is never held in write mode from interrupt context,
 * which I believe means that we only have to disable irqs when
 * An ordinary spin lock 'region_lock' that protects the three
 * lists in the region_hash, with the 'state', 'list' and
 * 'delayed_bios' fields of the regions.  This is used from irq
 * context, so all other uses will have to suspend local irqs.
 *---------------------------------------------------------------*/
        struct mirror_set *ms;
        unsigned region_shift;
        /* holds persistent region state */
        struct dirty_log *log;
        mempool_t *region_pool;
        unsigned int nr_buckets;
        struct list_head *buckets;
        spinlock_t region_lock;
        atomic_t recovery_in_flight;
        struct semaphore recovery_count;
        struct list_head clean_regions;
        struct list_head quiesced_regions;
        struct list_head recovered_regions;
        struct list_head failed_recovered_regions;
        struct region_hash *rh;        /* FIXME: can we get rid of this ? */
        struct list_head hash_list;
        struct list_head list;
        struct bio_list delayed_bios;
/*-----------------------------------------------------------------
 * Mirror set structures.
 *---------------------------------------------------------------*/
        atomic_t error_count;        /* Error counter to flag mirror failure */
        struct mirror_set *ms;
        struct dm_target *ti;
        struct list_head list;
        struct region_hash rh;
        struct kcopyd_client *kcopyd_client;
        spinlock_t lock;        /* protects the lists */
        struct bio_list reads;
        struct bio_list writes;
        struct bio_list failures;
        struct mirror *default_mirror;        /* Default mirror */
        unsigned int nr_mirrors;
        atomic_t read_count;        /* Read counter for read balancing */
        struct mirror *read_mirror;        /* Last mirror read. */
        struct mirror mirror[0];
static inline region_t bio_to_region(struct region_hash *rh, struct bio *bio)
        return (bio->bi_sector - rh->ms->ti->begin) >> rh->region_shift;
static inline sector_t region_to_sector(struct region_hash *rh, region_t region)
        return region << rh->region_shift;
/* FIXME move this */
static void queue_bio(struct mirror_set *ms, struct bio *bio, int rw);
#define MIN_REGIONS 64
#define MAX_RECOVERY 1
static int rh_init(struct region_hash *rh, struct mirror_set *ms,
                   struct dirty_log *log, uint32_t region_size,
        unsigned int nr_buckets, max_buckets;
         * Calculate a suitable number of buckets for our hash
        max_buckets = nr_regions >> 6;
        for (nr_buckets = 128u; nr_buckets < max_buckets; nr_buckets <<= 1)
        rh->region_size = region_size;
        rh->region_shift = ffs(region_size) - 1;
        rwlock_init(&rh->hash_lock);
        rh->mask = nr_buckets - 1;
        rh->nr_buckets = nr_buckets;
        rh->buckets = vmalloc(nr_buckets * sizeof(*rh->buckets));
                DMERR("unable to allocate region hash memory");
        for (i = 0; i < nr_buckets; i++)
                INIT_LIST_HEAD(rh->buckets + i);
        spin_lock_init(&rh->region_lock);
        sema_init(&rh->recovery_count, 0);
        atomic_set(&rh->recovery_in_flight, 0);
        INIT_LIST_HEAD(&rh->clean_regions);
        INIT_LIST_HEAD(&rh->quiesced_regions);
        INIT_LIST_HEAD(&rh->recovered_regions);
        INIT_LIST_HEAD(&rh->failed_recovered_regions);
        rh->region_pool = mempool_create_kmalloc_pool(MIN_REGIONS,
                                                      sizeof(struct region));
        if (!rh->region_pool) {
static void rh_exit(struct region_hash *rh)
        struct region *reg, *nreg;
        BUG_ON(!list_empty(&rh->quiesced_regions));
        for (h = 0; h < rh->nr_buckets; h++) {
                list_for_each_entry_safe(reg, nreg, rh->buckets + h, hash_list) {
                        BUG_ON(atomic_read(&reg->pending));
                        mempool_free(reg, rh->region_pool);
        dm_destroy_dirty_log(rh->log);
        mempool_destroy(rh->region_pool);
#define RH_HASH_MULT 2654435387U
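/*
 * Multiplicative hash: the region number is scaled by a large odd
 * constant, shifted down, and masked with rh->mask (nr_buckets - 1)
 * so that nearby region numbers spread across the buckets.
 */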
static inline unsigned int rh_hash(struct region_hash *rh, region_t region)
        return (unsigned int) ((region * RH_HASH_MULT) >> 12) & rh->mask;
static struct region *__rh_lookup(struct region_hash *rh, region_t region)
        list_for_each_entry (reg, rh->buckets + rh_hash(rh, region), hash_list)
                if (reg->key == region)
static void __rh_insert(struct region_hash *rh, struct region *reg)
        unsigned int h = rh_hash(rh, reg->key);
        list_add(&reg->hash_list, rh->buckets + h);
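/*
 * Called with the read side of hash_lock held.  The lock is dropped
 * while a new region is allocated (GFP_ATOMIC from the mempool,
 * falling back to kmalloc), then retaken as a writer to insert it.
 * Another CPU may have inserted the same region in the meantime, in
 * which case we lose the race and free our copy.  The read lock is
 * reacquired before returning.
 */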
static struct region *__rh_alloc(struct region_hash *rh, region_t region)
        struct region *reg, *nreg;
        read_unlock(&rh->hash_lock);
        nreg = mempool_alloc(rh->region_pool, GFP_ATOMIC);
                nreg = kmalloc(sizeof(struct region), GFP_NOIO);
        nreg->state = rh->log->type->in_sync(rh->log, region, 1) ?
                      RH_CLEAN : RH_NOSYNC;
        INIT_LIST_HEAD(&nreg->list);
        atomic_set(&nreg->pending, 0);
        bio_list_init(&nreg->delayed_bios);
        write_lock_irq(&rh->hash_lock);
        reg = __rh_lookup(rh, region);
                /* we lost the race */
                mempool_free(nreg, rh->region_pool);
                __rh_insert(rh, nreg);
                if (nreg->state == RH_CLEAN) {
                        spin_lock(&rh->region_lock);
                        list_add(&nreg->list, &rh->clean_regions);
                        spin_unlock(&rh->region_lock);
        write_unlock_irq(&rh->hash_lock);
        read_lock(&rh->hash_lock);
static inline struct region *__rh_find(struct region_hash *rh, region_t region)
        reg = __rh_lookup(rh, region);
                reg = __rh_alloc(rh, region);
static int rh_state(struct region_hash *rh, region_t region, int may_block)
        read_lock(&rh->hash_lock);
        reg = __rh_lookup(rh, region);
        read_unlock(&rh->hash_lock);
         * The region wasn't in the hash, so we fall back to the
        r = rh->log->type->in_sync(rh->log, region, may_block);
         * Any error from the dirty log (eg. -EWOULDBLOCK) gets
         * taken as a RH_NOSYNC
        return r == 1 ? RH_CLEAN : RH_NOSYNC;
static inline int rh_in_sync(struct region_hash *rh,
                             region_t region, int may_block)
        int state = rh_state(rh, region, may_block);
        return state == RH_CLEAN || state == RH_DIRTY;
static void dispatch_bios(struct mirror_set *ms, struct bio_list *bio_list)
        while ((bio = bio_list_pop(bio_list))) {
                queue_bio(ms, bio, WRITE);
static void complete_resync_work(struct region *reg, int success)
        struct region_hash *rh = reg->rh;
        rh->log->type->set_region_sync(rh->log, reg->key, success);
        if (atomic_dec_and_test(&rh->recovery_in_flight))
                wake_up_all(&recovery_stopped_event);
        dispatch_bios(rh->ms, &reg->delayed_bios);
        up(&rh->recovery_count);
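/*
 * Splice the clean, recovered and failed_recovered lists off the
 * region hash under hash_lock/region_lock, then drop the locks and
 * finish the work: clear/flush the dirty log, complete any delayed
 * resync work, and return the region structures to the mempool.
 */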
static void rh_update_states(struct region_hash *rh)
        struct region *reg, *next;
        LIST_HEAD(recovered);
        LIST_HEAD(failed_recovered);
         * Quickly grab the lists.
        write_lock_irq(&rh->hash_lock);
        spin_lock(&rh->region_lock);
        if (!list_empty(&rh->clean_regions)) {
                list_splice(&rh->clean_regions, &clean);
                INIT_LIST_HEAD(&rh->clean_regions);
                list_for_each_entry (reg, &clean, list) {
                        rh->log->type->clear_region(rh->log, reg->key);
                        list_del(&reg->hash_list);
        if (!list_empty(&rh->recovered_regions)) {
                list_splice(&rh->recovered_regions, &recovered);
                INIT_LIST_HEAD(&rh->recovered_regions);
                list_for_each_entry (reg, &recovered, list)
                        list_del(&reg->hash_list);
        if (!list_empty(&rh->failed_recovered_regions)) {
                list_splice(&rh->failed_recovered_regions, &failed_recovered);
                INIT_LIST_HEAD(&rh->failed_recovered_regions);
                list_for_each_entry (reg, &failed_recovered, list)
                        list_del(&reg->hash_list);
        spin_unlock(&rh->region_lock);
        write_unlock_irq(&rh->hash_lock);
         * All the regions on the recovered and clean lists have
         * now been pulled out of the system, so no need to do
        list_for_each_entry_safe (reg, next, &recovered, list) {
                rh->log->type->clear_region(rh->log, reg->key);
                complete_resync_work(reg, 1);
                mempool_free(reg, rh->region_pool);
        list_for_each_entry_safe (reg, next, &failed_recovered, list) {
                complete_resync_work(reg, 0);
                mempool_free(reg, rh->region_pool);
        if (!list_empty(&recovered))
                rh->log->type->flush(rh->log);
        list_for_each_entry_safe (reg, next, &clean, list)
                mempool_free(reg, rh->region_pool);
static void rh_inc(struct region_hash *rh, region_t region)
        read_lock(&rh->hash_lock);
        reg = __rh_find(rh, region);
        spin_lock_irq(&rh->region_lock);
        atomic_inc(&reg->pending);
        if (reg->state == RH_CLEAN) {
                reg->state = RH_DIRTY;
                list_del_init(&reg->list);        /* take off the clean list */
                spin_unlock_irq(&rh->region_lock);
                rh->log->type->mark_region(rh->log, reg->key);
                spin_unlock_irq(&rh->region_lock);
        read_unlock(&rh->hash_lock);
static void rh_inc_pending(struct region_hash *rh, struct bio_list *bios)
        for (bio = bios->head; bio; bio = bio->bi_next)
                rh_inc(rh, bio_to_region(rh, bio));
static void rh_dec(struct region_hash *rh, region_t region)
        read_lock(&rh->hash_lock);
        reg = __rh_lookup(rh, region);
        read_unlock(&rh->hash_lock);
        spin_lock_irqsave(&rh->region_lock, flags);
        if (atomic_dec_and_test(&reg->pending)) {
                 * There is no pending I/O for this region.
                 * We can move the region to the corresponding list for the next action.
                 * At this point, the region is not yet connected to any list.
                 * If the state is RH_NOSYNC, the region should be kept off
                 * The hash entry for RH_NOSYNC will remain in memory
                 * until the region is recovered or the map is reloaded.
                /* do nothing for RH_NOSYNC */
                if (reg->state == RH_RECOVERING) {
                        list_add_tail(&reg->list, &rh->quiesced_regions);
                } else if (reg->state == RH_DIRTY) {
                        reg->state = RH_CLEAN;
                        list_add(&reg->list, &rh->clean_regions);
        spin_unlock_irqrestore(&rh->region_lock, flags);
 * Starts quiescing a region in preparation for recovery.
static int __rh_recovery_prepare(struct region_hash *rh)
         * Ask the dirty log what's next.
        r = rh->log->type->get_resync_work(rh->log, &region);
         * Get this region, and start it quiescing by setting the
        read_lock(&rh->hash_lock);
        reg = __rh_find(rh, region);
        read_unlock(&rh->hash_lock);
        spin_lock_irq(&rh->region_lock);
        reg->state = RH_RECOVERING;
        /* Already quiesced ? */
        if (atomic_read(&reg->pending))
                list_del_init(&reg->list);
                list_move(&reg->list, &rh->quiesced_regions);
        spin_unlock_irq(&rh->region_lock);
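/*
 * Hand out as many recovery work items from the dirty log as the
 * recovery_count semaphore allows (MAX_RECOVERY).  recovery_in_flight
 * is raised once per started region, plus an extra reference taken
 * before the loop and dropped after it, so the in-flight count cannot
 * hit zero (and wake recovery_stopped_event waiters) while work is
 * still being handed out.
 */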
static void rh_recovery_prepare(struct region_hash *rh)
        /* Extra reference to avoid race with rh_stop_recovery */
        atomic_inc(&rh->recovery_in_flight);
        while (!down_trylock(&rh->recovery_count)) {
                atomic_inc(&rh->recovery_in_flight);
                if (__rh_recovery_prepare(rh) <= 0) {
                        atomic_dec(&rh->recovery_in_flight);
                        up(&rh->recovery_count);
        /* Drop the extra reference */
        if (atomic_dec_and_test(&rh->recovery_in_flight))
                wake_up_all(&recovery_stopped_event);
 * Returns any quiesced regions.
static struct region *rh_recovery_start(struct region_hash *rh)
        struct region *reg = NULL;
        spin_lock_irq(&rh->region_lock);
        if (!list_empty(&rh->quiesced_regions)) {
                reg = list_entry(rh->quiesced_regions.next,
                                 struct region, list);
                list_del_init(&reg->list);        /* remove from the quiesced list */
        spin_unlock_irq(&rh->region_lock);
static void rh_recovery_end(struct region *reg, int success)
        struct region_hash *rh = reg->rh;
        spin_lock_irq(&rh->region_lock);
            (rh->log->type->get_failure_response(rh->log) == DMLOG_IOERR_IGNORE))
                list_add(&reg->list, &reg->rh->recovered_regions);
                reg->state = RH_NOSYNC;
                list_add(&reg->list, &reg->rh->failed_recovered_regions);
        spin_unlock_irq(&rh->region_lock);
static int rh_flush(struct region_hash *rh)
        return rh->log->type->flush(rh->log);
static void rh_delay(struct region_hash *rh, struct bio *bio)
        read_lock(&rh->hash_lock);
        reg = __rh_find(rh, bio_to_region(rh, bio));
        bio_list_add(&reg->delayed_bios, bio);
        read_unlock(&rh->hash_lock);
static void rh_stop_recovery(struct region_hash *rh)
        /* wait for any recovering regions */
        for (i = 0; i < MAX_RECOVERY; i++)
                down(&rh->recovery_count);
static void rh_start_recovery(struct region_hash *rh)
        for (i = 0; i < MAX_RECOVERY; i++)
                up(&rh->recovery_count);
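/*
 * Per-bio context saved for reads handed out by mirror_map(): the
 * mirror the read was mapped to and a dm_bio_details snapshot, so
 * that mirror_end_io() can restore the bio and retry it on another
 * mirror if the read fails.
 */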
struct bio_map_info {
        struct mirror *bmi_m;
        struct dm_bio_details bmi_bd;
static mempool_t *bio_map_info_pool = NULL;
static void *bio_map_info_alloc(unsigned int gfp_mask, void *pool_data)
{
        return kmalloc(sizeof(struct bio_map_info), gfp_mask);
static void bio_map_info_free(void *element, void *pool_data)
{
 * Every mirror should look like this one.
#define DEFAULT_MIRROR 0
 * This is yucky.  We squirrel the mirror struct away inside
 * bi_next for read/write buffers.  This is safe since the bh
 * doesn't get submitted to the lower levels of block layer.
static struct mirror *bio_get_m(struct bio *bio)
        return (struct mirror *) bio->bi_next;
static void bio_set_m(struct bio *bio, struct mirror *m)
        bio->bi_next = (struct bio *) m;
/*-----------------------------------------------------------------
 * When a mirror is first activated we may find that some regions
 * are in the no-sync state.  We have to recover these by
 * recopying from the default mirror to all the others.
 *---------------------------------------------------------------*/
static void fail_mirror(struct mirror *m);
static void recovery_complete(int read_err, unsigned int write_err,
        struct region *reg = (struct region *) context;
        struct mirror_set *ms = reg->rh->ms;
        unsigned long write_error = write_err;
                /* Read error means the failure of default mirror. */
                DMERR("Unable to read from primary mirror during recovery");
                fail_mirror(ms->default_mirror);
                DMERR("Write error during recovery (error = %#lx)",
                 * Bits correspond to devices (excluding default mirror).
                 * The default mirror cannot change during recovery.
                for (m = 0; m < ms->nr_mirrors; m++) {
                        if (&ms->mirror[m] == ms->default_mirror)
                        if (test_bit(bit, &write_error))
                                fail_mirror(ms->mirror + m);
        rh_recovery_end(reg, !(read_err || write_err));
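/*
 * Kick off a kcopyd copy for one region: the source is the default
 * mirror, the destinations are all other mirrors.  The final region
 * may be shorter than region_size.  When the log tolerates failures
 * (DMLOG_IOERR_IGNORE), KCOPYD_IGNORE_ERROR is set so the copy
 * carries on past a failing destination.
 */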
static int recover(struct mirror_set *ms, struct region *reg)
        struct io_region from, to[KCOPYD_MAX_REGIONS], *dest;
        unsigned long flags = 0;
        /* fill in the source */
        m = ms->default_mirror;
        from.bdev = m->dev->bdev;
        from.sector = m->offset + region_to_sector(reg->rh, reg->key);
        if (reg->key == (ms->nr_regions - 1)) {
                 * The final region may be smaller than
                from.count = ms->ti->len & (reg->rh->region_size - 1);
                        from.count = reg->rh->region_size;
                from.count = reg->rh->region_size;
        /* fill in the destinations */
        for (i = 0, dest = to; i < ms->nr_mirrors; i++) {
                if (&ms->mirror[i] == ms->default_mirror)
                dest->bdev = m->dev->bdev;
                dest->sector = m->offset + region_to_sector(reg->rh, reg->key);
                dest->count = from.count;
        if (ms->rh.log->type->get_failure_response(ms->rh.log) == DMLOG_IOERR_IGNORE)
                set_bit(KCOPYD_IGNORE_ERROR, &flags);
        r = kcopyd_copy(ms->kcopyd_client, &from, ms->nr_mirrors - 1, to, flags,
                        recovery_complete, reg);
static void do_recovery(struct mirror_set *ms)
        struct dirty_log *log = ms->rh.log;
         * Start quiescing some regions.
        rh_recovery_prepare(&ms->rh);
         * Copy any already quiesced regions.
        while ((reg = rh_recovery_start(&ms->rh))) {
                r = recover(ms, reg);
                        rh_recovery_end(reg, 0);
         * Update the in sync flag if necessary.
         * Raise an event when the mirror becomes in-sync.
         * After recovery completes, the mirror becomes in_sync.
         * Only an I/O failure can then take it back out-of-sync.
        if (log->type->get_sync_count(log) == ms->nr_regions) {
                dm_table_event(ms->ti->table);
        } else if (ms->in_sync)
/*-----------------------------------------------------------------
 *---------------------------------------------------------------*/
/* Switch to next dev, via round-robin, after MIN_READS reads */
#define MIN_READS 128
 * @ms: the mirror set
 * This function is used for read balancing.
 * Returns: chosen mirror, or NULL on failure
static struct mirror *choose_mirror(struct mirror_set *ms)
        struct mirror *start_mirror = ms->read_mirror;
         * Perform MIN_READS on each working mirror then
         * advance to the next one.  start_mirror stores
         * the first we tried, so we know when we're done.
                if (likely(!atomic_read(&ms->read_mirror->error_count)) &&
                    !atomic_dec_and_test(&ms->read_count))
                atomic_set(&ms->read_count, MIN_READS);
                if (ms->read_mirror-- == ms->mirror)
                        ms->read_mirror += ms->nr_mirrors;
        } while (ms->read_mirror != start_mirror);
         * We've rejected every mirror.
         * Confirm the start_mirror can be used.
        if (unlikely(atomic_read(&ms->read_mirror->error_count)))
        return ms->read_mirror;
 * @m: mirror device to fail
 * If the device is valid, mark it invalid.  Also,
 * if this is the default mirror device (i.e. the primary
 * device) and the mirror set is in-sync, choose an
 * alternate primary device.
 * This function cannot block.
static void fail_mirror(struct mirror *m)
        struct mirror_set *ms = m->ms;
        atomic_inc(&m->error_count);
        if (atomic_read(&m->error_count) > 1)
        if (m != ms->default_mirror)
         * If the default mirror fails, change it.
         * In the case of cluster mirroring, the default
         * is changed in rh_update_states.
                 * Can not switch primary.  Better to issue requests
                 * to same failing device than to risk returning
                DMERR("Primary mirror device has failed while mirror is not in-sync");
                DMERR("Unable to choose alternative primary device");
        for (new = ms->mirror; new < ms->mirror + ms->nr_mirrors; new++)
                if (!atomic_read(&new->error_count)) {
                        ms->default_mirror = new;
        if (unlikely(new == ms->mirror + ms->nr_mirrors))
                DMWARN("All sides of mirror have failed.");
static int default_ok(struct mirror *m)
        return !atomic_read(&m->ms->default_mirror->error_count);
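/*
 * A read can be retried elsewhere only if the region is in-sync in
 * the dirty log and choose_mirror() can still find a working mirror.
 */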
static int mirror_available(struct mirror_set *ms, struct bio *bio)
        region_t region = bio_to_region(&ms->rh, bio);
        if (ms->rh.log->type->in_sync(ms->rh.log, region, 0) > 0)
                return choose_mirror(ms) ? 1 : 0;
 * remap a buffer to a particular mirror.
static sector_t map_sector(struct mirror *m, struct bio *bio)
        return m->offset + (bio->bi_sector - m->ms->ti->begin);
static void map_bio(struct mirror *m, struct bio *bio)
        bio->bi_bdev = m->dev->bdev;
        bio->bi_sector = map_sector(m, bio);
static void map_region(struct io_region *io, struct mirror *m,
        io->bdev = m->dev->bdev;
        io->sector = map_sector(m, bio);
        io->count = bio->bi_size >> 9;
/*-----------------------------------------------------------------
 *---------------------------------------------------------------*/
static void read_callback(unsigned long error, void *context)
        struct bio *bio = (struct bio *)context;
        bio_set_m(bio, NULL);
        if (unlikely(error)) {
                DMWARN("A read failure occurred on a mirror device.");
                if (likely(default_ok(m)) || mirror_available(m->ms, bio)) {
                        DMWARN("Trying different device.");
                        queue_bio(m->ms, bio, bio_rw(bio));
                        DMERR("No other device available, failing I/O.");
                        bio_endio(bio, bio->bi_size, -EIO);
        bio_endio(bio, bio->bi_size, 0);
/* Asynchronous read. */
static void read_async_bio(struct mirror *m, struct bio *bio)
        map_region(&io, m, bio);
        dm_io_async_bvec(1, &io, READ,
                         bio->bi_io_vec + bio->bi_idx,
static void do_reads(struct mirror_set *ms, struct bio_list *reads)
        while ((bio = bio_list_pop(reads))) {
                 * We can only read balance if the region is in sync.
                if (likely(rh_in_sync(&ms->rh,
                                      bio_to_region(&ms->rh, bio), 0)))
                        m = choose_mirror(ms);
                        m = ms->default_mirror;
                /* If default has failed, we give up. */
                if (unlikely(m && atomic_read(&m->error_count)))
                        read_async_bio(m, bio);
                        bio_endio(bio, bio->bi_size, -EIO);
/*-----------------------------------------------------------------
 * We do different things with the write io depending on the
 * state of the region that it's in:
 * SYNC:       increment pending, use kcopyd to write to *all* mirrors
 * RECOVERING: delay the io until recovery completes
 * NOSYNC:     increment pending, just write to the default mirror
 *---------------------------------------------------------------*/
 * The bio was written on some mirror(s) but failed on other mirror(s).
 * We can successfully endio the bio but should avoid the region being
 * marked clean by setting the state RH_NOSYNC.
 * This function is _not_ interrupt safe!
static void __bio_mark_nosync(struct mirror_set *ms,
                              struct bio *bio, unsigned int done, int error)
        unsigned long flags;
        struct region_hash *rh = &ms->rh;
        struct dirty_log *log = ms->rh.log;
        region_t region = bio_to_region(rh, bio);
        /* We must inform the log that the sync count has changed. */
        log->type->set_region_sync(log, region, 0);
        read_lock(&rh->hash_lock);
        reg = __rh_find(rh, region);
        read_unlock(&rh->hash_lock);
        /* region hash entry should exist because write was in-flight */
        BUG_ON(!list_empty(&reg->list));
        spin_lock_irqsave(&rh->region_lock, flags);
         * 2) RH_NOSYNC: was dirty, other preceding writes failed
         * 3) RH_RECOVERING: flushing pending writes
         * In either case, the region should not have been connected to any list.
        recovering = (reg->state == RH_RECOVERING);
        reg->state = RH_NOSYNC;
        BUG_ON(!list_empty(&reg->list));
        spin_unlock_irqrestore(&rh->region_lock, flags);
        bio_endio(bio, done, error);
                complete_resync_work(reg, 0);
static void write_callback(unsigned long error, void *context, int log_failure)
        unsigned int i, ret = 0;
        struct bio *bio = (struct bio *) context;
        struct mirror_set *ms;
        int should_wake = 0;
        ms = (bio_get_m(bio))->ms;
        bio_set_m(bio, NULL);
         * NOTE: We don't decrement the pending count here,
         * instead it is done by the target's endio function.
         * This way we handle both writes to SYNC and NOSYNC
         * regions with the same code.
        if (unlikely(error)) {
                DMERR("Error during write occurred.");
                 * If the log is intact, we can play around with trying
                 * to handle the failure.  Otherwise, we have to report
                 * the I/O as failed.
                        for (i = 0; i < ms->nr_mirrors; i++) {
                                if (test_bit(i, &error))
                                        fail_mirror(ms->mirror + i);
                if (likely(uptodate)) {
                         * Need to raise event.  Since raising
                         * events can block, we need to do it in
                        spin_lock(&ms->lock);
                        if (!ms->failures.head)
                        bio_list_add(&ms->failures, bio);
                        spin_unlock(&ms->lock);
                DMERR("All replicated volumes dead, failing I/O");
                /* None of the writes succeeded, fail the I/O. */
        bio_endio(bio, bio->bi_size, ret);
static void write_callback_good_log(unsigned long error, void *context)
        write_callback(error, context, 0);
static void write_callback_bad_log(unsigned long error, void *context)
        write_callback(error, context, 1);
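/*
 * Issue one async write covering every mirror.  If the log has failed
 * and dm_mirror_error_on_log_failure is set, the bio is failed with
 * -EIO instead.  The callback variant chosen records whether the log
 * was healthy, so write_callback() knows how far it may go in trying
 * to recover from per-device errors.
 */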
static void do_write(struct mirror_set *ms, struct bio *bio, int log_failure)
        struct io_region io[ms->nr_mirrors], *dest = io;
        if (log_failure && dm_mirror_error_on_log_failure) {
                bio_endio(bio, bio->bi_size, -EIO);
        for (i = 0, m = ms->mirror; i < ms->nr_mirrors; i++, m++)
                map_region(dest++, m, bio);
         * We can use the default mirror here, because we
         * only need it in order to retrieve the reference
         * to the mirror set in write_callback().
        bio_set_m(bio, ms->default_mirror);
                dm_io_async_bvec(ms->nr_mirrors, io, WRITE,
                                 bio->bi_io_vec + bio->bi_idx,
                                 write_callback_bad_log, bio);
                dm_io_async_bvec(ms->nr_mirrors, io, WRITE,
                                 bio->bi_io_vec + bio->bi_idx,
                                 write_callback_good_log, bio);
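/*
 * Classify each queued write by region state: in-sync regions go to
 * the 'sync' list (written to all mirrors), out-of-sync regions to
 * 'nosync' (written to the default mirror only), recovering regions
 * to 'recover' (delayed until recovery finishes), and writes to
 * regions being recovered remotely are requeued.
 */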
static void do_writes(struct mirror_set *ms, struct bio_list *writes)
        struct bio_list sync, nosync, recover, *this_list = NULL;
        struct bio_list requeue;
        struct dirty_log *log = ms->rh.log;
         * Classify each write.
        bio_list_init(&sync);
        bio_list_init(&nosync);
        bio_list_init(&recover);
        bio_list_init(&requeue);
        while ((bio = bio_list_pop(writes))) {
                region = bio_to_region(&ms->rh, bio);
                if (log->type->is_remote_recovering &&
                    log->type->is_remote_recovering(log, region)) {
                        bio_list_add(&requeue, bio);
                state = rh_state(&ms->rh, region, 1);
                        this_list = &nosync;
                        this_list = &recover;
                bio_list_add(this_list, bio);
         * Add bios that are delayed due to remote recovery
         * back on to the write queue
        spin_lock_irq(&ms->lock);
        bio_list_merge(&ms->writes, &requeue);
        spin_unlock_irq(&ms->lock);
         * Increment the pending counts for any regions that will
         * be written to (writes to recover regions are going to
        rh_inc_pending(&ms->rh, &sync);
        rh_inc_pending(&ms->rh, &nosync);
        r = rh_flush(&ms->rh);
        while ((bio = bio_list_pop(&sync)))
                do_write(ms, bio, r ? 1 : 0);
        while ((bio = bio_list_pop(&recover)))
                rh_delay(&ms->rh, bio);
        while ((bio = bio_list_pop(&nosync))) {
                map_bio(ms->default_mirror, bio);
                generic_make_request(bio);
static void do_failures(struct mirror_set *ms, struct bio_list *failures)
        struct dirty_log *log = ms->rh.log;
        if (!failures->head)
        if (log->type->get_failure_response(log) == DMLOG_IOERR_BLOCK)
                dm_table_event(ms->ti->table);
        while ((bio = bio_list_pop(failures)))
                __bio_mark_nosync(ms, bio, bio->bi_size, 0);
/*-----------------------------------------------------------------
 *---------------------------------------------------------------*/
static LIST_HEAD(_mirror_sets);
static DECLARE_RWSEM(_mirror_sets_lock);
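/*
 * kmirrord services one mirror set: snapshot the read/write/failure
 * lists under ms->lock, update region states, then process each list.
 * Returns non-zero if more writes were queued in the meantime and
 * another pass is needed.
 */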
static int do_mirror(struct mirror_set *ms)
        struct bio_list reads, writes, failures;
        spin_lock_irq(&ms->lock);
        writes = ms->writes;
        failures = ms->failures;
        bio_list_init(&ms->reads);
        bio_list_init(&ms->writes);
        bio_list_init(&ms->failures);
        spin_unlock_irq(&ms->lock);
        rh_update_states(&ms->rh);
        do_reads(ms, &reads);
        do_writes(ms, &writes);
        do_failures(ms, &failures);
        return (ms->writes.head) ? 1 : 0;
static int _do_work(void)
        struct mirror_set *ms;
        down_read(&_mirror_sets_lock);
        list_for_each_entry (ms, &_mirror_sets, list)
                more_work += do_mirror(ms);
        up_read(&_mirror_sets_lock);
static void do_work(void *ignored)
        while (_do_work()) {
                set_current_state(TASK_INTERRUPTIBLE);
                schedule_timeout(HZ/5);
/*-----------------------------------------------------------------
 *---------------------------------------------------------------*/
static struct mirror_set *alloc_context(unsigned int nr_mirrors,
                                        uint32_t region_size,
                                        struct dm_target *ti,
                                        struct dirty_log *dl)
        struct mirror_set *ms = NULL;
        if (array_too_big(sizeof(*ms), sizeof(ms->mirror[0]), nr_mirrors))
        len = sizeof(*ms) + (sizeof(ms->mirror[0]) * nr_mirrors);
        ms = kmalloc(len, GFP_KERNEL);
                ti->error = "Cannot allocate mirror context";
        spin_lock_init(&ms->lock);
        ms->nr_mirrors = nr_mirrors;
        ms->nr_regions = dm_sector_div_up(ti->len, region_size);
        ms->read_mirror = &ms->mirror[DEFAULT_MIRROR];
        ms->default_mirror = &ms->mirror[DEFAULT_MIRROR];
        if (rh_init(&ms->rh, ms, dl, region_size, ms->nr_regions)) {
                ti->error = "Error creating dirty region hash";
        atomic_set(&ms->read_count, MIN_READS);
        bio_list_init(&ms->failures);
static void free_context(struct mirror_set *ms, struct dm_target *ti,
                dm_put_device(ti, ms->mirror[m].dev);
static inline int _check_region_size(struct dm_target *ti, uint32_t size)
        return !(size % (PAGE_SIZE >> 9) || (size & (size - 1)) ||
static int get_mirror(struct mirror_set *ms, struct dm_target *ti,
                      unsigned int mirror, char **argv)
        unsigned long long offset;
        if (sscanf(argv[1], "%llu", &offset) != 1) {
                ti->error = "Invalid offset";
        if (dm_get_device(ti, argv[0], offset, ti->len,
                          dm_table_get_mode(ti->table),
                          &ms->mirror[mirror].dev)) {
                ti->error = "Device lookup failure";
        ms->mirror[mirror].offset = offset;
        atomic_set(&(ms->mirror[mirror].error_count), 0);
        ms->mirror[mirror].ms = ms;
static int add_mirror_set(struct mirror_set *ms)
        down_write(&_mirror_sets_lock);
        list_add_tail(&ms->list, &_mirror_sets);
        up_write(&_mirror_sets_lock);
static void del_mirror_set(struct mirror_set *ms)
        down_write(&_mirror_sets_lock);
        list_del(&ms->list);
        up_write(&_mirror_sets_lock);
 * Create dirty log: log_type #log_params <log_params>
static struct dirty_log *create_dirty_log(struct dm_target *ti,
                                          unsigned int argc, char **argv,
                                          unsigned int *args_used)
        unsigned int param_count;
        struct dirty_log *dl;
                ti->error = "Insufficient mirror log arguments";
        if (sscanf(argv[1], "%u", &param_count) != 1) {
                ti->error = "Invalid mirror log argument count";
        *args_used = 2 + param_count;
        if (argc < *args_used) {
                ti->error = "Insufficient mirror log arguments";
        dl = dm_create_dirty_log(argv[0], ti, param_count, argv + 2);
                ti->error = "Error creating mirror dirty log";
        if (!_check_region_size(ti, dl->type->get_region_size(dl))) {
                ti->error = "Invalid region size";
                dm_destroy_dirty_log(dl);
 * Construct a mirror mapping:
 * log_type #log_params <log_params>
 * #mirrors [mirror_path offset]{2,}
 * log_type is "core" or "disk"
 * #log_params is between 1 and 3
#define DM_IO_PAGES 64
static int mirror_ctr(struct dm_target *ti, unsigned int argc, char **argv)
        unsigned int nr_mirrors, m, args_used;
        struct mirror_set *ms;
        struct dirty_log *dl;
        dl = create_dirty_log(ti, argc, argv, &args_used);
        if (!argc || sscanf(argv[0], "%u", &nr_mirrors) != 1 ||
            nr_mirrors < 2 || nr_mirrors > KCOPYD_MAX_REGIONS + 1) {
                ti->error = "Invalid number of mirrors";
                dm_destroy_dirty_log(dl);
        if (argc != nr_mirrors * 2) {
                ti->error = "Wrong number of mirror arguments";
                dm_destroy_dirty_log(dl);
        ms = alloc_context(nr_mirrors, dl->type->get_region_size(dl), ti, dl);
                dm_destroy_dirty_log(dl);
        /* Get the mirror parameter sets */
        for (m = 0; m < nr_mirrors; m++) {
                r = get_mirror(ms, ti, m, argv);
                        free_context(ms, ti, m);
        ti->split_io = ms->rh.region_size;
        r = kcopyd_client_create(DM_IO_PAGES, &ms->kcopyd_client);
                free_context(ms, ti, ms->nr_mirrors);
static void mirror_dtr(struct dm_target *ti)
        struct mirror_set *ms = (struct mirror_set *) ti->private;
        kcopyd_client_destroy(ms->kcopyd_client);
        free_context(ms, ti, ms->nr_mirrors);
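/*
 * Queue a bio on the read or write list for kmirrord; should_wake
 * notes whether the list was empty beforehand so the daemon is only
 * prodded via wake() when there is new work.
 */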
static void queue_bio(struct mirror_set *ms, struct bio *bio, int rw)
        unsigned long flags;
        int should_wake = 0;
        struct bio_list *bl;
        bl = (rw == WRITE) ? &ms->writes : &ms->reads;
        spin_lock_irqsave(&ms->lock, flags);
        should_wake = !(bl->head);
        bio_list_add(bl, bio);
        spin_unlock_irqrestore(&ms->lock, flags);
 * Mirror mapping function
static int mirror_map(struct dm_target *ti, struct bio *bio,
                      union map_info *map_context)
        int r, rw = bio_rw(bio);
        struct mirror_set *ms = ti->private;
        struct bio_map_info *bmi = NULL;
        struct dm_bio_details *bd = NULL;
                /* Save region for mirror_end_io() handler */
                map_context->ll = bio_to_region(&ms->rh, bio);
                queue_bio(ms, bio, rw);
        /* All about the reads now */
        r = ms->rh.log->type->in_sync(ms->rh.log,
                                      bio_to_region(&ms->rh, bio), 0);
        if (r < 0 && r != -EWOULDBLOCK)
        if (r == -EWOULDBLOCK)
         * Optimize reads by avoiding handing them to the daemon.
         * In case they fail, queue them for another shot
         * in the mirror_end_io() function.
        m = choose_mirror(ms);
                bmi = mempool_alloc(bio_map_info_pool, GFP_NOIO);
                        /* without this, a read is not retryable */
                        dm_bio_record(bd, bio);
                        map_context->ptr = bmi;
                        /* we could fail now, but we can at least  **
                        ** give it a shot.  The bd is only used to **
                        ** retry in the event of a failure anyway. **
                        ** If we fail, we can fail the I/O then.   */
                        map_context->ptr = NULL;
                return 1; /* Mapped -> queue request. */
        /* Either not clean, or -EWOULDBLOCK */
                return -EWOULDBLOCK;
        queue_bio(ms, bio, rw);
static int mirror_end_io(struct dm_target *ti, struct bio *bio,
                         int error, union map_info *map_context)
        int rw = bio_rw(bio);
        struct mirror_set *ms = (struct mirror_set *) ti->private;
        struct mirror *m = NULL;
        struct dm_bio_details *bd = NULL;
         * We need to dec pending if this was a write.
                rh_dec(&ms->rh, map_context->ll);
        if (error == -EOPNOTSUPP)
        if ((error == -EWOULDBLOCK) && bio_rw_ahead(bio))
        if (unlikely(error)) {
                DMERR("A read failure occurred on a mirror device.");
                if (!map_context->ptr) {
                         * There wasn't enough memory to record necessary
                         * information for a retry or there was no other
                        DMERR("Unable to retry read.");
                m = ((struct bio_map_info *)map_context->ptr)->bmi_m;
                fail_mirror(m); /* Flag error on mirror. */
                 * A failed read needs to get queued
                 * to the daemon for another shot to
                 * one (if any) intact mirrors.
                if (default_ok(m) || mirror_available(ms, bio)) {
                        bd = &(((struct bio_map_info *)map_context->ptr)->bmi_bd);
                        DMWARN("Trying different device.");
                        dm_bio_restore(bd, bio);
                        mempool_free(map_context->ptr, bio_map_info_pool);
                        map_context->ptr = NULL;
                        queue_bio(ms, bio, rw);
                        return 1; /* We want another shot on the bio. */
                DMERR("All replicated volumes dead, failing I/O");
        if (map_context->ptr)
                mempool_free(map_context->ptr, bio_map_info_pool);
static void mirror_presuspend(struct dm_target *ti)
        struct mirror_set *ms = (struct mirror_set *) ti->private;
        struct dirty_log *log = ms->rh.log;
        if (log->type->presuspend && log->type->presuspend(log))
                /* FIXME: need better error handling */
                DMWARN("log presuspend failed");
static void mirror_postsuspend(struct dm_target *ti)
        struct mirror_set *ms = (struct mirror_set *) ti->private;
        struct dirty_log *log = ms->rh.log;
        rh_stop_recovery(&ms->rh);
        /* Wait for all I/O we generated to complete */
        wait_event(recovery_stopped_event,
                   !atomic_read(&ms->rh.recovery_in_flight));
        if (log->type->postsuspend && log->type->postsuspend(log))
                /* FIXME: need better error handling */
                DMWARN("log postsuspend failed");
static void mirror_resume(struct dm_target *ti)
        struct mirror_set *ms = (struct mirror_set *) ti->private;
        struct dirty_log *log = ms->rh.log;
        if (log->type->resume && log->type->resume(log))
                /* FIXME: need better error handling */
                DMWARN("log resume failed");
        rh_start_recovery(&ms->rh);
static int mirror_status(struct dm_target *ti, status_type_t type,
                         char *result, unsigned int maxlen)
        unsigned int m, sz = 0;
        struct mirror_set *ms = (struct mirror_set *) ti->private;
        char buffer[ms->nr_mirrors + 1];
        case STATUSTYPE_INFO:
                DMEMIT("%d ", ms->nr_mirrors);
                for (m = 0; m < ms->nr_mirrors; m++) {
                        DMEMIT("%s ", ms->mirror[m].dev->name);
                        buffer[m] = atomic_read(&(ms->mirror[m].error_count)) ?
                DMEMIT("%llu/%llu 1 %s ",
                       ms->rh.log->type->get_sync_count(ms->rh.log),
                       ms->nr_regions, buffer);
                ms->rh.log->type->status(ms->rh.log, type, result+sz, maxlen-sz);
        case STATUSTYPE_TABLE:
                sz = ms->rh.log->type->status(ms->rh.log, type, result, maxlen);
                DMEMIT("%d ", ms->nr_mirrors);
                for (m = 0; m < ms->nr_mirrors; m++)
                        DMEMIT("%s %llu ", ms->mirror[m].dev->name,
                               (unsigned long long)ms->mirror[m].offset);
static struct target_type mirror_target = {
        .version     = {1, 2, 0},
        .module      = THIS_MODULE,
        .end_io      = mirror_end_io,
        .presuspend  = mirror_presuspend,
        .postsuspend = mirror_postsuspend,
        .resume      = mirror_resume,
        .status      = mirror_status,
static int __init dm_mirror_init(void)
        bio_map_info_pool = mempool_create(100, bio_map_info_alloc,
                                           bio_map_info_free, NULL);
        if (!bio_map_info_pool)
        r = dm_dirty_log_init();
        _kmirrord_wq = create_singlethread_workqueue("kmirrord");
        if (!_kmirrord_wq) {
                DMERR("couldn't start kmirrord");
                dm_dirty_log_exit();
        INIT_WORK(&_kmirrord_work, do_work, NULL);
        r = dm_register_target(&mirror_target);
                DMERR("%s: Failed to register mirror target",
                      mirror_target.name);
                dm_dirty_log_exit();
                destroy_workqueue(_kmirrord_wq);
        } else if (!dm_mirror_error_on_log_failure) {
                DMWARN("Warning: dm_mirror_error_on_log_failure = 0");
                DMWARN("In this mode, the following fault sequence could cause corruption:");
                DMWARN(" 1) Log device failure");
                DMWARN(" 2) Write I/O issued");
                DMWARN(" 3) Machine failure");
                DMWARN(" 4) Log device restored");
                DMWARN(" 5) Machine reboots");
                DMWARN("If this happens, you must resync your mirror.");
static void __exit dm_mirror_exit(void)
        r = dm_unregister_target(&mirror_target);
                DMERR("%s: unregister failed %d", mirror_target.name, r);
        destroy_workqueue(_kmirrord_wq);
        dm_dirty_log_exit();
module_init(dm_mirror_init);
module_exit(dm_mirror_exit);
module_param(dm_mirror_error_on_log_failure, int, 1);
MODULE_PARM_DESC(dm_mirror_error_on_log_failure, "Set to '0' if you want writes to succeed on log device failure");
MODULE_DESCRIPTION(DM_NAME " mirror target");
MODULE_AUTHOR("Joe Thornber");
MODULE_LICENSE("GPL");