X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=fs%2Faio.c;h=e41e932ba489ff040fe9ed43385fc164a8dd6515;hb=43bc926fffd92024b46cafaf7350d669ba9ca884;hp=7afa222f68028a338eae68907ae231a446cc398c;hpb=cee37fe97739d85991964371c1f3a745c00dd236;p=linux-2.6.git diff --git a/fs/aio.c b/fs/aio.c index 7afa222f6..e41e932ba 100644 --- a/fs/aio.c +++ b/fs/aio.c @@ -41,8 +41,9 @@ #endif /*------ sysctl variables----*/ -atomic_t aio_nr = ATOMIC_INIT(0); /* current system wide number of aio requests */ -unsigned aio_max_nr = 0x10000; /* system wide maximum number of aio requests */ +static DEFINE_SPINLOCK(aio_nr_lock); +unsigned long aio_nr; /* current system wide number of aio requests */ +unsigned long aio_max_nr = 0x10000; /* system wide maximum number of aio requests */ /*----end sysctl variables---*/ static kmem_cache_t *kiocb_cachep; @@ -58,6 +59,7 @@ static DEFINE_SPINLOCK(fput_lock); static LIST_HEAD(fput_head); static void aio_kick_handler(void *); +static void aio_queue_work(struct kioctx *); /* aio_setup * Creates the slab caches used by the aio routines, panic on @@ -120,10 +122,9 @@ static int aio_setup_ring(struct kioctx *ctx) info->nr = 0; info->ring_pages = info->internal_pages; if (nr_pages > AIO_RING_PAGES) { - info->ring_pages = kmalloc(sizeof(struct page *) * nr_pages, GFP_KERNEL); + info->ring_pages = kcalloc(nr_pages, sizeof(struct page *), GFP_KERNEL); if (!info->ring_pages) return -ENOMEM; - memset(info->ring_pages, 0, sizeof(struct page *) * nr_pages); } info->mmap_size = nr_pages * PAGE_SIZE; @@ -206,7 +207,7 @@ static struct kioctx *ioctx_alloc(unsigned nr_events) return ERR_PTR(-EINVAL); } - if (nr_events > aio_max_nr) + if ((unsigned long)nr_events > aio_max_nr) return ERR_PTR(-EAGAIN); ctx = kmem_cache_alloc(kioctx_cachep, GFP_KERNEL); @@ -231,8 +232,14 @@ static struct kioctx *ioctx_alloc(unsigned nr_events) goto out_freectx; /* limit the number of system wide aios */ - atomic_add(ctx->max_reqs, &aio_nr); /* undone by __put_ioctx */ - if (unlikely(atomic_read(&aio_nr) > aio_max_nr)) + spin_lock(&aio_nr_lock); + if (aio_nr + ctx->max_reqs > aio_max_nr || + aio_nr + ctx->max_reqs < aio_nr) + ctx->max_reqs = 0; + else + aio_nr += ctx->max_reqs; + spin_unlock(&aio_nr_lock); + if (ctx->max_reqs == 0) goto out_cleanup; /* now link into global list. kludge. FIXME */ @@ -246,8 +253,6 @@ static struct kioctx *ioctx_alloc(unsigned nr_events) return ctx; out_cleanup: - atomic_sub(ctx->max_reqs, &aio_nr); - ctx->max_reqs = 0; /* prevent __put_ioctx from sub'ing aio_nr */ __put_ioctx(ctx); return ERR_PTR(-EAGAIN); @@ -372,7 +377,12 @@ void fastcall __put_ioctx(struct kioctx *ctx) pr_debug("__put_ioctx: freeing %p\n", ctx); kmem_cache_free(kioctx_cachep, ctx); - atomic_sub(nr_events, &aio_nr); + if (nr_events) { + spin_lock(&aio_nr_lock); + BUG_ON(aio_nr - nr_events > aio_nr); + aio_nr -= nr_events; + spin_unlock(&aio_nr_lock); + } } /* aio_get_req @@ -396,7 +406,7 @@ static struct kiocb fastcall *__aio_get_req(struct kioctx *ctx) if (unlikely(!req)) return NULL; - req->ki_flags = 1 << KIF_LOCKED; + req->ki_flags = 0; req->ki_users = 2; req->ki_key = 0; req->ki_ctx = ctx; @@ -445,6 +455,8 @@ static inline struct kiocb *aio_get_req(struct kioctx *ctx) static inline void really_put_req(struct kioctx *ctx, struct kiocb *req) { + assert_spin_locked(&ctx->ctx_lock); + if (req->ki_dtor) req->ki_dtor(req); kmem_cache_free(kiocb_cachep, req); @@ -486,6 +498,8 @@ static int __aio_put_req(struct kioctx *ctx, struct kiocb *req) dprintk(KERN_DEBUG "aio_put(%p): f_count=%d\n", req, atomic_read(&req->ki_filp->f_count)); + assert_spin_locked(&ctx->ctx_lock); + req->ki_users --; if (unlikely(req->ki_users < 0)) BUG(); @@ -566,6 +580,10 @@ static void use_mm(struct mm_struct *mm) atomic_inc(&mm->mm_count); tsk->mm = mm; tsk->active_mm = mm; + /* + * Note that on UML this *requires* PF_BORROWED_MM to be set, otherwise + * it won't work. Update it accordingly if you change it here + */ activate_mm(active_mm, mm); task_unlock(tsk); @@ -603,14 +621,13 @@ static void unuse_mm(struct mm_struct *mm) * the kiocb (to tell the caller to activate the work * queue to process it), or 0, if it found that it was * already queued. - * - * Should be called with the spin lock iocb->ki_ctx->ctx_lock - * held */ static inline int __queue_kicked_iocb(struct kiocb *iocb) { struct kioctx *ctx = iocb->ki_ctx; + assert_spin_locked(&ctx->ctx_lock); + if (list_empty(&iocb->ki_run_list)) { list_add_tail(&iocb->ki_run_list, &ctx->run_list); @@ -716,19 +733,9 @@ static ssize_t aio_run_iocb(struct kiocb *iocb) ret = retry(iocb); current->io_wait = NULL; - if (-EIOCBRETRY != ret) { - if (-EIOCBQUEUED != ret) { - BUG_ON(!list_empty(&iocb->ki_wait.task_list)); - aio_complete(iocb, ret, 0); - /* must not access the iocb after this */ - } - } else { - /* - * Issue an additional retry to avoid waiting forever if - * no waits were queued (e.g. in case of a short read). - */ - if (list_empty(&iocb->ki_wait.task_list)) - kiocbSetKicked(iocb); + if (ret != -EIOCBRETRY && ret != -EIOCBQUEUED) { + BUG_ON(!list_empty(&iocb->ki_wait.task_list)); + aio_complete(iocb, ret, 0); } out: spin_lock_irq(&ctx->ctx_lock); @@ -747,6 +754,14 @@ out: * has already been kicked */ if (kiocbIsKicked(iocb)) { __queue_kicked_iocb(iocb); + + /* + * __queue_kicked_iocb will always return 1 here, because + * iocb->ki_run_list is empty at this point so it should + * be safe to unconditionally queue the context into the + * work queue. + */ + aio_queue_work(ctx); } } return ret; @@ -757,13 +772,15 @@ out: * Process all pending retries queued on the ioctx * run list. * Assumes it is operating within the aio issuer's mm - * context. Expects to be called with ctx->ctx_lock held + * context. */ static int __aio_run_iocbs(struct kioctx *ctx) { struct kiocb *iocb; LIST_HEAD(run_list); + assert_spin_locked(&ctx->ctx_lock); + list_splice_init(&ctx->run_list, &run_list); while (!list_empty(&run_list)) { iocb = list_entry(run_list.next, struct kiocb, @@ -864,16 +881,24 @@ static void aio_kick_handler(void *data) * and if required activate the aio work queue to process * it */ -static void queue_kicked_iocb(struct kiocb *iocb) +static void try_queue_kicked_iocb(struct kiocb *iocb) { struct kioctx *ctx = iocb->ki_ctx; unsigned long flags; int run = 0; - WARN_ON((!list_empty(&iocb->ki_wait.task_list))); + /* We're supposed to be the only path putting the iocb back on the run + * list. If we find that the iocb is *back* on a wait queue already + * than retry has happened before we could queue the iocb. This also + * means that the retry could have completed and freed our iocb, no + * good. */ + BUG_ON((!list_empty(&iocb->ki_wait.task_list))); spin_lock_irqsave(&ctx->ctx_lock, flags); - run = __queue_kicked_iocb(iocb); + /* set this inside the lock so that we can't race with aio_run_iocb() + * testing it and putting the iocb on the run list under the lock */ + if (!kiocbTryKick(iocb)) + run = __queue_kicked_iocb(iocb); spin_unlock_irqrestore(&ctx->ctx_lock, flags); if (run) aio_queue_work(ctx); @@ -896,10 +921,7 @@ void fastcall kick_iocb(struct kiocb *iocb) return; } - /* If its already kicked we shouldn't queue it again */ - if (!kiocbTryKick(iocb)) { - queue_kicked_iocb(iocb); - } + try_queue_kicked_iocb(iocb); } EXPORT_SYMBOL(kick_iocb); @@ -918,28 +940,19 @@ int fastcall aio_complete(struct kiocb *iocb, long res, long res2) unsigned long tail; int ret; - /* Special case handling for sync iocbs: events go directly - * into the iocb for fast handling. Note that this will not - * work if we allow sync kiocbs to be cancelled. in which - * case the usage count checks will have to move under ctx_lock - * for all cases. + /* + * Special case handling for sync iocbs: + * - events go directly into the iocb for fast handling + * - the sync task with the iocb in its stack holds the single iocb + * ref, no other paths have a way to get another ref + * - the sync task helpfully left a reference to itself in the iocb */ if (is_sync_kiocb(iocb)) { - int ret; - + BUG_ON(iocb->ki_users != 1); iocb->ki_user_data = res; - if (iocb->ki_users == 1) { - iocb->ki_users = 0; - ret = 1; - } else { - spin_lock_irq(&ctx->ctx_lock); - iocb->ki_users--; - ret = (0 == iocb->ki_users); - spin_unlock_irq(&ctx->ctx_lock); - } - /* sync iocbs put the task here for us */ + iocb->ki_users = 0; wake_up_process(iocb->ki_obj.tsk); - return ret; + return 1; } info = &ctx->ring_info; @@ -1249,8 +1262,9 @@ asmlinkage long sys_io_setup(unsigned nr_events, aio_context_t __user *ctxp) goto out; ret = -EINVAL; - if (unlikely(ctx || (int)nr_events <= 0)) { - pr_debug("EINVAL: io_setup: ctx or nr_events > max\n"); + if (unlikely(ctx || nr_events == 0)) { + pr_debug("EINVAL: io_setup: ctx %lu nr_events %u\n", + ctx, nr_events); goto out; } @@ -1287,8 +1301,11 @@ asmlinkage long sys_io_destroy(aio_context_t ctx) } /* - * Default retry method for aio_read (also used for first time submit) - * Responsible for updating iocb state as retries progress + * aio_p{read,write} are the default ki_retry methods for + * IO_CMD_P{READ,WRITE}. They maintains kiocb retry state around potentially + * multiple calls to f_op->aio_read(). They loop around partial progress + * instead of returning -EIOCBRETRY because they don't have the means to call + * kick_iocb(). */ static ssize_t aio_pread(struct kiocb *iocb) { @@ -1297,25 +1314,25 @@ static ssize_t aio_pread(struct kiocb *iocb) struct inode *inode = mapping->host; ssize_t ret = 0; - ret = file->f_op->aio_read(iocb, iocb->ki_buf, - iocb->ki_left, iocb->ki_pos); + do { + ret = file->f_op->aio_read(iocb, iocb->ki_buf, + iocb->ki_left, iocb->ki_pos); + /* + * Can't just depend on iocb->ki_left to determine + * whether we are done. This may have been a short read. + */ + if (ret > 0) { + iocb->ki_buf += ret; + iocb->ki_left -= ret; + } - /* - * Can't just depend on iocb->ki_left to determine - * whether we are done. This may have been a short read. - */ - if (ret > 0) { - iocb->ki_buf += ret; - iocb->ki_left -= ret; /* - * For pipes and sockets we return once we have - * some data; for regular files we retry till we - * complete the entire read or find that we can't - * read any more data (e.g short reads). + * For pipes and sockets we return once we have some data; for + * regular files we retry till we complete the entire read or + * find that we can't read any more data (e.g short reads). */ - if (!S_ISFIFO(inode->i_mode) && !S_ISSOCK(inode->i_mode)) - ret = -EIOCBRETRY; - } + } while (ret > 0 && iocb->ki_left > 0 && + !S_ISFIFO(inode->i_mode) && !S_ISSOCK(inode->i_mode)); /* This means we must have transferred all that we could */ /* No need to retry anymore */ @@ -1325,27 +1342,21 @@ static ssize_t aio_pread(struct kiocb *iocb) return ret; } -/* - * Default retry method for aio_write (also used for first time submit) - * Responsible for updating iocb state as retries progress - */ +/* see aio_pread() */ static ssize_t aio_pwrite(struct kiocb *iocb) { struct file *file = iocb->ki_filp; ssize_t ret = 0; - ret = file->f_op->aio_write(iocb, iocb->ki_buf, - iocb->ki_left, iocb->ki_pos); - - if (ret > 0) { - iocb->ki_buf += ret; - iocb->ki_left -= ret; - - ret = -EIOCBRETRY; - } + do { + ret = file->f_op->aio_write(iocb, iocb->ki_buf, + iocb->ki_left, iocb->ki_pos); + if (ret > 0) { + iocb->ki_buf += ret; + iocb->ki_left -= ret; + } + } while (ret > 0 && iocb->ki_left > 0); - /* This means we must have transferred all that we could */ - /* No need to retry anymore */ if ((ret == 0) || (iocb->ki_left == 0)) ret = iocb->ki_nbytes - iocb->ki_left; @@ -1391,6 +1402,9 @@ static ssize_t aio_setup_iocb(struct kiocb *kiocb) if (unlikely(!access_ok(VERIFY_WRITE, kiocb->ki_buf, kiocb->ki_left))) break; + ret = security_file_permission(file, MAY_READ); + if (unlikely(ret)) + break; ret = -EINVAL; if (file->f_op->aio_read) kiocb->ki_retry = aio_pread; @@ -1403,6 +1417,9 @@ static ssize_t aio_setup_iocb(struct kiocb *kiocb) if (unlikely(!access_ok(VERIFY_READ, kiocb->ki_buf, kiocb->ki_left))) break; + ret = security_file_permission(file, MAY_WRITE); + if (unlikely(ret)) + break; ret = -EINVAL; if (file->f_op->aio_write) kiocb->ki_retry = aio_pwrite; @@ -1514,10 +1531,8 @@ int fastcall io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb, goto out_put_req; spin_lock_irq(&ctx->ctx_lock); - if (likely(list_empty(&ctx->run_list))) { - aio_run_iocb(req); - } else { - list_add_tail(&req->ki_run_list, &ctx->run_list); + aio_run_iocb(req); + if (!list_empty(&ctx->run_list)) { /* drain the run list */ while (__aio_run_iocbs(ctx)) ; @@ -1592,12 +1607,14 @@ asmlinkage long sys_io_submit(aio_context_t ctx_id, long nr, /* lookup_kiocb * Finds a given iocb for cancellation. - * MUST be called with ctx->ctx_lock held. */ static struct kiocb *lookup_kiocb(struct kioctx *ctx, struct iocb __user *iocb, u32 key) { struct list_head *pos; + + assert_spin_locked(&ctx->ctx_lock); + /* TODO: use a hash or array, this sucks. */ list_for_each(pos, &ctx->active_reqs) { struct kiocb *kiocb = list_kiocb(pos); @@ -1660,7 +1677,7 @@ asmlinkage long sys_io_cancel(aio_context_t ctx_id, struct iocb __user *iocb, ret = -EFAULT; } } else - printk(KERN_DEBUG "iocb has no cancel operation\n"); + ret = -EINVAL; put_ioctx(ctx);