This commit was manufactured by cvs2svn to create branch 'vserver'.
[linux-2.6.git] / fs / intermezzo / replicator.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001 Cluster File Systems, Inc. <braam@clusterfs.com>
5  * Copyright (C) 2001 Tacit Networks, Inc. <phil@off.net>
6  *
7  *   This file is part of InterMezzo, http://www.inter-mezzo.org.
8  *
9  *   InterMezzo is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   InterMezzo is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with InterMezzo; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  * Manage RCVD records for clients in the kernel
23  *
24  */
25
26 #include <linux/module.h>
27 #include <asm/uaccess.h>
28
29 #include <linux/errno.h>
30 #include <linux/fs.h>
31 #include <linux/fsfilter.h>
32
33 #include "intermezzo_fs.h"
34
35 /*
36  * this file contains a hash table of replicators/clients for a
37  * fileset. It allows fast lookup and update of reintegration status
38  */
39
40 struct izo_offset_rec {
41         struct list_head or_list;
42         char             or_uuid[16];
43         loff_t           or_offset;
44 };
45
46 #define RCACHE_BITS 8
47 #define RCACHE_SIZE (1 << RCACHE_BITS)
48 #define RCACHE_MASK (RCACHE_SIZE - 1)
49
50 static struct list_head *
51 izo_rep_cache(void)
52 {
53         int i;
54         struct list_head *cache;
55         PRESTO_ALLOC(cache, sizeof(struct list_head) * RCACHE_SIZE);
56         if (cache == NULL) {
57                 CERROR("intermezzo-fatal: no memory for replicator cache\n");
58                 return NULL;
59         }
60         memset(cache, 0, sizeof(struct list_head) * RCACHE_SIZE);
61         for (i = 0; i < RCACHE_SIZE; i++)
62                 INIT_LIST_HEAD(&cache[i]);
63
64         return cache;
65 }
66
67 static struct list_head *
68 izo_rep_hash(struct list_head *cache, char *uuid)
69 {
70         return &cache[(RCACHE_MASK & uuid[1])];
71 }
72
73 static void
74 izo_rep_cache_clean(struct presto_file_set *fset)
75 {
76         int i;
77         struct list_head *bucket;
78         struct list_head *tmp;
79
80         if (fset->fset_clients == NULL)
81                 return;
82         for (i = 0; i < RCACHE_SIZE; i++) {
83                 tmp = bucket = &fset->fset_clients[i];
84
85                 tmp = tmp->next;
86                 while (tmp != bucket) {
87                         struct izo_offset_rec *offrec;
88                         tmp = tmp->next;
89                         list_del(tmp);
90                         offrec = list_entry(tmp, struct izo_offset_rec,
91                                             or_list);
92                         PRESTO_FREE(offrec, sizeof(struct izo_offset_rec));
93                 }
94         }
95 }
96
97 struct izo_offset_rec *
98 izo_rep_cache_find(struct presto_file_set *fset, char *uuid)
99 {
100         struct list_head *tmp, *buck = izo_rep_hash(fset->fset_clients, uuid);
101         struct izo_offset_rec *rec = NULL;
102
103         list_for_each(tmp, buck) {
104                 rec = list_entry(tmp, struct izo_offset_rec, or_list);
105                 if ( memcmp(rec->or_uuid, uuid, sizeof(rec->or_uuid)) == 0 )
106                         return rec;
107         }
108
109         return NULL;
110 }
111
112 static int
113 izo_rep_cache_add(struct presto_file_set *fset, struct izo_rcvd_rec *rec,
114                   loff_t offset)
115 {
116         struct izo_offset_rec *offrec;
117
118         if (izo_rep_cache_find(fset, rec->lr_uuid)) {
119                 CERROR("izo: duplicate client entry %s off %Ld\n",
120                        fset->fset_name, offset);
121                 return -EINVAL;
122         }
123
124         PRESTO_ALLOC(offrec, sizeof(*offrec));
125         if (offrec == NULL) {
126                 CERROR("izo: cannot allocate offrec\n");
127                 return -ENOMEM;
128         }
129
130         memcpy(offrec->or_uuid, rec->lr_uuid, sizeof(rec->lr_uuid));
131         offrec->or_offset = offset;
132
133         list_add(&offrec->or_list,
134                  izo_rep_hash(fset->fset_clients, rec->lr_uuid));
135         return 0;
136 }
137
138 int
139 izo_rep_cache_init(struct presto_file_set *fset)
140 {
141         struct izo_rcvd_rec rec;
142         loff_t offset = 0, last_offset = 0;
143
144         fset->fset_clients = izo_rep_cache();
145         if (fset->fset_clients == NULL) {
146                 CERROR("Error initializing client cache\n");
147                 return -ENOMEM;
148         }
149
150         while ( presto_fread(fset->fset_rcvd.fd_file, (char *)&rec,
151                              sizeof(rec), &offset) == sizeof(rec) ) {
152                 int rc;
153
154                 if ((rc = izo_rep_cache_add(fset, &rec, last_offset)) < 0) {
155                         izo_rep_cache_clean(fset);
156                         return rc;
157                 }
158
159                 last_offset = offset;
160         }
161
162         return 0;
163 }
164
165 /*
166  * Return local last_rcvd record for the client. Update or create 
167  * if necessary.
168  *
169  * XXX: After this call, any -EINVAL from izo_rcvd_get is a real error.
170  */
171 int
172 izo_repstatus(struct presto_file_set *fset,  __u64 client_kmlsize, 
173               struct izo_rcvd_rec *lr_client, struct izo_rcvd_rec *lr_server)
174 {
175         int rc;
176         rc = izo_rcvd_get(lr_server, fset, lr_client->lr_uuid);
177         if (rc < 0 && rc != -EINVAL) {
178                 return rc;
179         }
180
181         /* client is new or has been reset. */
182         if (rc < 0 || (client_kmlsize == 0 && lr_client->lr_remote_offset == 0)) {
183                 memset(lr_server, 0, sizeof(*lr_server));
184                 memcpy(lr_server->lr_uuid, lr_client->lr_uuid, sizeof(lr_server->lr_uuid));
185                 rc = izo_rcvd_write(fset, lr_server);
186                 if (rc < 0)
187                         return rc;
188         }
189
190         /* update intersync */
191         rc = izo_upc_repstatus(presto_f2m(fset), fset->fset_name, lr_server);
192         return rc;
193 }
194
195 loff_t
196 izo_rcvd_get(struct izo_rcvd_rec *rec, struct presto_file_set *fset, char *uuid)
197 {
198         struct izo_offset_rec *offrec;
199         struct izo_rcvd_rec tmprec;
200         loff_t offset;
201
202         offrec = izo_rep_cache_find(fset, uuid);
203         if (offrec == NULL) {
204                 CDEBUG(D_SPECIAL, "izo_get_rcvd: uuid not in hash.\n");
205                 return -EINVAL;
206         }
207         offset = offrec->or_offset;
208
209         if (rec == NULL)
210                 return offset;
211
212         if (presto_fread(fset->fset_rcvd.fd_file, (char *)&tmprec,
213                          sizeof(tmprec), &offset) != sizeof(tmprec)) {
214                 CERROR("izo_get_rcvd: Unable to read from last_rcvd file offset "
215                        "%Lu\n", offset);
216                 return -EIO;
217         }
218
219         memcpy(rec->lr_uuid, tmprec.lr_uuid, sizeof(tmprec.lr_uuid));
220         rec->lr_remote_recno = le64_to_cpu(tmprec.lr_remote_recno);
221         rec->lr_remote_offset = le64_to_cpu(tmprec.lr_remote_offset);
222         rec->lr_local_recno = le64_to_cpu(tmprec.lr_local_recno);
223         rec->lr_local_offset = le64_to_cpu(tmprec.lr_local_offset);
224         rec->lr_last_ctime = le64_to_cpu(tmprec.lr_last_ctime);
225
226         return offrec->or_offset;
227 }
228
229 /* Try to lookup the UUID in the hash.  Insert it if it isn't found.  Write the
230  * data to the file.
231  *
232  * Returns the offset of the beginning of the record in the last_rcvd file. */
233 loff_t
234 izo_rcvd_write(struct presto_file_set *fset, struct izo_rcvd_rec *rec)
235 {
236         struct izo_offset_rec *offrec;
237         loff_t offset, rc;
238
239         ENTRY;
240
241         offrec = izo_rep_cache_find(fset, rec->lr_uuid);
242         if (offrec == NULL) {
243                 /* I don't think it should be possible for an entry to be not in
244                  * the hash table without also having an invalid offset, but we
245                  * handle it gracefully regardless. */
246                 write_lock(&fset->fset_rcvd.fd_lock);
247                 offset = fset->fset_rcvd.fd_offset;
248                 fset->fset_rcvd.fd_offset += sizeof(*rec);
249                 write_unlock(&fset->fset_rcvd.fd_lock);
250
251                 rc = izo_rep_cache_add(fset, rec, offset);
252                 if (rc < 0) {
253                         EXIT;
254                         return rc;
255                 }
256         } else
257                 offset = offrec->or_offset;
258         
259
260         rc = presto_fwrite(fset->fset_rcvd.fd_file, (char *)rec, sizeof(*rec),
261                            &offset);
262         if (rc == sizeof(*rec))
263                 /* presto_fwrite() advances 'offset' */
264                 rc = offset - sizeof(*rec);
265
266         EXIT;
267         return rc;
268 }
269
270 loff_t
271 izo_rcvd_upd_remote(struct presto_file_set *fset, char * uuid,  __u64 remote_recno, 
272                     __u64 remote_offset)
273 {
274         struct izo_rcvd_rec rec;
275         
276         loff_t rc;
277
278         ENTRY;
279         rc = izo_rcvd_get(&rec, fset, uuid);
280         if (rc < 0)
281                 return rc;
282         rec.lr_remote_recno = remote_recno;
283         rec.lr_remote_offset = remote_offset;
284
285         rc = izo_rcvd_write(fset, &rec);
286         EXIT;
287         if (rc < 0)
288                 return rc;
289         return 0;
290 }