Merge to Fedora kernel-2.6.18-1.2224_FC5 patched with stable patch-2.6.18.1-vs2.0...
[linux-2.6.git] / drivers / md / dm-raid1.c
1 /*
2  * Copyright (C) 2003 Sistina Software Limited.
3  *
4  * This file is released under the GPL.
5  */
6
7 #include "dm.h"
8 #include "dm-bio-list.h"
9 #include "dm-bio-record.h"
10 #include "dm-io.h"
11 #include "dm-log.h"
12 #include "kcopyd.h"
13
14 #include <linux/ctype.h>
15 #include <linux/init.h>
16 #include <linux/mempool.h>
17 #include <linux/module.h>
18 #include <linux/pagemap.h>
19 #include <linux/slab.h>
20 #include <linux/time.h>
21 #include <linux/vmalloc.h>
22 #include <linux/workqueue.h>
23
24 #define DM_MSG_PREFIX "raid1"
25
26 static struct workqueue_struct *_kmirrord_wq;
27 static struct work_struct _kmirrord_work;
28 DECLARE_WAIT_QUEUE_HEAD(recovery_stopped_event);
29
30 static int dm_mirror_error_on_log_failure = 1;
31
32 static inline void wake(void)
33 {
34         queue_work(_kmirrord_wq, &_kmirrord_work);
35 }
36
37 /*-----------------------------------------------------------------
38  * Region hash
39  *
40  * The mirror splits itself up into discrete regions.  Each
41  * region can be in one of three states: clean, dirty,
42  * nosync.  There is no need to put clean regions in the hash.
43  *
44  * In addition to being present in the hash table a region _may_
45  * be present on one of three lists.
46  *
47  *   clean_regions: Regions on this list have no io pending to
48  *   them, they are in sync, we are no longer interested in them,
49  *   they are dull.  rh_update_states() will remove them from the
50  *   hash table.
51  *
52  *   quiesced_regions: These regions have been spun down, ready
53  *   for recovery.  rh_recovery_start() will remove regions from
54  *   this list and hand them to kmirrord, which will schedule the
55  *   recovery io with kcopyd.
56  *
57  *   recovered_regions: Regions that kcopyd has successfully
58  *   recovered.  rh_update_states() will now schedule any delayed
59  *   io, up the recovery_count, and remove the region from the
60  *   hash.
61  *
62  * There are 2 locks:
63  *   A rw spin lock 'hash_lock' protects just the hash table,
64  *   this is never held in write mode from interrupt context,
65  *   which I believe means that we only have to disable irqs when
66  *   doing a write lock.
67  *
68  *   An ordinary spin lock 'region_lock' that protects the three
69  *   lists in the region_hash, with the 'state', 'list' and
70  *   'bhs_delayed' fields of the regions.  This is used from irq
71  *   context, so all other uses will have to suspend local irqs.
72  *---------------------------------------------------------------*/
73 struct mirror_set;
74 struct region_hash {
75         struct mirror_set *ms;
76         uint32_t region_size;
77         unsigned region_shift;
78
79         /* holds persistent region state */
80         struct dirty_log *log;
81
82         /* hash table */
83         rwlock_t hash_lock;
84         mempool_t *region_pool;
85         unsigned int mask;
86         unsigned int nr_buckets;
87         struct list_head *buckets;
88
89         spinlock_t region_lock;
90         atomic_t recovery_in_flight;
91         struct semaphore recovery_count;
92         struct list_head clean_regions;
93         struct list_head quiesced_regions;
94         struct list_head recovered_regions;
95         struct list_head failed_recovered_regions;
96 };
97
98 enum {
99         RH_CLEAN,
100         RH_DIRTY,
101         RH_NOSYNC,
102         RH_RECOVERING
103 };
104
105 struct region {
106         struct region_hash *rh; /* FIXME: can we get rid of this ? */
107         region_t key;
108         int state;
109
110         struct list_head hash_list;
111         struct list_head list;
112
113         atomic_t pending;
114         struct bio_list delayed_bios;
115 };
116
117
118 /*-----------------------------------------------------------------
119  * Mirror set structures.
120  *---------------------------------------------------------------*/
121 struct mirror {
122         atomic_t error_count;  /* Error counter to flag mirror failure */
123         struct mirror_set *ms;
124         struct dm_dev *dev;
125         sector_t offset;
126 };
127
128 struct mirror_set {
129         struct dm_target *ti;
130         struct list_head list;
131         struct region_hash rh;
132         struct kcopyd_client *kcopyd_client;
133
134         spinlock_t lock;        /* protects the lists */
135         struct bio_list reads;
136         struct bio_list writes;
137         struct bio_list failures;
138
139         /* recovery */
140         region_t nr_regions;
141         int in_sync;
142
143         struct mirror *default_mirror;  /* Default mirror */
144
145         unsigned int nr_mirrors;
146         atomic_t read_count;      /* Read counter for read balancing */
147         struct mirror *read_mirror; /* Last mirror read. */
148         struct mirror mirror[0];
149 };
150
151 /*
152  * Conversion fns
153  */
154 static inline region_t bio_to_region(struct region_hash *rh, struct bio *bio)
155 {
156         return (bio->bi_sector - rh->ms->ti->begin) >> rh->region_shift;
157 }
158
159 static inline sector_t region_to_sector(struct region_hash *rh, region_t region)
160 {
161         return region << rh->region_shift;
162 }
163
164 /* FIXME move this */
165 static void queue_bio(struct mirror_set *ms, struct bio *bio, int rw);
166
167 #define MIN_REGIONS 64
168 #define MAX_RECOVERY 1
169 static int rh_init(struct region_hash *rh, struct mirror_set *ms,
170                    struct dirty_log *log, uint32_t region_size,
171                    region_t nr_regions)
172 {
173         unsigned int nr_buckets, max_buckets;
174         size_t i;
175
176         /*
177          * Calculate a suitable number of buckets for our hash
178          * table.
179          */
180         max_buckets = nr_regions >> 6;
181         for (nr_buckets = 128u; nr_buckets < max_buckets; nr_buckets <<= 1)
182                 ;
183         nr_buckets >>= 1;
184
185         rh->ms = ms;
186         rh->log = log;
187         rh->region_size = region_size;
188         rh->region_shift = ffs(region_size) - 1;
189         rwlock_init(&rh->hash_lock);
190         rh->mask = nr_buckets - 1;
191         rh->nr_buckets = nr_buckets;
192
193         rh->buckets = vmalloc(nr_buckets * sizeof(*rh->buckets));
194         if (!rh->buckets) {
195                 DMERR("unable to allocate region hash memory");
196                 return -ENOMEM;
197         }
198
199         for (i = 0; i < nr_buckets; i++)
200                 INIT_LIST_HEAD(rh->buckets + i);
201
202         spin_lock_init(&rh->region_lock);
203         sema_init(&rh->recovery_count, 0);
204         atomic_set(&rh->recovery_in_flight, 0);
205         INIT_LIST_HEAD(&rh->clean_regions);
206         INIT_LIST_HEAD(&rh->quiesced_regions);
207         INIT_LIST_HEAD(&rh->recovered_regions);
208         INIT_LIST_HEAD(&rh->failed_recovered_regions);
209
210         rh->region_pool = mempool_create_kmalloc_pool(MIN_REGIONS,
211                                                       sizeof(struct region));
212         if (!rh->region_pool) {
213                 vfree(rh->buckets);
214                 rh->buckets = NULL;
215                 return -ENOMEM;
216         }
217
218         return 0;
219 }
220
221 static void rh_exit(struct region_hash *rh)
222 {
223         unsigned int h;
224         struct region *reg, *nreg;
225
226         BUG_ON(!list_empty(&rh->quiesced_regions));
227         for (h = 0; h < rh->nr_buckets; h++) {
228                 list_for_each_entry_safe(reg, nreg, rh->buckets + h, hash_list) {
229                         BUG_ON(atomic_read(&reg->pending));
230                         mempool_free(reg, rh->region_pool);
231                 }
232         }
233
234         if (rh->log)
235                 dm_destroy_dirty_log(rh->log);
236         if (rh->region_pool)
237                 mempool_destroy(rh->region_pool);
238         vfree(rh->buckets);
239 }
240
241 #define RH_HASH_MULT 2654435387U
242
243 static inline unsigned int rh_hash(struct region_hash *rh, region_t region)
244 {
245         return (unsigned int) ((region * RH_HASH_MULT) >> 12) & rh->mask;
246 }
247
248 static struct region *__rh_lookup(struct region_hash *rh, region_t region)
249 {
250         struct region *reg;
251
252         list_for_each_entry (reg, rh->buckets + rh_hash(rh, region), hash_list)
253                 if (reg->key == region)
254                         return reg;
255
256         return NULL;
257 }
258
259 static void __rh_insert(struct region_hash *rh, struct region *reg)
260 {
261         unsigned int h = rh_hash(rh, reg->key);
262         list_add(&reg->hash_list, rh->buckets + h);
263 }
264
265 static struct region *__rh_alloc(struct region_hash *rh, region_t region)
266 {
267         struct region *reg, *nreg;
268
269         read_unlock(&rh->hash_lock);
270         nreg = mempool_alloc(rh->region_pool, GFP_ATOMIC);
271         if (unlikely(!nreg))
272                 nreg = kmalloc(sizeof(struct region), GFP_NOIO);
273         nreg->state = rh->log->type->in_sync(rh->log, region, 1) ?
274                 RH_CLEAN : RH_NOSYNC;
275         nreg->rh = rh;
276         nreg->key = region;
277
278         INIT_LIST_HEAD(&nreg->list);
279
280         atomic_set(&nreg->pending, 0);
281         bio_list_init(&nreg->delayed_bios);
282         write_lock_irq(&rh->hash_lock);
283
284         reg = __rh_lookup(rh, region);
285         if (reg)
286                 /* we lost the race */
287                 mempool_free(nreg, rh->region_pool);
288
289         else {
290                 __rh_insert(rh, nreg);
291                 if (nreg->state == RH_CLEAN) {
292                         spin_lock(&rh->region_lock);
293                         list_add(&nreg->list, &rh->clean_regions);
294                         spin_unlock(&rh->region_lock);
295                 }
296                 reg = nreg;
297         }
298         write_unlock_irq(&rh->hash_lock);
299         read_lock(&rh->hash_lock);
300
301         return reg;
302 }
303
304 static inline struct region *__rh_find(struct region_hash *rh, region_t region)
305 {
306         struct region *reg;
307
308         reg = __rh_lookup(rh, region);
309         if (!reg)
310                 reg = __rh_alloc(rh, region);
311
312         return reg;
313 }
314
315 static int rh_state(struct region_hash *rh, region_t region, int may_block)
316 {
317         int r;
318         struct region *reg;
319
320         read_lock(&rh->hash_lock);
321         reg = __rh_lookup(rh, region);
322         read_unlock(&rh->hash_lock);
323
324         if (reg)
325                 return reg->state;
326
327         /*
328          * The region wasn't in the hash, so we fall back to the
329          * dirty log.
330          */
331         r = rh->log->type->in_sync(rh->log, region, may_block);
332
333         /*
334          * Any error from the dirty log (eg. -EWOULDBLOCK) gets
335          * taken as a RH_NOSYNC
336          */
337         return r == 1 ? RH_CLEAN : RH_NOSYNC;
338 }
339
340 static inline int rh_in_sync(struct region_hash *rh,
341                              region_t region, int may_block)
342 {
343         int state = rh_state(rh, region, may_block);
344         return state == RH_CLEAN || state == RH_DIRTY;
345 }
346
347 static void dispatch_bios(struct mirror_set *ms, struct bio_list *bio_list)
348 {
349         struct bio *bio;
350
351         while ((bio = bio_list_pop(bio_list))) {
352                 queue_bio(ms, bio, WRITE);
353         }
354 }
355
356 static void complete_resync_work(struct region *reg, int success)
357 {
358         struct region_hash *rh = reg->rh;
359
360         rh->log->type->set_region_sync(rh->log, reg->key, success);
361         if (atomic_dec_and_test(&rh->recovery_in_flight))
362                 wake_up_all(&recovery_stopped_event);
363         dispatch_bios(rh->ms, &reg->delayed_bios);
364         up(&rh->recovery_count);
365 }
366
367 static void rh_update_states(struct region_hash *rh)
368 {
369         struct region *reg, *next;
370
371         LIST_HEAD(clean);
372         LIST_HEAD(recovered);
373         LIST_HEAD(failed_recovered);
374
375         /*
376          * Quickly grab the lists.
377          */
378         write_lock_irq(&rh->hash_lock);
379         spin_lock(&rh->region_lock);
380         if (!list_empty(&rh->clean_regions)) {
381                 list_splice(&rh->clean_regions, &clean);
382                 INIT_LIST_HEAD(&rh->clean_regions);
383
384                 list_for_each_entry (reg, &clean, list) {
385                         rh->log->type->clear_region(rh->log, reg->key);
386                         list_del(&reg->hash_list);
387                 }
388         }
389
390         if (!list_empty(&rh->recovered_regions)) {
391                 list_splice(&rh->recovered_regions, &recovered);
392                 INIT_LIST_HEAD(&rh->recovered_regions);
393
394                 list_for_each_entry (reg, &recovered, list)
395                         list_del(&reg->hash_list);
396         }
397
398         if (!list_empty(&rh->failed_recovered_regions)) {
399                 list_splice(&rh->failed_recovered_regions, &failed_recovered);
400                 INIT_LIST_HEAD(&rh->failed_recovered_regions);
401
402                 list_for_each_entry (reg, &failed_recovered, list)
403                         list_del(&reg->hash_list);
404         }
405
406         spin_unlock(&rh->region_lock);
407         write_unlock_irq(&rh->hash_lock);
408
409         /*
410          * All the regions on the recovered and clean lists have
411          * now been pulled out of the system, so no need to do
412          * any more locking.
413          */
414         list_for_each_entry_safe (reg, next, &recovered, list) {
415                 rh->log->type->clear_region(rh->log, reg->key);
416                 complete_resync_work(reg, 1);
417                 mempool_free(reg, rh->region_pool);
418         }
419
420         list_for_each_entry_safe (reg, next, &failed_recovered, list) {
421                 complete_resync_work(reg, 0);
422                 mempool_free(reg, rh->region_pool);
423         }
424
425         if (!list_empty(&recovered))
426                 rh->log->type->flush(rh->log);
427
428         list_for_each_entry_safe (reg, next, &clean, list)
429                 mempool_free(reg, rh->region_pool);
430 }
431
432 static void rh_inc(struct region_hash *rh, region_t region)
433 {
434         struct region *reg;
435
436         read_lock(&rh->hash_lock);
437         reg = __rh_find(rh, region);
438
439         spin_lock_irq(&rh->region_lock);
440         atomic_inc(&reg->pending);
441
442         if (reg->state == RH_CLEAN) {
443                 reg->state = RH_DIRTY;
444                 list_del_init(&reg->list);      /* take off the clean list */
445                 spin_unlock_irq(&rh->region_lock);
446
447                 rh->log->type->mark_region(rh->log, reg->key);
448         } else
449                 spin_unlock_irq(&rh->region_lock);
450
451
452         read_unlock(&rh->hash_lock);
453 }
454
455 static void rh_inc_pending(struct region_hash *rh, struct bio_list *bios)
456 {
457         struct bio *bio;
458
459         for (bio = bios->head; bio; bio = bio->bi_next)
460                 rh_inc(rh, bio_to_region(rh, bio));
461 }
462
463 static void rh_dec(struct region_hash *rh, region_t region)
464 {
465         unsigned long flags;
466         struct region *reg;
467         int should_wake = 0;
468
469         read_lock(&rh->hash_lock);
470         reg = __rh_lookup(rh, region);
471         read_unlock(&rh->hash_lock);
472
473         spin_lock_irqsave(&rh->region_lock, flags);
474         if (atomic_dec_and_test(&reg->pending)) {
475                 /*
476                  * There is no pending I/O for this region.
477                  * We can move the region to corresponding list for next action.
478                  * At this point, the region is not yet connected to any list.
479                  *
480                  * If the state is RH_NOSYNC, the region should be kept off
481                  * from clean list.
482                  * The hash entry for RH_NOSYNC will remain in memory
483                  * until the region is recovered or the map is reloaded.
484                  */
485
486                 /* do nothing for RH_NOSYNC */
487                 if (reg->state == RH_RECOVERING) {
488                         list_add_tail(&reg->list, &rh->quiesced_regions);
489                 } else if (reg->state == RH_DIRTY) {
490                         reg->state = RH_CLEAN;
491                         list_add(&reg->list, &rh->clean_regions);
492                 }
493                 should_wake = 1;
494         }
495         spin_unlock_irqrestore(&rh->region_lock, flags);
496
497         if (should_wake)
498                 wake();
499 }
500
501 /*
502  * Starts quiescing a region in preparation for recovery.
503  */
504 static int __rh_recovery_prepare(struct region_hash *rh)
505 {
506         int r;
507         struct region *reg;
508         region_t region;
509
510         /*
511          * Ask the dirty log what's next.
512          */
513         r = rh->log->type->get_resync_work(rh->log, &region);
514         if (r <= 0)
515                 return r;
516
517         /*
518          * Get this region, and start it quiescing by setting the
519          * recovering flag.
520          */
521         read_lock(&rh->hash_lock);
522         reg = __rh_find(rh, region);
523         read_unlock(&rh->hash_lock);
524
525         spin_lock_irq(&rh->region_lock);
526         reg->state = RH_RECOVERING;
527
528         /* Already quiesced ? */
529         if (atomic_read(&reg->pending))
530                 list_del_init(&reg->list);
531         else
532                 list_move(&reg->list, &rh->quiesced_regions);
533
534         spin_unlock_irq(&rh->region_lock);
535
536         return 1;
537 }
538
539 static void rh_recovery_prepare(struct region_hash *rh)
540 {
541         /* Extra reference to avoid race with rh_stop_recovery */
542         atomic_inc(&rh->recovery_in_flight);
543
544         while (!down_trylock(&rh->recovery_count)) {
545                 atomic_inc(&rh->recovery_in_flight);
546                 if (__rh_recovery_prepare(rh) <= 0) {
547                         atomic_dec(&rh->recovery_in_flight);
548                         up(&rh->recovery_count);
549                         break;
550                 }
551         }
552
553         /* Drop the extra reference */
554         if (atomic_dec_and_test(&rh->recovery_in_flight))
555                 wake_up_all(&recovery_stopped_event);
556 }
557
558 /*
559  * Returns any quiesced regions.
560  */
561 static struct region *rh_recovery_start(struct region_hash *rh)
562 {
563         struct region *reg = NULL;
564
565         spin_lock_irq(&rh->region_lock);
566         if (!list_empty(&rh->quiesced_regions)) {
567                 reg = list_entry(rh->quiesced_regions.next,
568                                  struct region, list);
569                 list_del_init(&reg->list);      /* remove from the quiesced list */
570         }
571         spin_unlock_irq(&rh->region_lock);
572
573         return reg;
574 }
575
576 static void rh_recovery_end(struct region *reg, int success)
577 {
578         struct region_hash *rh = reg->rh;
579
580         spin_lock_irq(&rh->region_lock);
581         if (success ||
582             (rh->log->type->get_failure_response(rh->log) == DMLOG_IOERR_IGNORE))
583                 list_add(&reg->list, &reg->rh->recovered_regions);
584         else {
585                 reg->state = RH_NOSYNC;
586                 list_add(&reg->list, &reg->rh->failed_recovered_regions);
587         }
588         spin_unlock_irq(&rh->region_lock);
589
590         wake();
591 }
592
593 static int rh_flush(struct region_hash *rh)
594 {
595         return rh->log->type->flush(rh->log);
596 }
597
598 static void rh_delay(struct region_hash *rh, struct bio *bio)
599 {
600         struct region *reg;
601
602         read_lock(&rh->hash_lock);
603         reg = __rh_find(rh, bio_to_region(rh, bio));
604         bio_list_add(&reg->delayed_bios, bio);
605         read_unlock(&rh->hash_lock);
606 }
607
608 static void rh_stop_recovery(struct region_hash *rh)
609 {
610         int i;
611
612         /* wait for any recovering regions */
613         for (i = 0; i < MAX_RECOVERY; i++)
614                 down(&rh->recovery_count);
615 }
616
617 static void rh_start_recovery(struct region_hash *rh)
618 {
619         int i;
620
621         for (i = 0; i < MAX_RECOVERY; i++)
622                 up(&rh->recovery_count);
623
624         wake();
625 }
626
627 struct bio_map_info {
628         struct mirror *bmi_m;
629         struct dm_bio_details bmi_bd;
630 };
631
632 static mempool_t *bio_map_info_pool = NULL;
633
634 static void *bio_map_info_alloc(unsigned int gfp_mask, void *pool_data){
635         return kmalloc(sizeof(struct bio_map_info), gfp_mask);
636 }
637
638 static void bio_map_info_free(void *element, void *pool_data){
639         kfree(element);
640 }
641
642 /*
643  * Every mirror should look like this one.
644  */
645 #define DEFAULT_MIRROR 0
646
647 /*
648  * This is yucky.  We squirrel the mirror struct away inside
649  * bi_next for read/write buffers.  This is safe since the bh
650  * doesn't get submitted to the lower levels of block layer.
651  */
652 static struct mirror *bio_get_m(struct bio *bio)
653 {
654         return (struct mirror *) bio->bi_next;
655 }
656
657 static void bio_set_m(struct bio *bio, struct mirror *m)
658 {
659         bio->bi_next = (struct bio *) m;
660 }
661
662 /*-----------------------------------------------------------------
663  * Recovery.
664  *
665  * When a mirror is first activated we may find that some regions
666  * are in the no-sync state.  We have to recover these by
667  * recopying from the default mirror to all the others.
668  *---------------------------------------------------------------*/
669 static void fail_mirror(struct mirror *m);
670 static void recovery_complete(int read_err, unsigned int write_err,
671                               void *context)
672 {
673         struct region *reg = (struct region *) context;
674         struct mirror_set *ms = reg->rh->ms;
675         unsigned long write_error = write_err;
676         int m, bit = 0;
677
678         if (read_err) {
679                 /* Read error means the failure of default mirror. */
680                 DMERR("Unable to read from primary mirror during recovery");
681                 fail_mirror(ms->default_mirror);
682         }
683
684         if (write_error) {
685                 DMERR("Write error during recovery (error = %#lx)",
686                       write_error);
687                 /*
688                  * Bits correspond to devices (excluding default mirror).
689                  * The default mirror cannot change during recovery.
690                  */
691                 for (m = 0; m < ms->nr_mirrors; m++) {
692                         if (&ms->mirror[m] == ms->default_mirror)
693                                 continue;
694                         if (test_bit(bit, &write_error))
695                                 fail_mirror(ms->mirror + m);
696                         bit++;
697                 }
698         }
699
700         rh_recovery_end(reg, !(read_err || write_err));
701 }
702
703 static int recover(struct mirror_set *ms, struct region *reg)
704 {
705         int r;
706         unsigned int i;
707         struct io_region from, to[KCOPYD_MAX_REGIONS], *dest;
708         struct mirror *m;
709         unsigned long flags = 0;
710
711         /* fill in the source */
712         m = ms->default_mirror;
713         from.bdev = m->dev->bdev;
714         from.sector = m->offset + region_to_sector(reg->rh, reg->key);
715         if (reg->key == (ms->nr_regions - 1)) {
716                 /*
717                  * The final region may be smaller than
718                  * region_size.
719                  */
720                 from.count = ms->ti->len & (reg->rh->region_size - 1);
721                 if (!from.count)
722                         from.count = reg->rh->region_size;
723         } else
724                 from.count = reg->rh->region_size;
725
726         /* fill in the destinations */
727         for (i = 0, dest = to; i < ms->nr_mirrors; i++) {
728                 if (&ms->mirror[i] == ms->default_mirror)
729                         continue;
730
731                 m = ms->mirror + i;
732                 dest->bdev = m->dev->bdev;
733                 dest->sector = m->offset + region_to_sector(reg->rh, reg->key);
734                 dest->count = from.count;
735                 dest++;
736         }
737
738         /* hand to kcopyd */
739         if (ms->rh.log->type->get_failure_response(ms->rh.log) == DMLOG_IOERR_IGNORE)
740                 set_bit(KCOPYD_IGNORE_ERROR, &flags);
741
742         r = kcopyd_copy(ms->kcopyd_client, &from, ms->nr_mirrors - 1, to, flags,
743                         recovery_complete, reg);
744
745         return r;
746 }
747
748 static void do_recovery(struct mirror_set *ms)
749 {
750         int r;
751         struct region *reg;
752         struct dirty_log *log = ms->rh.log;
753
754         /*
755          * Start quiescing some regions.
756          */
757         rh_recovery_prepare(&ms->rh);
758
759         /*
760          * Copy any already quiesced regions.
761          */
762         while ((reg = rh_recovery_start(&ms->rh))) {
763                 r = recover(ms, reg);
764                 if (r)
765                         rh_recovery_end(reg, 0);
766         }
767
768         /*
769          * Update the in sync flag if necessary.
770          * Raise an event when the mirror becomes in-sync.
771          *
772          * After recovery completes, the mirror becomes in_sync.
773          * Only an I/O failure can then take it back out-of-sync.
774          */
775         if (log->type->get_sync_count(log) == ms->nr_regions) {
776                 if (!ms->in_sync) {
777                         dm_table_event(ms->ti->table);
778                         ms->in_sync = 1;
779                 }
780         } else if (ms->in_sync)
781                 ms->in_sync = 0;
782 }
783
784 /*-----------------------------------------------------------------
785  * Reads
786  *---------------------------------------------------------------*/
787 /* Switch to next dev, via round-robin, after MIN_READS reads */
788 #define MIN_READS 128
789
790 /* choose_mirror
791  * @ms: the mirror set
792  *
793  * This function is used for read balancing.
794  *
795  * Returns: chosen mirror, or NULL on failure
796  */
797 static struct mirror *choose_mirror(struct mirror_set *ms)
798 {
799         struct mirror *start_mirror = ms->read_mirror;
800
801         /*
802          * Perform MIN_READS on each working mirror then
803          * advance to the next one.  start_mirror stores
804          * the first we tried, so we know when we're done.
805          */
806         do {
807                 if (likely(!atomic_read(&ms->read_mirror->error_count)) &&
808                     !atomic_dec_and_test(&ms->read_count))
809                         goto use_mirror;
810
811                 atomic_set(&ms->read_count, MIN_READS);
812
813                 if (ms->read_mirror-- == ms->mirror)
814                         ms->read_mirror += ms->nr_mirrors;
815         } while (ms->read_mirror != start_mirror);
816
817         /*
818          * We've rejected every mirror.
819          * Confirm the start_mirror can be used.
820          */
821         if (unlikely(atomic_read(&ms->read_mirror->error_count)))
822                 return NULL;
823
824 use_mirror:
825         return ms->read_mirror;
826 }
827
828 /* fail_mirror
829  * @m: mirror device to fail
830  *
831  * If the device is valid, mark it invalid.  Also,
832  * if this is the default mirror device (i.e. the primary
833  * device) and the mirror set is in-sync, choose an
834  * alternate primary device.
835  *
836  * This function cannot block.
837  */
838 static void fail_mirror(struct mirror *m)
839 {
840         struct mirror_set *ms = m->ms;
841         struct mirror *new;
842
843         atomic_inc(&m->error_count);
844
845         if (atomic_read(&m->error_count) > 1)
846                 return;
847
848         if (m != ms->default_mirror)
849                 return;
850
851         /*
852          * If the default mirror fails, change it.
853          * In the case of cluster mirroring, the default
854          * is changed in rh_update_states.
855          */
856         if (!ms->in_sync) {
857                 /*
858                  * Can not switch primary.  Better to issue requests
859                  * to same failing device than to risk returning
860                  * corrupt data.
861                  */
862                 DMERR("Primary mirror device has failed while mirror is not in-sync");
863                 DMERR("Unable to choose alternative primary device");
864                 return;
865         }
866
867         for (new = ms->mirror; new < ms->mirror + ms->nr_mirrors; new++)
868                 if (!atomic_read(&new->error_count)) {
869                         ms->default_mirror = new;
870                         break;
871                 }
872
873         if (unlikely(new == ms->mirror + ms->nr_mirrors))
874                 DMWARN("All sides of mirror have failed.");
875 }
876
877 static int default_ok(struct mirror *m)
878 {
879         return !atomic_read(&m->ms->default_mirror->error_count);
880 }
881
882 static int mirror_available(struct mirror_set *ms, struct bio *bio)
883 {
884         region_t region = bio_to_region(&ms->rh, bio);
885
886         if (ms->rh.log->type->in_sync(ms->rh.log, region, 0) > 0)
887                 return choose_mirror(ms) ? 1 : 0;
888
889         return 0;
890 }
891
892 /*
893  * remap a buffer to a particular mirror.
894  */
895 static sector_t map_sector(struct mirror *m, struct bio *bio)
896 {
897         return m->offset + (bio->bi_sector - m->ms->ti->begin);
898 }
899
900 static void map_bio(struct mirror *m, struct bio *bio)
901 {
902         bio->bi_bdev = m->dev->bdev;
903         bio->bi_sector = map_sector(m, bio);
904 }
905
906 static void map_region(struct io_region *io, struct mirror *m,
907                        struct bio *bio)
908 {
909         io->bdev = m->dev->bdev;
910         io->sector = map_sector(m, bio);
911         io->count = bio->bi_size >> 9;
912 }
913
914 /*-----------------------------------------------------------------
915  * Reads
916  *---------------------------------------------------------------*/
917 static void read_callback(unsigned long error, void *context)
918 {
919         struct bio *bio = (struct bio *)context;
920         struct mirror *m;
921
922         m = bio_get_m(bio);
923         bio_set_m(bio, NULL);
924
925         if (unlikely(error)) {
926                 DMWARN("A read failure occurred on a mirror device.");
927                 fail_mirror(m);
928                 if (likely(default_ok(m)) || mirror_available(m->ms, bio)) {
929                         DMWARN("Trying different device.");
930                         queue_bio(m->ms, bio, bio_rw(bio));
931                 } else {
932                         DMERR("No other device available, failing I/O.");
933                         bio_endio(bio, bio->bi_size, -EIO);
934                 }
935         } else
936                 bio_endio(bio, bio->bi_size, 0);
937 }
938
939 /* Asynchronous read. */
940 static void read_async_bio(struct mirror *m, struct bio *bio)
941 {
942         struct io_region io;
943
944         map_region(&io, m, bio);
945         bio_set_m(bio, m);
946         dm_io_async_bvec(1, &io, READ,
947                          bio->bi_io_vec + bio->bi_idx,
948                          read_callback, bio);
949 }
950
951 static void do_reads(struct mirror_set *ms, struct bio_list *reads)
952 {
953         struct bio *bio;
954         struct mirror *m;
955
956         while ((bio = bio_list_pop(reads))) {
957                 /*
958                  * We can only read balance if the region is in sync.
959                  */
960                 if (likely(rh_in_sync(&ms->rh,
961                                       bio_to_region(&ms->rh, bio), 0)))
962                         m = choose_mirror(ms);
963                 else {
964                         m = ms->default_mirror;
965
966                         /* If default has failed, we give up. */
967                         if (unlikely(m && atomic_read(&m->error_count)))
968                                 m = NULL;
969                 }
970
971                 if (likely(m))
972                         read_async_bio(m, bio);
973                 else
974                         bio_endio(bio, bio->bi_size, -EIO);
975         }
976 }
977
978 /*-----------------------------------------------------------------
979  * Writes.
980  *
981  * We do different things with the write io depending on the
982  * state of the region that it's in:
983  *
984  * SYNC:        increment pending, use kcopyd to write to *all* mirrors
985  * RECOVERING:  delay the io until recovery completes
986  * NOSYNC:      increment pending, just write to the default mirror
987  *---------------------------------------------------------------*/
988
989 /* __bio_mark_nosync
990  * @ms
991  * @bio
992  * @done
993  * @error
994  *
995  * The bio was written on some mirror(s) but failed on other mirror(s).
996  * We can successfully endio the bio but should avoid the region being
997  * marked clean by setting the state RH_NOSYNC.
998  *
999  * This function is _not_ interrupt safe!
1000  */
1001 static void __bio_mark_nosync(struct mirror_set *ms,
1002                               struct bio *bio, unsigned int done, int error)
1003 {
1004         unsigned long flags;
1005         struct region_hash *rh = &ms->rh;
1006         struct dirty_log *log = ms->rh.log;
1007         struct region *reg;
1008         region_t region = bio_to_region(rh, bio);
1009         int recovering = 0;
1010
1011         ms->in_sync = 0;
1012
1013         /* We must inform the log that the sync count has changed. */
1014         log->type->set_region_sync(log, region, 0);
1015
1016         read_lock(&rh->hash_lock);
1017         reg = __rh_find(rh, region);
1018         read_unlock(&rh->hash_lock);
1019
1020         /* region hash entry should exist because write was in-flight */
1021         BUG_ON(!reg);
1022         BUG_ON(!list_empty(&reg->list));
1023
1024         spin_lock_irqsave(&rh->region_lock, flags);
1025         /*
1026          * Possible cases:
1027          *   1) RH_DIRTY
1028          *   2) RH_NOSYNC: was dirty, other preceeding writes failed
1029          *   3) RH_RECOVERING: flushing pending writes
1030          * Either case, the region should have not been connected to list.
1031          */
1032         recovering = (reg->state == RH_RECOVERING);
1033         reg->state = RH_NOSYNC;
1034         BUG_ON(!list_empty(&reg->list));
1035         spin_unlock_irqrestore(&rh->region_lock, flags);
1036
1037         bio_endio(bio, done, error);
1038         if (recovering)
1039                 complete_resync_work(reg, 0);
1040 }
1041
1042 static void write_callback(unsigned long error, void *context, int log_failure)
1043 {
1044         unsigned int i, ret = 0;
1045         struct bio *bio = (struct bio *) context;
1046         struct mirror_set *ms;
1047         int uptodate = 0;
1048         int should_wake = 0;
1049
1050         ms = (bio_get_m(bio))->ms;
1051         bio_set_m(bio, NULL);
1052
1053         /*
1054          * NOTE: We don't decrement the pending count here,
1055          * instead it is done by the targets endio function.
1056          * This way we handle both writes to SYNC and NOSYNC
1057          * regions with the same code.
1058          */
1059         if (unlikely(error)) {
1060                 DMERR("Error during write occurred.");
1061
1062                 /*
1063                  * If the log is intact, we can play around with trying
1064                  * to handle the failure.  Otherwise, we have to report
1065                  * the I/O as failed.
1066                  */
1067                 if (!log_failure) {
1068                         for (i = 0; i < ms->nr_mirrors; i++) {
1069                                 if (test_bit(i, &error))
1070                                         fail_mirror(ms->mirror + i);
1071                                 else
1072                                         uptodate = 1;
1073                         }
1074                 }
1075
1076                 if (likely(uptodate)) {
1077                         /*
1078                          * Need to raise event.  Since raising
1079                          * events can block, we need to do it in
1080                          * the main thread.
1081                          */
1082                         spin_lock(&ms->lock);
1083                         if (!ms->failures.head)
1084                                 should_wake = 1;
1085                         bio_list_add(&ms->failures, bio);
1086                         spin_unlock(&ms->lock);
1087                         if (should_wake)
1088                                 wake();
1089                         return;
1090                 } else {
1091                         DMERR("All replicated volumes dead, failing I/O");
1092                         /* None of the writes succeeded, fail the I/O. */
1093                         ret = -EIO;
1094                 }
1095         }
1096
1097         bio_endio(bio, bio->bi_size, ret);
1098 }
1099
1100 static void write_callback_good_log(unsigned long error, void *context)
1101 {
1102         write_callback(error, context, 0);
1103 }
1104
1105 static void write_callback_bad_log(unsigned long error, void *context)
1106 {
1107         write_callback(error, context, 1);
1108 }
1109
1110 static void do_write(struct mirror_set *ms, struct bio *bio, int log_failure)
1111 {
1112         unsigned int i;
1113         struct io_region io[ms->nr_mirrors], *dest = io;
1114         struct mirror *m;
1115
1116         if (log_failure && dm_mirror_error_on_log_failure) {
1117                 bio_endio(bio, bio->bi_size, -EIO);
1118                 return;
1119         }
1120
1121         for (i = 0, m = ms->mirror; i < ms->nr_mirrors; i++, m++)
1122                 map_region(dest++, m, bio);
1123
1124         /*
1125          * We can use the default mirror here, because we
1126          * only need it in order to retrieve the reference
1127          * to the mirror set in write_callback().
1128          */
1129         bio_set_m(bio, ms->default_mirror);
1130         if (log_failure)
1131                 dm_io_async_bvec(ms->nr_mirrors, io, WRITE,
1132                                  bio->bi_io_vec + bio->bi_idx,
1133                                  write_callback_bad_log, bio);
1134         else
1135                 dm_io_async_bvec(ms->nr_mirrors, io, WRITE,
1136                                  bio->bi_io_vec + bio->bi_idx,
1137                                  write_callback_good_log, bio);
1138 }
1139
1140 static void do_writes(struct mirror_set *ms, struct bio_list *writes)
1141 {
1142         int state, r;
1143         struct bio *bio;
1144         struct bio_list sync, nosync, recover, *this_list = NULL;
1145         struct bio_list requeue;
1146         struct dirty_log *log = ms->rh.log;
1147         region_t region;
1148
1149         if (!writes->head)
1150                 return;
1151
1152         /*
1153          * Classify each write.
1154          */
1155         bio_list_init(&sync);
1156         bio_list_init(&nosync);
1157         bio_list_init(&recover);
1158         bio_list_init(&requeue);
1159
1160         while ((bio = bio_list_pop(writes))) {
1161                 region = bio_to_region(&ms->rh, bio);
1162
1163                 if (log->type->is_remote_recovering &&
1164                     log->type->is_remote_recovering(log, region)) {
1165                         bio_list_add(&requeue, bio);
1166                         continue;
1167                 }
1168
1169                 state = rh_state(&ms->rh, region, 1);
1170                 switch (state) {
1171                 case RH_CLEAN:
1172                 case RH_DIRTY:
1173                         this_list = &sync;
1174                         break;
1175
1176                 case RH_NOSYNC:
1177                         this_list = &nosync;
1178                         break;
1179
1180                 case RH_RECOVERING:
1181                         this_list = &recover;
1182                         break;
1183                 }
1184
1185                 bio_list_add(this_list, bio);
1186         }
1187
1188         /*
1189          * Add bios that are delayed due to remote recovery
1190          * back on to the write queue
1191          */
1192         spin_lock_irq(&ms->lock);
1193         bio_list_merge(&ms->writes, &requeue);
1194         spin_unlock_irq(&ms->lock);
1195
1196         /*
1197          * Increment the pending counts for any regions that will
1198          * be written to (writes to recover regions are going to
1199          * be delayed).
1200          */
1201         rh_inc_pending(&ms->rh, &sync);
1202         rh_inc_pending(&ms->rh, &nosync);
1203
1204         r = rh_flush(&ms->rh);
1205
1206         /*
1207          * Dispatch io.
1208          */
1209         while ((bio = bio_list_pop(&sync)))
1210                 do_write(ms, bio, r ? 1 : 0);
1211
1212         while ((bio = bio_list_pop(&recover)))
1213                 rh_delay(&ms->rh, bio);
1214
1215         while ((bio = bio_list_pop(&nosync))) {
1216                 map_bio(ms->default_mirror, bio);
1217                 generic_make_request(bio);
1218         }
1219 }
1220
1221 static void do_failures(struct mirror_set *ms, struct bio_list *failures)
1222 {
1223         struct bio *bio;
1224         struct dirty_log *log = ms->rh.log;
1225
1226         if (!failures->head)
1227                 return;
1228
1229         if (log->type->get_failure_response(log) == DMLOG_IOERR_BLOCK)
1230                 dm_table_event(ms->ti->table);
1231
1232         while ((bio = bio_list_pop(failures)))
1233                 __bio_mark_nosync(ms, bio, bio->bi_size, 0);
1234 }
1235
1236 /*-----------------------------------------------------------------
1237  * kmirrord
1238  *---------------------------------------------------------------*/
1239 static LIST_HEAD(_mirror_sets);
1240 static DECLARE_RWSEM(_mirror_sets_lock);
1241
1242 static int do_mirror(struct mirror_set *ms)
1243 {
1244         struct bio_list reads, writes, failures;
1245
1246         spin_lock_irq(&ms->lock);
1247         reads = ms->reads;
1248         writes = ms->writes;
1249         failures = ms->failures;
1250         bio_list_init(&ms->reads);
1251         bio_list_init(&ms->writes);
1252         bio_list_init(&ms->failures);
1253         spin_unlock_irq(&ms->lock);
1254
1255         rh_update_states(&ms->rh);
1256         do_recovery(ms);
1257         do_reads(ms, &reads);
1258         do_writes(ms, &writes);
1259         do_failures(ms, &failures);
1260
1261         return (ms->writes.head) ? 1 : 0;
1262 }
1263
1264 static int _do_work(void)
1265 {
1266         int more_work = 0;
1267         struct mirror_set *ms;
1268
1269         down_read(&_mirror_sets_lock);
1270         list_for_each_entry (ms, &_mirror_sets, list)
1271                 more_work += do_mirror(ms);
1272         up_read(&_mirror_sets_lock);
1273
1274         return more_work;
1275 }
1276
1277 static void do_work(void *ignored)
1278 {
1279         while (_do_work()) {
1280                 set_current_state(TASK_INTERRUPTIBLE);
1281                 schedule_timeout(HZ/5);
1282         }
1283 }
1284
1285 /*-----------------------------------------------------------------
1286  * Target functions
1287  *---------------------------------------------------------------*/
1288 static struct mirror_set *alloc_context(unsigned int nr_mirrors,
1289                                         uint32_t region_size,
1290                                         struct dm_target *ti,
1291                                         struct dirty_log *dl)
1292 {
1293         size_t len;
1294         struct mirror_set *ms = NULL;
1295
1296         if (array_too_big(sizeof(*ms), sizeof(ms->mirror[0]), nr_mirrors))
1297                 return NULL;
1298
1299         len = sizeof(*ms) + (sizeof(ms->mirror[0]) * nr_mirrors);
1300
1301         ms = kmalloc(len, GFP_KERNEL);
1302         if (!ms) {
1303                 ti->error = "Cannot allocate mirror context";
1304                 return NULL;
1305         }
1306
1307         memset(ms, 0, len);
1308         spin_lock_init(&ms->lock);
1309
1310         ms->ti = ti;
1311         ms->nr_mirrors = nr_mirrors;
1312         ms->nr_regions = dm_sector_div_up(ti->len, region_size);
1313         ms->in_sync = 0;
1314         ms->read_mirror = &ms->mirror[DEFAULT_MIRROR];
1315         ms->default_mirror = &ms->mirror[DEFAULT_MIRROR];
1316
1317         if (rh_init(&ms->rh, ms, dl, region_size, ms->nr_regions)) {
1318                 ti->error = "Error creating dirty region hash";
1319                 kfree(ms);
1320                 return NULL;
1321         }
1322
1323         atomic_set(&ms->read_count, MIN_READS);
1324
1325         bio_list_init(&ms->failures);
1326
1327         return ms;
1328 }
1329
1330 static void free_context(struct mirror_set *ms, struct dm_target *ti,
1331                          unsigned int m)
1332 {
1333         while (m--)
1334                 dm_put_device(ti, ms->mirror[m].dev);
1335
1336         rh_exit(&ms->rh);
1337         kfree(ms);
1338 }
1339
1340 static inline int _check_region_size(struct dm_target *ti, uint32_t size)
1341 {
1342         return !(size % (PAGE_SIZE >> 9) || (size & (size - 1)) ||
1343                  size > ti->len);
1344 }
1345
1346 static int get_mirror(struct mirror_set *ms, struct dm_target *ti,
1347                       unsigned int mirror, char **argv)
1348 {
1349         unsigned long long offset;
1350
1351         if (sscanf(argv[1], "%llu", &offset) != 1) {
1352                 ti->error = "Invalid offset";
1353                 return -EINVAL;
1354         }
1355
1356         if (dm_get_device(ti, argv[0], offset, ti->len,
1357                           dm_table_get_mode(ti->table),
1358                           &ms->mirror[mirror].dev)) {
1359                 ti->error = "Device lookup failure";
1360                 return -ENXIO;
1361         }
1362
1363         ms->mirror[mirror].offset = offset;
1364         atomic_set(&(ms->mirror[mirror].error_count), 0);
1365         ms->mirror[mirror].ms = ms;
1366
1367         return 0;
1368 }
1369
1370 static int add_mirror_set(struct mirror_set *ms)
1371 {
1372         down_write(&_mirror_sets_lock);
1373         list_add_tail(&ms->list, &_mirror_sets);
1374         up_write(&_mirror_sets_lock);
1375         wake();
1376
1377         return 0;
1378 }
1379
1380 static void del_mirror_set(struct mirror_set *ms)
1381 {
1382         down_write(&_mirror_sets_lock);
1383         list_del(&ms->list);
1384         up_write(&_mirror_sets_lock);
1385 }
1386
1387 /*
1388  * Create dirty log: log_type #log_params <log_params>
1389  */
1390 static struct dirty_log *create_dirty_log(struct dm_target *ti,
1391                                           unsigned int argc, char **argv,
1392                                           unsigned int *args_used)
1393 {
1394         unsigned int param_count;
1395         struct dirty_log *dl;
1396
1397         if (argc < 2) {
1398                 ti->error = "Insufficient mirror log arguments";
1399                 return NULL;
1400         }
1401
1402         if (sscanf(argv[1], "%u", &param_count) != 1) {
1403                 ti->error = "Invalid mirror log argument count";
1404                 return NULL;
1405         }
1406
1407         *args_used = 2 + param_count;
1408
1409         if (argc < *args_used) {
1410                 ti->error = "Insufficient mirror log arguments";
1411                 return NULL;
1412         }
1413
1414         dl = dm_create_dirty_log(argv[0], ti, param_count, argv + 2);
1415         if (!dl) {
1416                 ti->error = "Error creating mirror dirty log";
1417                 return NULL;
1418         }
1419
1420         if (!_check_region_size(ti, dl->type->get_region_size(dl))) {
1421                 ti->error = "Invalid region size";
1422                 dm_destroy_dirty_log(dl);
1423                 return NULL;
1424         }
1425
1426         return dl;
1427 }
1428
1429 /*
1430  * Construct a mirror mapping:
1431  *
1432  * log_type #log_params <log_params>
1433  * #mirrors [mirror_path offset]{2,}
1434  *
1435  * log_type is "core" or "disk"
1436  * #log_params is between 1 and 3
1437  */
1438 #define DM_IO_PAGES 64
1439 static int mirror_ctr(struct dm_target *ti, unsigned int argc, char **argv)
1440 {
1441         int r;
1442         unsigned int nr_mirrors, m, args_used;
1443         struct mirror_set *ms;
1444         struct dirty_log *dl;
1445
1446         dl = create_dirty_log(ti, argc, argv, &args_used);
1447         if (!dl)
1448                 return -EINVAL;
1449
1450         argv += args_used;
1451         argc -= args_used;
1452
1453         if (!argc || sscanf(argv[0], "%u", &nr_mirrors) != 1 ||
1454             nr_mirrors < 2 || nr_mirrors > KCOPYD_MAX_REGIONS + 1) {
1455                 ti->error = "Invalid number of mirrors";
1456                 dm_destroy_dirty_log(dl);
1457                 return -EINVAL;
1458         }
1459
1460         argv++, argc--;
1461
1462         if (argc != nr_mirrors * 2) {
1463                 ti->error = "Wrong number of mirror arguments";
1464                 dm_destroy_dirty_log(dl);
1465                 return -EINVAL;
1466         }
1467
1468         ms = alloc_context(nr_mirrors, dl->type->get_region_size(dl), ti, dl);
1469         if (!ms) {
1470                 dm_destroy_dirty_log(dl);
1471                 return -ENOMEM;
1472         }
1473
1474         /* Get the mirror parameter sets */
1475         for (m = 0; m < nr_mirrors; m++) {
1476                 r = get_mirror(ms, ti, m, argv);
1477                 if (r) {
1478                         free_context(ms, ti, m);
1479                         return r;
1480                 }
1481                 argv += 2;
1482                 argc -= 2;
1483         }
1484
1485         ti->private = ms;
1486         ti->split_io = ms->rh.region_size;
1487
1488         r = kcopyd_client_create(DM_IO_PAGES, &ms->kcopyd_client);
1489         if (r) {
1490                 free_context(ms, ti, ms->nr_mirrors);
1491                 return r;
1492         }
1493
1494         add_mirror_set(ms);
1495         return 0;
1496 }
1497
1498 static void mirror_dtr(struct dm_target *ti)
1499 {
1500         struct mirror_set *ms = (struct mirror_set *) ti->private;
1501
1502         del_mirror_set(ms);
1503         kcopyd_client_destroy(ms->kcopyd_client);
1504         free_context(ms, ti, ms->nr_mirrors);
1505 }
1506
1507 static void queue_bio(struct mirror_set *ms, struct bio *bio, int rw)
1508 {
1509         unsigned long flags;
1510         int should_wake = 0;
1511         struct bio_list *bl;
1512
1513         bl = (rw == WRITE) ? &ms->writes : &ms->reads;
1514         spin_lock_irqsave(&ms->lock, flags);
1515         should_wake = !(bl->head);
1516         bio_list_add(bl, bio);
1517         spin_unlock_irqrestore(&ms->lock, flags);
1518
1519         if (should_wake)
1520                 wake();
1521 }
1522
1523 /*
1524  * Mirror mapping function
1525  */
1526 static int mirror_map(struct dm_target *ti, struct bio *bio,
1527                       union map_info *map_context)
1528 {
1529         int r, rw = bio_rw(bio);
1530         struct mirror *m;
1531         struct mirror_set *ms = ti->private;
1532         struct bio_map_info *bmi = NULL;
1533         struct dm_bio_details *bd = NULL;
1534
1535         if (rw == WRITE) {
1536                 /* Save region for mirror_end_io() handler */
1537                 map_context->ll = bio_to_region(&ms->rh, bio);
1538                 queue_bio(ms, bio, rw);
1539                 return 0;
1540         }
1541
1542         /* All about the reads now */
1543
1544         r = ms->rh.log->type->in_sync(ms->rh.log,
1545                                       bio_to_region(&ms->rh, bio), 0);
1546         if (r < 0 && r != -EWOULDBLOCK)
1547                 return r;
1548
1549         if (r == -EWOULDBLOCK)
1550                 r = 0;
1551
1552         if (likely(r)) {
1553                 /*
1554                  * Optimize reads by avoiding to hand them to daemon.
1555                  *
1556                  * In case they fail, queue them for another shot
1557                  * in the mirror_end_io() function.
1558                  */
1559                 m = choose_mirror(ms);
1560                 if (likely(m)) {
1561                         bmi = mempool_alloc(bio_map_info_pool, GFP_NOIO);
1562
1563                         if (likely(bmi)) {
1564                                 /* without this, a read is not retryable */
1565                                 bd = &bmi->bmi_bd;
1566                                 dm_bio_record(bd, bio);
1567                                 map_context->ptr = bmi;
1568                                 bmi->bmi_m = m;
1569                         } else {
1570                                 /* we could fail now, but we can at least  **
1571                                 ** give it a shot.  The bd is only used to **
1572                                 ** retry in the event of a failure anyway. **
1573                                 ** If we fail, we can fail the I/O then.   */
1574                                 map_context->ptr = NULL;
1575                         }
1576
1577                         map_bio(m, bio);
1578                         return 1; /* Mapped -> queue request. */
1579                 } else
1580                         return -EIO;
1581         } else {
1582                 /* Either not clean, or -EWOULDBLOCK */
1583                 if (rw == READA)
1584                         return -EWOULDBLOCK;
1585
1586                 queue_bio(ms, bio, rw);
1587         }
1588
1589         return 0;
1590 }
1591
1592 static int mirror_end_io(struct dm_target *ti, struct bio *bio,
1593                          int error, union map_info *map_context)
1594 {
1595         int rw = bio_rw(bio);
1596         struct mirror_set *ms = (struct mirror_set *) ti->private;
1597         struct mirror *m = NULL;
1598         struct dm_bio_details *bd = NULL;
1599
1600         /*
1601          * We need to dec pending if this was a write.
1602          */
1603         if (rw == WRITE) {
1604                 rh_dec(&ms->rh, map_context->ll);
1605                 return error;
1606         }
1607
1608         if (error == -EOPNOTSUPP)
1609                 goto out;
1610
1611         if ((error == -EWOULDBLOCK) && bio_rw_ahead(bio))
1612                 goto out;
1613
1614         if (unlikely(error)) {
1615                 DMERR("A read failure occurred on a mirror device.");
1616                 if (!map_context->ptr) {
1617                         /*
1618                          * There wasn't enough memory to record necessary
1619                          * information for a retry or there was no other
1620                          * mirror in-sync.
1621                          */
1622                         DMERR("Unable to retry read.");
1623                         return -EIO;
1624                 }
1625                 m = ((struct bio_map_info *)map_context->ptr)->bmi_m;
1626                 fail_mirror(m); /* Flag error on mirror. */
1627
1628                 /*
1629                  * A failed read needs to get queued
1630                  * to the daemon for another shot to
1631                  * one (if any) intact mirrors.
1632                  */
1633                 if (default_ok(m) || mirror_available(ms, bio)) {
1634                         bd = &(((struct bio_map_info *)map_context->ptr)->bmi_bd
1635                                 );
1636
1637                         DMWARN("Trying different device.");
1638                         dm_bio_restore(bd, bio);
1639                         mempool_free(map_context->ptr, bio_map_info_pool);
1640                         map_context->ptr = NULL;
1641                         queue_bio(ms, bio, rw);
1642                         return 1; /* We want another shot on the bio. */
1643                 }
1644                 DMERR("All replicated volumes dead, failing I/O");
1645         }
1646
1647 out:
1648         if (map_context->ptr)
1649                 mempool_free(map_context->ptr, bio_map_info_pool);
1650
1651         return error;
1652 }
1653
1654 static void mirror_presuspend(struct dm_target *ti)
1655 {
1656         struct mirror_set *ms = (struct mirror_set *) ti->private;
1657         struct dirty_log *log = ms->rh.log;
1658
1659         if (log->type->presuspend && log->type->presuspend(log))
1660                 /* FIXME: need better error handling */
1661                 DMWARN("log presuspend failed");
1662 }
1663
1664 static void mirror_postsuspend(struct dm_target *ti)
1665 {
1666         struct mirror_set *ms = (struct mirror_set *) ti->private;
1667         struct dirty_log *log = ms->rh.log;
1668
1669         rh_stop_recovery(&ms->rh);
1670
1671         /* Wait for all I/O we generated to complete */
1672         wait_event(recovery_stopped_event,
1673                    !atomic_read(&ms->rh.recovery_in_flight));
1674
1675         if (log->type->postsuspend && log->type->postsuspend(log))
1676                 /* FIXME: need better error handling */
1677                 DMWARN("log postsuspend failed");
1678 }
1679
1680 static void mirror_resume(struct dm_target *ti)
1681 {
1682         struct mirror_set *ms = (struct mirror_set *) ti->private;
1683         struct dirty_log *log = ms->rh.log;
1684         if (log->type->resume && log->type->resume(log))
1685                 /* FIXME: need better error handling */
1686                 DMWARN("log resume failed");
1687         rh_start_recovery(&ms->rh);
1688 }
1689
1690 static int mirror_status(struct dm_target *ti, status_type_t type,
1691                          char *result, unsigned int maxlen)
1692 {
1693         unsigned int m, sz = 0;
1694         struct mirror_set *ms = (struct mirror_set *) ti->private;
1695         char buffer[ms->nr_mirrors + 1];
1696
1697         switch (type) {
1698         case STATUSTYPE_INFO:
1699                 DMEMIT("%d ", ms->nr_mirrors);
1700                 for (m = 0; m < ms->nr_mirrors; m++) {
1701                         DMEMIT("%s ", ms->mirror[m].dev->name);
1702                         buffer[m] = atomic_read(&(ms->mirror[m].error_count)) ?
1703                                 'D' : 'A';
1704                 }
1705                 buffer[m] = '\0';
1706
1707                 DMEMIT("%llu/%llu 1 %s ",
1708                        ms->rh.log->type->get_sync_count(ms->rh.log),
1709                        ms->nr_regions, buffer);
1710                 ms->rh.log->type->status(ms->rh.log, type, result+sz, maxlen-sz);
1711                 break;
1712
1713         case STATUSTYPE_TABLE:
1714                 sz = ms->rh.log->type->status(ms->rh.log, type, result, maxlen);
1715                 DMEMIT("%d ", ms->nr_mirrors);
1716                 for (m = 0; m < ms->nr_mirrors; m++)
1717                         DMEMIT("%s %llu ", ms->mirror[m].dev->name,
1718                                 (unsigned long long)ms->mirror[m].offset);
1719         }
1720
1721         return 0;
1722 }
1723
1724 static struct target_type mirror_target = {
1725         .name    = "mirror",
1726         .version = {1, 2, 0},
1727         .module  = THIS_MODULE,
1728         .ctr     = mirror_ctr,
1729         .dtr     = mirror_dtr,
1730         .map     = mirror_map,
1731         .end_io  = mirror_end_io,
1732         .presuspend = mirror_presuspend,
1733         .postsuspend = mirror_postsuspend,
1734         .resume  = mirror_resume,
1735         .status  = mirror_status,
1736 };
1737
1738 static int __init dm_mirror_init(void)
1739 {
1740         int r;
1741
1742         bio_map_info_pool = mempool_create(100, bio_map_info_alloc,
1743                                            bio_map_info_free, NULL);
1744         if (!bio_map_info_pool)
1745                 return -ENOMEM;
1746
1747         r = dm_dirty_log_init();
1748         if (r)
1749                 return r;
1750
1751         _kmirrord_wq = create_singlethread_workqueue("kmirrord");
1752         if (!_kmirrord_wq) {
1753                 DMERR("couldn't start kmirrord");
1754                 dm_dirty_log_exit();
1755                 return -ENOMEM;
1756         }
1757         INIT_WORK(&_kmirrord_work, do_work, NULL);
1758
1759         r = dm_register_target(&mirror_target);
1760         if (r < 0) {
1761                 DMERR("%s: Failed to register mirror target",
1762                       mirror_target.name);
1763                 dm_dirty_log_exit();
1764                 destroy_workqueue(_kmirrord_wq);
1765         } else if (!dm_mirror_error_on_log_failure) {
1766                 DMWARN("Warning: dm_mirror_error_on_log_failure = 0");
1767                 DMWARN("In this mode, the following fault sequence could cause corruption:");
1768                 DMWARN("  1) Log device failure");
1769                 DMWARN("  2) Write I/O issued");
1770                 DMWARN("  3) Machine failure");
1771                 DMWARN("  4) Log device restored");
1772                 DMWARN("  5) Machine reboots");
1773                 DMWARN("If this happens, you must resync your mirror.");
1774         }
1775
1776         return r;
1777 }
1778
1779 static void __exit dm_mirror_exit(void)
1780 {
1781         int r;
1782
1783         r = dm_unregister_target(&mirror_target);
1784         if (r < 0)
1785                 DMERR("%s: unregister failed %d", mirror_target.name, r);
1786
1787         destroy_workqueue(_kmirrord_wq);
1788         dm_dirty_log_exit();
1789 }
1790
1791 /* Module hooks */
1792 module_init(dm_mirror_init);
1793 module_exit(dm_mirror_exit);
1794
1795 module_param(dm_mirror_error_on_log_failure, int, 1);
1796 MODULE_PARM_DESC(dm_mirror_error_on_log_failure, "Set to '0' if you want writes to succeed on log device failure");
1797 MODULE_DESCRIPTION(DM_NAME " mirror target");
1798 MODULE_AUTHOR("Joe Thornber");
1799 MODULE_LICENSE("GPL");