diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c
index 21c3c4d..68df803 100644
--- a/fs/nfs/direct.c
+++ b/fs/nfs/direct.c
@@ -33,6 +33,7 @@
  * 08 Jul 2002 Version for 2.4.19, with bug fixes --trondmy
  * 08 Jun 2003 Port to 2.5 APIs  --cel
  * 31 Mar 2004 Handle direct I/O without VFS support  --cel
+ * 15 Sep 2004 Parallel async reads  --cel
  *
  */
 
@@ -43,6 +44,7 @@
 #include <linux/smp_lock.h>
 #include <linux/file.h>
 #include <linux/pagemap.h>
+#include <linux/kref.h>
 
 #include <linux/nfs_fs.h>
 #include <linux/nfs_page.h>
 
 #include <asm/system.h>
 #include <asm/uaccess.h>
+#include <asm/atomic.h>
 
 #define NFSDBG_FACILITY                NFSDBG_VFS
-#define VERF_SIZE              (2 * sizeof(__u32))
 #define MAX_DIRECTIO_SIZE      (4096UL << PAGE_SHIFT)
 
+static kmem_cache_t *nfs_direct_cachep;
+
+/*
+ * This represents a set of asynchronous requests that we're waiting on
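+ * during a single direct read.  The last request to complete releases the
+ * user pages and wakes up the waiter.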
+ */
+struct nfs_direct_req {
+       struct kref             kref;           /* release manager */
+       struct list_head        list;           /* nfs_read_data structs */
+       wait_queue_head_t       wait;           /* wait for i/o completion */
+       struct page **          pages;          /* pages in our buffer */
+       unsigned int            npages;         /* count of pages */
+       atomic_t                complete,       /* i/os we're waiting for */
+                               count,          /* bytes actually processed */
+                               error;          /* any reported error */
+};
+
 
 /**
  * nfs_get_user_pages - find and set up pages underlying user's buffer
@@ -71,9 +89,12 @@ nfs_get_user_pages(int rw, unsigned long user_addr, size_t size,
        unsigned long page_count;
        size_t array_size;
 
-       /* set an arbitrary limit to prevent arithmetic overflow */
-       if (size > MAX_DIRECTIO_SIZE)
+       /* set an arbitrary limit to prevent type overflow */
+       /* XXX: this can probably be as large as INT_MAX */
+       if (size > MAX_DIRECTIO_SIZE) {
+               *pages = NULL;
                return -EFBIG;
+       }
 
        page_count = (user_addr + size + PAGE_SIZE - 1) >> PAGE_SHIFT;
        page_count -= user_addr >> PAGE_SHIFT;
@@ -93,6 +114,8 @@ nfs_get_user_pages(int rw, unsigned long user_addr, size_t size,
 /**
  * nfs_free_user_pages - tear down page struct array
  * @pages: array of page struct pointers underlying target buffer
+ * @npages: number of pages in the array
+ * @do_dirty: dirty the pages as we release them
  */
 static void
 nfs_free_user_pages(struct page **pages, int npages, int do_dirty)
@@ -107,77 +130,231 @@ nfs_free_user_pages(struct page **pages, int npages, int do_dirty)
 }
 
 /**
- * nfs_direct_read_seg - Read in one iov segment.  Generate separate
- *                        read RPCs for each "rsize" bytes.
- * @inode: target inode
- * @ctx: target file open context
- * user_addr: starting address of this segment of user's buffer
- * count: size of this segment
- * file_offset: offset in file to begin the operation
- * @pages: array of addresses of page structs defining user's buffer
- * nr_pages: size of pages array
+ * nfs_direct_req_release - release nfs_direct_req structure for direct read
+ * @kref: kref object embedded in an nfs_direct_req structure
+ *
  */
-static int
-nfs_direct_read_seg(struct inode *inode, struct nfs_open_context *ctx,
-               unsigned long user_addr, size_t count, loff_t file_offset,
-               struct page **pages, int nr_pages)
+static void nfs_direct_req_release(struct kref *kref)
 {
-       const unsigned int rsize = NFS_SERVER(inode)->rsize;
-       int tot_bytes = 0;
-       int curpage = 0;
-       struct nfs_read_data    rdata = {
-               .inode          = inode,
-               .cred           = ctx->cred,
-               .args           = {
-                       .fh             = NFS_FH(inode),
-                       .context        = ctx,
-               },
-               .res            = {
-                       .fattr          = &rdata.fattr,
-               },
-       };
+       struct nfs_direct_req *dreq = container_of(kref, struct nfs_direct_req, kref);
+       kmem_cache_free(nfs_direct_cachep, dreq);
+}
 
-       rdata.args.pgbase = user_addr & ~PAGE_MASK;
-       rdata.args.offset = file_offset;
-        do {
-               int result;
+/**
+ * nfs_direct_read_alloc - allocate nfs_read_data structures for direct read
+ * @nbytes: count of bytes for the read request
+ * @rsize: local rsize setting
+ *
+ * Note that the count of outstanding requests is set in the dreq only
+ * after all of them have been allocated.  This prevents races with I/O
+ * completion: the waiter cannot be woken until every request has been
+ * dispatched and completed.
+ */
+static struct nfs_direct_req *nfs_direct_read_alloc(size_t nbytes, unsigned int rsize)
+{
+       struct list_head *list;
+       struct nfs_direct_req *dreq;
+       unsigned int reads = 0;
+
+       dreq = kmem_cache_alloc(nfs_direct_cachep, SLAB_KERNEL);
+       if (!dreq)
+               return NULL;
+
+       kref_init(&dreq->kref);
+       init_waitqueue_head(&dreq->wait);
+       INIT_LIST_HEAD(&dreq->list);
+       atomic_set(&dreq->count, 0);
+       atomic_set(&dreq->error, 0);
+
+       list = &dreq->list;
+       for(;;) {
+               struct nfs_read_data *data = nfs_readdata_alloc();
+
+               if (unlikely(!data)) {
+                       while (!list_empty(list)) {
+                               data = list_entry(list->next,
+                                                 struct nfs_read_data, pages);
+                               list_del(&data->pages);
+                               nfs_readdata_free(data);
+                       }
+                       kref_put(&dreq->kref, nfs_direct_req_release);
+                       return NULL;
+               }
 
-               rdata.args.count = count;
-                if (rdata.args.count > rsize)
-                        rdata.args.count = rsize;
-               rdata.args.pages = &pages[curpage];
+               INIT_LIST_HEAD(&data->pages);
+               list_add(&data->pages, list);
 
-               dprintk("NFS: direct read: c=%u o=%Ld ua=%lu, pb=%u, cp=%u\n",
-                       rdata.args.count, (long long) rdata.args.offset,
-                       user_addr + tot_bytes, rdata.args.pgbase, curpage);
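+               /* borrow the nfs_page pointer to hand the dreq to the
+                * read completion handler */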
+               data->req = (struct nfs_page *) dreq;
+               reads++;
+               if (nbytes <= rsize)
+                       break;
+               nbytes -= rsize;
+       }
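+       /* the extra reference taken here is dropped by whichever read
+        * completes last, in nfs_direct_read_result */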
+       kref_get(&dreq->kref);
+       atomic_set(&dreq->complete, reads);
+       return dreq;
+}
+
+/**
+ * nfs_direct_read_result - handle a read reply for a direct read request
+ * @data: address of NFS READ operation control block
+ * @status: status of this NFS READ operation
+ *
+ * We must hold a reference to all the pages in this direct read request
+ * until the RPCs complete.  This could be long *after* we are woken up in
+ * nfs_direct_read_wait (for instance, if someone hits ^C on a slow server).
+ */
+static void nfs_direct_read_result(struct nfs_read_data *data, int status)
+{
+       struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
+
+       if (likely(status >= 0))
+               atomic_add(data->res.count, &dreq->count);
+       else
+               atomic_set(&dreq->error, status);
+
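+       /* the last read to complete releases the user pages and wakes
+        * up anyone sleeping in nfs_direct_read_wait */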
+       if (unlikely(atomic_dec_and_test(&dreq->complete))) {
+               nfs_free_user_pages(dreq->pages, dreq->npages, 1);
+               wake_up(&dreq->wait);
+               kref_put(&dreq->kref, nfs_direct_req_release);
+       }
+}
+
+/**
+ * nfs_direct_read_schedule - dispatch NFS READ operations for a direct read
+ * @dreq: address of nfs_direct_req struct for this request
+ * @inode: target inode
+ * @ctx: target file open context
+ * @user_addr: starting address of this segment of user's buffer
+ * @count: size of this segment
+ * @file_offset: offset in file to begin the operation
+ *
+ * For each nfs_read_data struct that was allocated on the list, dispatch
+ * an NFS READ operation
+ */
+static void nfs_direct_read_schedule(struct nfs_direct_req *dreq,
+               struct inode *inode, struct nfs_open_context *ctx,
+               unsigned long user_addr, size_t count, loff_t file_offset)
+{
+       struct list_head *list = &dreq->list;
+       struct page **pages = dreq->pages;
+       unsigned int curpage, pgbase;
+       unsigned int rsize = NFS_SERVER(inode)->rsize;
+
+       curpage = 0;
+       pgbase = user_addr & ~PAGE_MASK;
+       do {
+               struct nfs_read_data *data;
+               unsigned int bytes;
+
+               bytes = rsize;
+               if (count < rsize)
+                       bytes = count;
+
+               data = list_entry(list->next, struct nfs_read_data, pages);
+               list_del_init(&data->pages);
+
+               data->inode = inode;
+               data->cred = ctx->cred;
+               data->args.fh = NFS_FH(inode);
+               data->args.context = ctx;
+               data->args.offset = file_offset;
+               data->args.pgbase = pgbase;
+               data->args.pages = &pages[curpage];
+               data->args.count = bytes;
+               data->res.fattr = &data->fattr;
+               data->res.eof = 0;
+               data->res.count = bytes;
+
+               NFS_PROTO(inode)->read_setup(data);
+
+               data->task.tk_cookie = (unsigned long) inode;
+               data->task.tk_calldata = data;
+               data->task.tk_release = nfs_readdata_release;
+               data->complete = nfs_direct_read_result;
 
                lock_kernel();
-               result = NFS_PROTO(inode)->read(&rdata);
+               rpc_execute(&data->task);
                unlock_kernel();
 
-               if (result <= 0) {
-                       if (tot_bytes > 0)
-                               break;
-                       if (result == -EISDIR)
-                               result = -EINVAL;
-                       return result;
-               }
+               dfprintk(VFS, "NFS: %4d initiated direct read call (req %s/%Ld, %u bytes @ offset %Lu)\n",
+                               data->task.tk_pid,
+                               inode->i_sb->s_id,
+                               (long long)NFS_FILEID(inode),
+                               bytes,
+                               (unsigned long long)data->args.offset);
 
-                tot_bytes += result;
-               if (rdata.res.eof)
-                       break;
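+               /* advance the file offset and our position in the page
+                * array past the bytes just dispatched */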
+               file_offset += bytes;
+               pgbase += bytes;
+               curpage += pgbase >> PAGE_SHIFT;
+               pgbase &= ~PAGE_MASK;
 
-                rdata.args.offset += result;
-               rdata.args.pgbase += result;
-               curpage += rdata.args.pgbase >> PAGE_SHIFT;
-               rdata.args.pgbase &= ~PAGE_MASK;
-               count -= result;
+               count -= bytes;
        } while (count != 0);
+}
+
+/**
+ * nfs_direct_read_wait - wait for I/O completion for direct reads
+ * @dreq: request on which we are to wait
+ * @intr: whether or not this wait can be interrupted
+ *
+ * Collects and returns the final error value/byte-count.
+ */
+static ssize_t nfs_direct_read_wait(struct nfs_direct_req *dreq, int intr)
+{
+       int result = 0;
+
+       if (intr) {
+               result = wait_event_interruptible(dreq->wait,
+                                       (atomic_read(&dreq->complete) == 0));
+       } else {
+               wait_event(dreq->wait, (atomic_read(&dreq->complete) == 0));
+       }
 
-       /* XXX: should we zero the rest of the user's buffer if we
-        *      hit eof? */
+       if (!result)
+               result = atomic_read(&dreq->error);
+       if (!result)
+               result = atomic_read(&dreq->count);
 
-       return tot_bytes;
+       kref_put(&dreq->kref, nfs_direct_req_release);
+       return (ssize_t) result;
+}
+
+/**
+ * nfs_direct_read_seg - Read in one iov segment.  Generate separate
+ *                        read RPCs for each "rsize" bytes.
+ * @inode: target inode
+ * @ctx: target file open context
+ * @user_addr: starting address of this segment of user's buffer
+ * @count: size of this segment
+ * @file_offset: offset in file to begin the operation
+ * @pages: array of addresses of page structs defining user's buffer
+ * @nr_pages: number of pages in the array
+ *
+ */
+static ssize_t nfs_direct_read_seg(struct inode *inode,
+               struct nfs_open_context *ctx, unsigned long user_addr,
+               size_t count, loff_t file_offset, struct page **pages,
+               unsigned int nr_pages)
+{
+       ssize_t result;
+       sigset_t oldset;
+       struct rpc_clnt *clnt = NFS_CLIENT(inode);
+       struct nfs_direct_req *dreq;
+
+       dreq = nfs_direct_read_alloc(count, NFS_SERVER(inode)->rsize);
+       if (!dreq)
+               return -ENOMEM;
+
+       dreq->pages = pages;
+       dreq->npages = nr_pages;
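+       /* the pages are released by nfs_direct_read_result once all
+        * reads have completed, not here */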
+
+       rpc_clnt_sigmask(clnt, &oldset);
+       nfs_direct_read_schedule(dreq, inode, ctx, user_addr, count,
+                                file_offset);
+       result = nfs_direct_read_wait(dreq, clnt->cl_intr);
+       rpc_clnt_sigunmask(clnt, &oldset);
+
+       return result;
 }
 
 /**
@@ -189,9 +366,8 @@ nfs_direct_read_seg(struct inode *inode, struct nfs_open_context *ctx,
  * file_offset: offset in file to begin the operation
  * nr_segs: size of iovec array
  *
- * generic_file_direct_IO has already pushed out any non-direct
- * writes so that this read will see them when we read from the
- * server.
+ * We've already pushed out any non-direct writes so that this read
+ * will see them when we read from the server.
  */
 static ssize_t
 nfs_direct_read(struct inode *inode, struct nfs_open_context *ctx,
@@ -220,8 +396,6 @@ nfs_direct_read(struct inode *inode, struct nfs_open_context *ctx,
                result = nfs_direct_read_seg(inode, ctx, user_addr, size,
                                file_offset, pages, page_count);
 
-               nfs_free_user_pages(pages, page_count, 1);
-
                if (result <= 0) {
                        if (tot_bytes > 0)
                                break;
@@ -247,31 +421,31 @@ nfs_direct_read(struct inode *inode, struct nfs_open_context *ctx,
  * @pages: array of addresses of page structs defining user's buffer
  * nr_pages: size of pages array
  */
-static int
-nfs_direct_write_seg(struct inode *inode, struct nfs_open_context *ctx,
-               unsigned long user_addr, size_t count, loff_t file_offset,
-               struct page **pages, int nr_pages)
+static ssize_t nfs_direct_write_seg(struct inode *inode,
+               struct nfs_open_context *ctx, unsigned long user_addr,
+               size_t count, loff_t file_offset, struct page **pages,
+               int nr_pages)
 {
        const unsigned int wsize = NFS_SERVER(inode)->wsize;
        size_t request;
-       int curpage, need_commit, result, tot_bytes;
+       int curpage, need_commit;
+       ssize_t result, tot_bytes;
        struct nfs_writeverf first_verf;
-       struct nfs_write_data   wdata = {
-               .inode          = inode,
-               .cred           = ctx->cred,
-               .args           = {
-                       .fh             = NFS_FH(inode),
-                       .context        = ctx,
-               },
-               .res            = {
-                       .fattr          = &wdata.fattr,
-                       .verf           = &wdata.verf,
-               },
-       };
+       struct nfs_write_data *wdata;
 
-       wdata.args.stable = NFS_UNSTABLE;
+       wdata = nfs_writedata_alloc();
+       if (!wdata)
+               return -ENOMEM;
+
+       wdata->inode = inode;
+       wdata->cred = ctx->cred;
+       wdata->args.fh = NFS_FH(inode);
+       wdata->args.context = ctx;
+       wdata->args.stable = NFS_UNSTABLE;
        if (IS_SYNC(inode) || NFS_PROTO(inode)->version == 2 || count <= wsize)
-               wdata.args.stable = NFS_FILE_SYNC;
+               wdata->args.stable = NFS_FILE_SYNC;
+       wdata->res.fattr = &wdata->fattr;
+       wdata->res.verf = &wdata->verf;
 
        nfs_begin_data_update(inode);
 retry:
@@ -279,20 +453,20 @@ retry:
        tot_bytes = 0;
        curpage = 0;
        request = count;
-       wdata.args.pgbase = user_addr & ~PAGE_MASK;
-       wdata.args.offset = file_offset;
-        do {
-               wdata.args.count = request;
-                if (wdata.args.count > wsize)
-                        wdata.args.count = wsize;
-               wdata.args.pages = &pages[curpage];
+       wdata->args.pgbase = user_addr & ~PAGE_MASK;
+       wdata->args.offset = file_offset;
+       do {
+               wdata->args.count = request;
+               if (wdata->args.count > wsize)
+                       wdata->args.count = wsize;
+               wdata->args.pages = &pages[curpage];
 
                dprintk("NFS: direct write: c=%u o=%Ld ua=%lu, pb=%u, cp=%u\n",
-                       wdata.args.count, (long long) wdata.args.offset,
-                       user_addr + tot_bytes, wdata.args.pgbase, curpage);
+                       wdata->args.count, (long long) wdata->args.offset,
+                       user_addr + tot_bytes, wdata->args.pgbase, curpage);
 
                lock_kernel();
-               result = NFS_PROTO(inode)->write(&wdata);
+               result = NFS_PROTO(inode)->write(wdata);
                unlock_kernel();
 
                if (result <= 0) {
@@ -302,20 +476,25 @@ retry:
                }
 
                if (tot_bytes == 0)
-                       memcpy(&first_verf.verifier, &wdata.verf.verifier,
-                                                               VERF_SIZE);
-               if (wdata.verf.committed != NFS_FILE_SYNC) {
+                       memcpy(&first_verf.verifier, &wdata->verf.verifier,
+                                               sizeof(first_verf.verifier));
+               if (wdata->verf.committed != NFS_FILE_SYNC) {
                        need_commit = 1;
-                       if (memcmp(&first_verf.verifier,
-                                       &wdata.verf.verifier, VERF_SIZE))
+                       if (memcmp(&first_verf.verifier, &wdata->verf.verifier,
+                                       sizeof(first_verf.verifier)))
                                goto sync_retry;
                }
 
-                tot_bytes += result;
-                wdata.args.offset += result;
-               wdata.args.pgbase += result;
-               curpage += wdata.args.pgbase >> PAGE_SHIFT;
-               wdata.args.pgbase &= ~PAGE_MASK;
+               tot_bytes += result;
+
+               /* in case of a short write: stop now, let the app recover */
+               if (result < wdata->args.count)
+                       break;
+
+               wdata->args.offset += result;
+               wdata->args.pgbase += result;
+               curpage += wdata->args.pgbase >> PAGE_SHIFT;
+               wdata->args.pgbase &= ~PAGE_MASK;
                request -= result;
        } while (request != 0);
 
@@ -323,27 +502,27 @@ retry:
         * Commit data written so far, even in the event of an error
         */
        if (need_commit) {
-               wdata.args.count = tot_bytes;
-               wdata.args.offset = file_offset;
+               wdata->args.count = tot_bytes;
+               wdata->args.offset = file_offset;
 
                lock_kernel();
-               result = NFS_PROTO(inode)->commit(&wdata);
+               result = NFS_PROTO(inode)->commit(wdata);
                unlock_kernel();
 
                if (result < 0 || memcmp(&first_verf.verifier,
-                                               &wdata.verf.verifier,
-                                               VERF_SIZE) != 0)
+                                        &wdata->verf.verifier,
+                                        sizeof(first_verf.verifier)) != 0)
                        goto sync_retry;
        }
        result = tot_bytes;
 
 out:
        nfs_end_data_update_defer(inode);
-
+       nfs_writedata_free(wdata);
        return result;
 
 sync_retry:
-       wdata.args.stable = NFS_FILE_SYNC;
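+       /* a verifier changed or the commit failed: fall back to stable
+        * FILE_SYNC writes and redo the whole request */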
+       wdata->args.stable = NFS_FILE_SYNC;
        goto retry;
 }
 
@@ -360,9 +539,9 @@ sync_retry:
  * that non-direct readers might access, so they will pick up these
  * writes immediately.
  */
-static int nfs_direct_write(struct inode *inode, struct nfs_open_context *ctx,
-               const struct iovec *iov, loff_t file_offset,
-               unsigned long nr_segs)
+static ssize_t nfs_direct_write(struct inode *inode,
+               struct nfs_open_context *ctx, const struct iovec *iov,
+               loff_t file_offset, unsigned long nr_segs)
 {
        ssize_t tot_bytes = 0;
        unsigned long seg = 0;
@@ -501,6 +680,8 @@ nfs_file_direct_read(struct kiocb *iocb, char __user *buf, size_t count, loff_t
 
        if (mapping->nrpages) {
                retval = filemap_fdatawrite(mapping);
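+               /* push any cached writes to the server so this uncached
+                * read sees them */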
+               if (retval == 0)
+                       retval = nfs_wb_all(inode);
                if (retval == 0)
                        retval = filemap_fdatawait(mapping);
                if (retval)
@@ -545,7 +726,7 @@ nfs_file_direct_write(struct kiocb *iocb, const char __user *buf, size_t count,
 {
        ssize_t retval = -EINVAL;
        loff_t *ppos = &iocb->ki_pos;
-       unsigned long limit = current->rlim[RLIMIT_FSIZE].rlim_cur;
+       unsigned long limit = current->signal->rlim[RLIMIT_FSIZE].rlim_cur;
        struct file *file = iocb->ki_filp;
        struct nfs_open_context *ctx =
                        (struct nfs_open_context *) file->private_data;
@@ -590,6 +771,8 @@ nfs_file_direct_write(struct kiocb *iocb, const char __user *buf, size_t count,
 
        if (mapping->nrpages) {
                retval = filemap_fdatawrite(mapping);
+               if (retval == 0)
+                       retval = nfs_wb_all(inode);
                if (retval == 0)
                        retval = filemap_fdatawait(mapping);
                if (retval)
@@ -605,3 +788,21 @@ nfs_file_direct_write(struct kiocb *iocb, const char __user *buf, size_t count,
 out:
        return retval;
 }
+
+int nfs_init_directcache(void)
+{
+       nfs_direct_cachep = kmem_cache_create("nfs_direct_cache",
+                                               sizeof(struct nfs_direct_req),
+                                               0, SLAB_RECLAIM_ACCOUNT,
+                                               NULL, NULL);
+       if (nfs_direct_cachep == NULL)
+               return -ENOMEM;
+
+       return 0;
+}
+
+void nfs_destroy_directcache(void)
+{
+       if (kmem_cache_destroy(nfs_direct_cachep))
+               printk(KERN_INFO "nfs_direct_cache: not all structures were freed\n");
+}
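
Neither constructor nor destructor has a caller in this diff. Below is a minimal sketch of how they might be wired into the NFS client's module init and exit paths; the init_nfs_fs()/exit_nfs_fs() hook points in fs/nfs/inode.c are an assumption, not part of this patch.

/* sketch only: assumes init_nfs_fs()/exit_nfs_fs() in fs/nfs/inode.c */
#include <linux/init.h>

extern int nfs_init_directcache(void);
extern void nfs_destroy_directcache(void);

static int __init init_nfs_fs(void)
{
	int err;

	/* create nfs_direct_cache before the filesystem is registered */
	err = nfs_init_directcache();
	if (err)
		return err;

	/* ... existing read/write data cache setup and register_filesystem() ... */
	return 0;
}

static void __exit exit_nfs_fs(void)
{
	/* ... existing unregister_filesystem() and cache teardown ... */
	nfs_destroy_directcache();
}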