X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=fs%2Fnfs%2Fdirect.c;h=68df803f27caaa5639c63dd2e707216c540f43d9;hb=6a77f38946aaee1cd85eeec6cf4229b204c15071;hp=21c3c4d396becd1c575bab082136813660d719f3;hpb=87fc8d1bb10cd459024a742c6a10961fefcef18f;p=linux-2.6.git diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c index 21c3c4d39..68df803f2 100644 --- a/fs/nfs/direct.c +++ b/fs/nfs/direct.c @@ -33,6 +33,7 @@ * 08 Jul 2002 Version for 2.4.19, with bug fixes --trondmy * 08 Jun 2003 Port to 2.5 APIs --cel * 31 Mar 2004 Handle direct I/O without VFS support --cel + * 15 Sep 2004 Parallel async reads --cel * */ @@ -43,6 +44,7 @@ #include #include #include +#include #include #include @@ -50,11 +52,27 @@ #include #include +#include #define NFSDBG_FACILITY NFSDBG_VFS -#define VERF_SIZE (2 * sizeof(__u32)) #define MAX_DIRECTIO_SIZE (4096UL << PAGE_SHIFT) +static kmem_cache_t *nfs_direct_cachep; + +/* + * This represents a set of asynchronous requests that we're waiting on + */ +struct nfs_direct_req { + struct kref kref; /* release manager */ + struct list_head list; /* nfs_read_data structs */ + wait_queue_head_t wait; /* wait for i/o completion */ + struct page ** pages; /* pages in our buffer */ + unsigned int npages; /* count of pages */ + atomic_t complete, /* i/os we're waiting for */ + count, /* bytes actually processed */ + error; /* any reported error */ +}; + /** * nfs_get_user_pages - find and set up pages underlying user's buffer @@ -71,9 +89,12 @@ nfs_get_user_pages(int rw, unsigned long user_addr, size_t size, unsigned long page_count; size_t array_size; - /* set an arbitrary limit to prevent arithmetic overflow */ - if (size > MAX_DIRECTIO_SIZE) + /* set an arbitrary limit to prevent type overflow */ + /* XXX: this can probably be as large as INT_MAX */ + if (size > MAX_DIRECTIO_SIZE) { + *pages = NULL; return -EFBIG; + } page_count = (user_addr + size + PAGE_SIZE - 1) >> PAGE_SHIFT; page_count -= user_addr >> PAGE_SHIFT; @@ -93,6 +114,8 @@ nfs_get_user_pages(int rw, unsigned long user_addr, size_t size, /** * nfs_free_user_pages - tear down page struct array * @pages: array of page struct pointers underlying target buffer + * @npages: number of pages in the array + * @do_dirty: dirty the pages as we release them */ static void nfs_free_user_pages(struct page **pages, int npages, int do_dirty) @@ -107,77 +130,231 @@ nfs_free_user_pages(struct page **pages, int npages, int do_dirty) } /** - * nfs_direct_read_seg - Read in one iov segment. Generate separate - * read RPCs for each "rsize" bytes. - * @inode: target inode - * @ctx: target file open context - * user_addr: starting address of this segment of user's buffer - * count: size of this segment - * file_offset: offset in file to begin the operation - * @pages: array of addresses of page structs defining user's buffer - * nr_pages: size of pages array + * nfs_direct_req_release - release nfs_direct_req structure for direct read + * @kref: kref object embedded in an nfs_direct_req structure + * */ -static int -nfs_direct_read_seg(struct inode *inode, struct nfs_open_context *ctx, - unsigned long user_addr, size_t count, loff_t file_offset, - struct page **pages, int nr_pages) +static void nfs_direct_req_release(struct kref *kref) { - const unsigned int rsize = NFS_SERVER(inode)->rsize; - int tot_bytes = 0; - int curpage = 0; - struct nfs_read_data rdata = { - .inode = inode, - .cred = ctx->cred, - .args = { - .fh = NFS_FH(inode), - .context = ctx, - }, - .res = { - .fattr = &rdata.fattr, - }, - }; + struct nfs_direct_req *dreq = container_of(kref, struct nfs_direct_req, kref); + kmem_cache_free(nfs_direct_cachep, dreq); +} - rdata.args.pgbase = user_addr & ~PAGE_MASK; - rdata.args.offset = file_offset; - do { - int result; +/** + * nfs_direct_read_alloc - allocate nfs_read_data structures for direct read + * @count: count of bytes for the read request + * @rsize: local rsize setting + * + * Note we also set the number of requests we have in the dreq when we are + * done. This prevents races with I/O completion so we will always wait + * until all requests have been dispatched and completed. + */ +static struct nfs_direct_req *nfs_direct_read_alloc(size_t nbytes, unsigned int rsize) +{ + struct list_head *list; + struct nfs_direct_req *dreq; + unsigned int reads = 0; + + dreq = kmem_cache_alloc(nfs_direct_cachep, SLAB_KERNEL); + if (!dreq) + return NULL; + + kref_init(&dreq->kref); + init_waitqueue_head(&dreq->wait); + INIT_LIST_HEAD(&dreq->list); + atomic_set(&dreq->count, 0); + atomic_set(&dreq->error, 0); + + list = &dreq->list; + for(;;) { + struct nfs_read_data *data = nfs_readdata_alloc(); + + if (unlikely(!data)) { + while (!list_empty(list)) { + data = list_entry(list->next, + struct nfs_read_data, pages); + list_del(&data->pages); + nfs_readdata_free(data); + } + kref_put(&dreq->kref, nfs_direct_req_release); + return NULL; + } - rdata.args.count = count; - if (rdata.args.count > rsize) - rdata.args.count = rsize; - rdata.args.pages = &pages[curpage]; + INIT_LIST_HEAD(&data->pages); + list_add(&data->pages, list); - dprintk("NFS: direct read: c=%u o=%Ld ua=%lu, pb=%u, cp=%u\n", - rdata.args.count, (long long) rdata.args.offset, - user_addr + tot_bytes, rdata.args.pgbase, curpage); + data->req = (struct nfs_page *) dreq; + reads++; + if (nbytes <= rsize) + break; + nbytes -= rsize; + } + kref_get(&dreq->kref); + atomic_set(&dreq->complete, reads); + return dreq; +} + +/** + * nfs_direct_read_result - handle a read reply for a direct read request + * @data: address of NFS READ operation control block + * @status: status of this NFS READ operation + * + * We must hold a reference to all the pages in this direct read request + * until the RPCs complete. This could be long *after* we are woken up in + * nfs_direct_read_wait (for instance, if someone hits ^C on a slow server). + */ +static void nfs_direct_read_result(struct nfs_read_data *data, int status) +{ + struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req; + + if (likely(status >= 0)) + atomic_add(data->res.count, &dreq->count); + else + atomic_set(&dreq->error, status); + + if (unlikely(atomic_dec_and_test(&dreq->complete))) { + nfs_free_user_pages(dreq->pages, dreq->npages, 1); + wake_up(&dreq->wait); + kref_put(&dreq->kref, nfs_direct_req_release); + } +} + +/** + * nfs_direct_read_schedule - dispatch NFS READ operations for a direct read + * @dreq: address of nfs_direct_req struct for this request + * @inode: target inode + * @ctx: target file open context + * @user_addr: starting address of this segment of user's buffer + * @count: size of this segment + * @file_offset: offset in file to begin the operation + * + * For each nfs_read_data struct that was allocated on the list, dispatch + * an NFS READ operation + */ +static void nfs_direct_read_schedule(struct nfs_direct_req *dreq, + struct inode *inode, struct nfs_open_context *ctx, + unsigned long user_addr, size_t count, loff_t file_offset) +{ + struct list_head *list = &dreq->list; + struct page **pages = dreq->pages; + unsigned int curpage, pgbase; + unsigned int rsize = NFS_SERVER(inode)->rsize; + + curpage = 0; + pgbase = user_addr & ~PAGE_MASK; + do { + struct nfs_read_data *data; + unsigned int bytes; + + bytes = rsize; + if (count < rsize) + bytes = count; + + data = list_entry(list->next, struct nfs_read_data, pages); + list_del_init(&data->pages); + + data->inode = inode; + data->cred = ctx->cred; + data->args.fh = NFS_FH(inode); + data->args.context = ctx; + data->args.offset = file_offset; + data->args.pgbase = pgbase; + data->args.pages = &pages[curpage]; + data->args.count = bytes; + data->res.fattr = &data->fattr; + data->res.eof = 0; + data->res.count = bytes; + + NFS_PROTO(inode)->read_setup(data); + + data->task.tk_cookie = (unsigned long) inode; + data->task.tk_calldata = data; + data->task.tk_release = nfs_readdata_release; + data->complete = nfs_direct_read_result; lock_kernel(); - result = NFS_PROTO(inode)->read(&rdata); + rpc_execute(&data->task); unlock_kernel(); - if (result <= 0) { - if (tot_bytes > 0) - break; - if (result == -EISDIR) - result = -EINVAL; - return result; - } + dfprintk(VFS, "NFS: %4d initiated direct read call (req %s/%Ld, %u bytes @ offset %Lu)\n", + data->task.tk_pid, + inode->i_sb->s_id, + (long long)NFS_FILEID(inode), + bytes, + (unsigned long long)data->args.offset); - tot_bytes += result; - if (rdata.res.eof) - break; + file_offset += bytes; + pgbase += bytes; + curpage += pgbase >> PAGE_SHIFT; + pgbase &= ~PAGE_MASK; - rdata.args.offset += result; - rdata.args.pgbase += result; - curpage += rdata.args.pgbase >> PAGE_SHIFT; - rdata.args.pgbase &= ~PAGE_MASK; - count -= result; + count -= bytes; } while (count != 0); +} + +/** + * nfs_direct_read_wait - wait for I/O completion for direct reads + * @dreq: request on which we are to wait + * @intr: whether or not this wait can be interrupted + * + * Collects and returns the final error value/byte-count. + */ +static ssize_t nfs_direct_read_wait(struct nfs_direct_req *dreq, int intr) +{ + int result = 0; + + if (intr) { + result = wait_event_interruptible(dreq->wait, + (atomic_read(&dreq->complete) == 0)); + } else { + wait_event(dreq->wait, (atomic_read(&dreq->complete) == 0)); + } - /* XXX: should we zero the rest of the user's buffer if we - * hit eof? */ + if (!result) + result = atomic_read(&dreq->error); + if (!result) + result = atomic_read(&dreq->count); - return tot_bytes; + kref_put(&dreq->kref, nfs_direct_req_release); + return (ssize_t) result; +} + +/** + * nfs_direct_read_seg - Read in one iov segment. Generate separate + * read RPCs for each "rsize" bytes. + * @inode: target inode + * @ctx: target file open context + * @user_addr: starting address of this segment of user's buffer + * @count: size of this segment + * @file_offset: offset in file to begin the operation + * @pages: array of addresses of page structs defining user's buffer + * @nr_pages: number of pages in the array + * + */ +static ssize_t nfs_direct_read_seg(struct inode *inode, + struct nfs_open_context *ctx, unsigned long user_addr, + size_t count, loff_t file_offset, struct page **pages, + unsigned int nr_pages) +{ + ssize_t result; + sigset_t oldset; + struct rpc_clnt *clnt = NFS_CLIENT(inode); + struct nfs_direct_req *dreq; + + dreq = nfs_direct_read_alloc(count, NFS_SERVER(inode)->rsize); + if (!dreq) + return -ENOMEM; + + dreq->pages = pages; + dreq->npages = nr_pages; + + rpc_clnt_sigmask(clnt, &oldset); + nfs_direct_read_schedule(dreq, inode, ctx, user_addr, count, + file_offset); + result = nfs_direct_read_wait(dreq, clnt->cl_intr); + rpc_clnt_sigunmask(clnt, &oldset); + + return result; } /** @@ -189,9 +366,8 @@ nfs_direct_read_seg(struct inode *inode, struct nfs_open_context *ctx, * file_offset: offset in file to begin the operation * nr_segs: size of iovec array * - * generic_file_direct_IO has already pushed out any non-direct - * writes so that this read will see them when we read from the - * server. + * We've already pushed out any non-direct writes so that this read + * will see them when we read from the server. */ static ssize_t nfs_direct_read(struct inode *inode, struct nfs_open_context *ctx, @@ -220,8 +396,6 @@ nfs_direct_read(struct inode *inode, struct nfs_open_context *ctx, result = nfs_direct_read_seg(inode, ctx, user_addr, size, file_offset, pages, page_count); - nfs_free_user_pages(pages, page_count, 1); - if (result <= 0) { if (tot_bytes > 0) break; @@ -247,31 +421,31 @@ nfs_direct_read(struct inode *inode, struct nfs_open_context *ctx, * @pages: array of addresses of page structs defining user's buffer * nr_pages: size of pages array */ -static int -nfs_direct_write_seg(struct inode *inode, struct nfs_open_context *ctx, - unsigned long user_addr, size_t count, loff_t file_offset, - struct page **pages, int nr_pages) +static ssize_t nfs_direct_write_seg(struct inode *inode, + struct nfs_open_context *ctx, unsigned long user_addr, + size_t count, loff_t file_offset, struct page **pages, + int nr_pages) { const unsigned int wsize = NFS_SERVER(inode)->wsize; size_t request; - int curpage, need_commit, result, tot_bytes; + int curpage, need_commit; + ssize_t result, tot_bytes; struct nfs_writeverf first_verf; - struct nfs_write_data wdata = { - .inode = inode, - .cred = ctx->cred, - .args = { - .fh = NFS_FH(inode), - .context = ctx, - }, - .res = { - .fattr = &wdata.fattr, - .verf = &wdata.verf, - }, - }; + struct nfs_write_data *wdata; - wdata.args.stable = NFS_UNSTABLE; + wdata = nfs_writedata_alloc(); + if (!wdata) + return -ENOMEM; + + wdata->inode = inode; + wdata->cred = ctx->cred; + wdata->args.fh = NFS_FH(inode); + wdata->args.context = ctx; + wdata->args.stable = NFS_UNSTABLE; if (IS_SYNC(inode) || NFS_PROTO(inode)->version == 2 || count <= wsize) - wdata.args.stable = NFS_FILE_SYNC; + wdata->args.stable = NFS_FILE_SYNC; + wdata->res.fattr = &wdata->fattr; + wdata->res.verf = &wdata->verf; nfs_begin_data_update(inode); retry: @@ -279,20 +453,20 @@ retry: tot_bytes = 0; curpage = 0; request = count; - wdata.args.pgbase = user_addr & ~PAGE_MASK; - wdata.args.offset = file_offset; - do { - wdata.args.count = request; - if (wdata.args.count > wsize) - wdata.args.count = wsize; - wdata.args.pages = &pages[curpage]; + wdata->args.pgbase = user_addr & ~PAGE_MASK; + wdata->args.offset = file_offset; + do { + wdata->args.count = request; + if (wdata->args.count > wsize) + wdata->args.count = wsize; + wdata->args.pages = &pages[curpage]; dprintk("NFS: direct write: c=%u o=%Ld ua=%lu, pb=%u, cp=%u\n", - wdata.args.count, (long long) wdata.args.offset, - user_addr + tot_bytes, wdata.args.pgbase, curpage); + wdata->args.count, (long long) wdata->args.offset, + user_addr + tot_bytes, wdata->args.pgbase, curpage); lock_kernel(); - result = NFS_PROTO(inode)->write(&wdata); + result = NFS_PROTO(inode)->write(wdata); unlock_kernel(); if (result <= 0) { @@ -302,20 +476,25 @@ retry: } if (tot_bytes == 0) - memcpy(&first_verf.verifier, &wdata.verf.verifier, - VERF_SIZE); - if (wdata.verf.committed != NFS_FILE_SYNC) { + memcpy(&first_verf.verifier, &wdata->verf.verifier, + sizeof(first_verf.verifier)); + if (wdata->verf.committed != NFS_FILE_SYNC) { need_commit = 1; - if (memcmp(&first_verf.verifier, - &wdata.verf.verifier, VERF_SIZE)) + if (memcmp(&first_verf.verifier, &wdata->verf.verifier, + sizeof(first_verf.verifier))); goto sync_retry; } - tot_bytes += result; - wdata.args.offset += result; - wdata.args.pgbase += result; - curpage += wdata.args.pgbase >> PAGE_SHIFT; - wdata.args.pgbase &= ~PAGE_MASK; + tot_bytes += result; + + /* in case of a short write: stop now, let the app recover */ + if (result < wdata->args.count) + break; + + wdata->args.offset += result; + wdata->args.pgbase += result; + curpage += wdata->args.pgbase >> PAGE_SHIFT; + wdata->args.pgbase &= ~PAGE_MASK; request -= result; } while (request != 0); @@ -323,27 +502,27 @@ retry: * Commit data written so far, even in the event of an error */ if (need_commit) { - wdata.args.count = tot_bytes; - wdata.args.offset = file_offset; + wdata->args.count = tot_bytes; + wdata->args.offset = file_offset; lock_kernel(); - result = NFS_PROTO(inode)->commit(&wdata); + result = NFS_PROTO(inode)->commit(wdata); unlock_kernel(); if (result < 0 || memcmp(&first_verf.verifier, - &wdata.verf.verifier, - VERF_SIZE) != 0) + &wdata->verf.verifier, + sizeof(first_verf.verifier)) != 0) goto sync_retry; } result = tot_bytes; out: nfs_end_data_update_defer(inode); - + nfs_writedata_free(wdata); return result; sync_retry: - wdata.args.stable = NFS_FILE_SYNC; + wdata->args.stable = NFS_FILE_SYNC; goto retry; } @@ -360,9 +539,9 @@ sync_retry: * that non-direct readers might access, so they will pick up these * writes immediately. */ -static int nfs_direct_write(struct inode *inode, struct nfs_open_context *ctx, - const struct iovec *iov, loff_t file_offset, - unsigned long nr_segs) +static ssize_t nfs_direct_write(struct inode *inode, + struct nfs_open_context *ctx, const struct iovec *iov, + loff_t file_offset, unsigned long nr_segs) { ssize_t tot_bytes = 0; unsigned long seg = 0; @@ -501,6 +680,8 @@ nfs_file_direct_read(struct kiocb *iocb, char __user *buf, size_t count, loff_t if (mapping->nrpages) { retval = filemap_fdatawrite(mapping); + if (retval == 0) + retval = nfs_wb_all(inode); if (retval == 0) retval = filemap_fdatawait(mapping); if (retval) @@ -545,7 +726,7 @@ nfs_file_direct_write(struct kiocb *iocb, const char __user *buf, size_t count, { ssize_t retval = -EINVAL; loff_t *ppos = &iocb->ki_pos; - unsigned long limit = current->rlim[RLIMIT_FSIZE].rlim_cur; + unsigned long limit = current->signal->rlim[RLIMIT_FSIZE].rlim_cur; struct file *file = iocb->ki_filp; struct nfs_open_context *ctx = (struct nfs_open_context *) file->private_data; @@ -590,6 +771,8 @@ nfs_file_direct_write(struct kiocb *iocb, const char __user *buf, size_t count, if (mapping->nrpages) { retval = filemap_fdatawrite(mapping); + if (retval == 0) + retval = nfs_wb_all(inode); if (retval == 0) retval = filemap_fdatawait(mapping); if (retval) @@ -605,3 +788,21 @@ nfs_file_direct_write(struct kiocb *iocb, const char __user *buf, size_t count, out: return retval; } + +int nfs_init_directcache(void) +{ + nfs_direct_cachep = kmem_cache_create("nfs_direct_cache", + sizeof(struct nfs_direct_req), + 0, SLAB_RECLAIM_ACCOUNT, + NULL, NULL); + if (nfs_direct_cachep == NULL) + return -ENOMEM; + + return 0; +} + +void nfs_destroy_directcache(void) +{ + if (kmem_cache_destroy(nfs_direct_cachep)) + printk(KERN_INFO "nfs_direct_cache: not all structures were freed\n"); +}