X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=fs%2Faio.c;h=0cd2c6a10e734c40e6fef234a7938ddea909e90c;hb=c7b5ebbddf7bcd3651947760f423e3783bbe6573;hp=9e7b5928e640f91b45c44f4699dadaa98d0c36b8;hpb=a2c21200f1c81b08cb55e417b68150bba439b646;p=linux-2.6.git diff --git a/fs/aio.c b/fs/aio.c index 9e7b5928e..0cd2c6a10 100644 --- a/fs/aio.c +++ b/fs/aio.c @@ -39,6 +39,9 @@ #define dprintk(x...) do { ; } while (0) #endif +long aio_run = 0; /* for testing only */ +long aio_wakeups = 0; /* for testing only */ + /*------ sysctl variables----*/ atomic_t aio_nr = ATOMIC_INIT(0); /* current system wide number of aio requests */ unsigned aio_max_nr = 0x10000; /* system wide maximum number of aio requests */ @@ -277,6 +280,7 @@ static void aio_cancel_all(struct kioctx *ctx) struct kiocb *iocb = list_kiocb(pos); list_del_init(&iocb->ki_list); cancel = iocb->ki_cancel; + kiocbSetCancelled(iocb); if (cancel) { iocb->ki_users++; spin_unlock_irq(&ctx->ctx_lock); @@ -337,6 +341,11 @@ void fastcall exit_aio(struct mm_struct *mm) aio_cancel_all(ctx); wait_for_all_aios(ctx); + /* + * this is an overkill, but ensures we don't leave + * the ctx on the aio_wq + */ + flush_workqueue(aio_wq); if (1 != atomic_read(&ctx->users)) printk(KERN_DEBUG @@ -359,6 +368,8 @@ void fastcall __put_ioctx(struct kioctx *ctx) if (unlikely(ctx->reqs_active)) BUG(); + cancel_delayed_work(&ctx->wq); + flush_workqueue(aio_wq); aio_free_ring(ctx); mmdrop(ctx->mm); ctx->mm = NULL; @@ -398,6 +409,7 @@ static struct kiocb fastcall *__aio_get_req(struct kioctx *ctx) req->ki_obj.user = NULL; req->ki_dtor = NULL; req->private = NULL; + INIT_LIST_HEAD(&req->ki_run_list); /* Check if the completion queue has enough free space to * accept an event from this io. @@ -543,85 +555,367 @@ struct kioctx *lookup_ioctx(unsigned long ctx_id) return ioctx; } +/* + * use_mm + * Makes the calling kernel thread take on the specified + * mm context. + * Called by the retry thread execute retries within the + * iocb issuer's mm context, so that copy_from/to_user + * operations work seamlessly for aio. + * (Note: this routine is intended to be called only + * from a kernel thread context) + */ static void use_mm(struct mm_struct *mm) { struct mm_struct *active_mm; + struct task_struct *tsk = current; + task_lock(tsk); + active_mm = tsk->active_mm; atomic_inc(&mm->mm_count); - task_lock(current); - active_mm = current->active_mm; - current->mm = mm; - if (mm != active_mm) { - current->active_mm = mm; - activate_mm(active_mm, mm); - } - task_unlock(current); + tsk->mm = mm; + tsk->active_mm = mm; + activate_mm(active_mm, mm); + task_unlock(tsk); + mmdrop(active_mm); } -static void unuse_mm(struct mm_struct *mm) +/* + * unuse_mm + * Reverses the effect of use_mm, i.e. releases the + * specified mm context which was earlier taken on + * by the calling kernel thread + * (Note: this routine is intended to be called only + * from a kernel thread context) + * + * Comments: Called with ctx->ctx_lock held. This nests + * task_lock instead ctx_lock. + */ +void unuse_mm(struct mm_struct *mm) { - task_lock(current); - current->mm = NULL; - task_unlock(current); + struct task_struct *tsk = current; + + task_lock(tsk); + tsk->mm = NULL; /* active_mm is still 'mm' */ - enter_lazy_tlb(mm, current); + enter_lazy_tlb(mm, tsk); + task_unlock(tsk); } -/* Run on kevent's context. FIXME: needs to be per-cpu and warn if an - * operation blocks. +/* + * Queue up a kiocb to be retried. Assumes that the kiocb + * has already been marked as kicked, and places it on + * the retry run list for the corresponding ioctx, if it + * isn't already queued. Returns 1 if it actually queued + * the kiocb (to tell the caller to activate the work + * queue to process it), or 0, if it found that it was + * already queued. + * + * Should be called with the spin lock iocb->ki_ctx->ctx_lock + * held */ -static void aio_kick_handler(void *data) +static inline int __queue_kicked_iocb(struct kiocb *iocb) { - struct kioctx *ctx = data; + struct kioctx *ctx = iocb->ki_ctx; - use_mm(ctx->mm); + if (list_empty(&iocb->ki_run_list)) { + list_add_tail(&iocb->ki_run_list, + &ctx->run_list); + iocb->ki_queued++; + return 1; + } + return 0; +} - spin_lock_irq(&ctx->ctx_lock); - while (!list_empty(&ctx->run_list)) { - struct kiocb *iocb; - long ret; +/* aio_run_iocb + * This is the core aio execution routine. It is + * invoked both for initial i/o submission and + * subsequent retries via the aio_kick_handler. + * Expects to be invoked with iocb->ki_ctx->lock + * already held. The lock is released and reaquired + * as needed during processing. + * + * Calls the iocb retry method (already setup for the + * iocb on initial submission) for operation specific + * handling, but takes care of most of common retry + * execution details for a given iocb. The retry method + * needs to be non-blocking as far as possible, to avoid + * holding up other iocbs waiting to be serviced by the + * retry kernel thread. + * + * The trickier parts in this code have to do with + * ensuring that only one retry instance is in progress + * for a given iocb at any time. Providing that guarantee + * simplifies the coding of individual aio operations as + * it avoids various potential races. + */ +static ssize_t aio_run_iocb(struct kiocb *iocb) +{ + struct kioctx *ctx = iocb->ki_ctx; + ssize_t (*retry)(struct kiocb *); + ssize_t ret; - iocb = list_entry(ctx->run_list.next, struct kiocb, - ki_run_list); - list_del(&iocb->ki_run_list); - iocb->ki_users ++; - spin_unlock_irq(&ctx->ctx_lock); + if (iocb->ki_retried++ > 1024*1024) { + printk("Maximal retry count. Bytes done %Zd\n", + iocb->ki_nbytes - iocb->ki_left); + return -EAGAIN; + } - kiocbClearKicked(iocb); - ret = iocb->ki_retry(iocb); - if (-EIOCBQUEUED != ret) { + if (!(iocb->ki_retried & 0xff)) { + pr_debug("%ld retry: %d of %d (kick %ld, Q %ld run %ld, wake %ld)\n", + iocb->ki_retried, + iocb->ki_nbytes - iocb->ki_left, iocb->ki_nbytes, + iocb->ki_kicked, iocb->ki_queued, aio_run, aio_wakeups); + } + + if (!(retry = iocb->ki_retry)) { + printk("aio_run_iocb: iocb->ki_retry = NULL\n"); + return 0; + } + + /* + * We don't want the next retry iteration for this + * operation to start until this one has returned and + * updated the iocb state. However, wait_queue functions + * can trigger a kick_iocb from interrupt context in the + * meantime, indicating that data is available for the next + * iteration. We want to remember that and enable the + * next retry iteration _after_ we are through with + * this one. + * + * So, in order to be able to register a "kick", but + * prevent it from being queued now, we clear the kick + * flag, but make the kick code *think* that the iocb is + * still on the run list until we are actually done. + * When we are done with this iteration, we check if + * the iocb was kicked in the meantime and if so, queue + * it up afresh. + */ + + kiocbClearKicked(iocb); + + /* + * This is so that aio_complete knows it doesn't need to + * pull the iocb off the run list (We can't just call + * INIT_LIST_HEAD because we don't want a kick_iocb to + * queue this on the run list yet) + */ + iocb->ki_run_list.next = iocb->ki_run_list.prev = NULL; + spin_unlock_irq(&ctx->ctx_lock); + + /* Quit retrying if the i/o has been cancelled */ + if (kiocbIsCancelled(iocb)) { + ret = -EINTR; + aio_complete(iocb, ret, 0); + /* must not access the iocb after this */ + goto out; + } + + /* + * Now we are all set to call the retry method in async + * context. By setting this thread's io_wait context + * to point to the wait queue entry inside the currently + * running iocb for the duration of the retry, we ensure + * that async notification wakeups are queued by the + * operation instead of blocking waits, and when notified, + * cause the iocb to be kicked for continuation (through + * the aio_wake_function callback). + */ + BUG_ON(current->io_wait != NULL); + current->io_wait = &iocb->ki_wait; + ret = retry(iocb); + current->io_wait = NULL; + + if (-EIOCBRETRY != ret) { + if (-EIOCBQUEUED != ret) { + BUG_ON(!list_empty(&iocb->ki_wait.task_list)); aio_complete(iocb, ret, 0); - iocb = NULL; + /* must not access the iocb after this */ } + } else { + /* + * Issue an additional retry to avoid waiting forever if + * no waits were queued (e.g. in case of a short read). + */ + if (list_empty(&iocb->ki_wait.task_list)) + kiocbSetKicked(iocb); + } +out: + spin_lock_irq(&ctx->ctx_lock); - spin_lock_irq(&ctx->ctx_lock); - if (NULL != iocb) - __aio_put_req(ctx, iocb); + if (-EIOCBRETRY == ret) { + /* + * OK, now that we are done with this iteration + * and know that there is more left to go, + * this is where we let go so that a subsequent + * "kick" can start the next iteration + */ + + /* will make __queue_kicked_iocb succeed from here on */ + INIT_LIST_HEAD(&iocb->ki_run_list); + /* we must queue the next iteration ourselves, if it + * has already been kicked */ + if (kiocbIsKicked(iocb)) { + __queue_kicked_iocb(iocb); + } } + return ret; +} + +/* + * __aio_run_iocbs: + * Process all pending retries queued on the ioctx + * run list. + * Assumes it is operating within the aio issuer's mm + * context. Expects to be called with ctx->ctx_lock held + */ +static int __aio_run_iocbs(struct kioctx *ctx) +{ + struct kiocb *iocb; + int count = 0; + LIST_HEAD(run_list); + + list_splice_init(&ctx->run_list, &run_list); + while (!list_empty(&run_list)) { + iocb = list_entry(run_list.next, struct kiocb, + ki_run_list); + list_del(&iocb->ki_run_list); + /* + * Hold an extra reference while retrying i/o. + */ + iocb->ki_users++; /* grab extra reference */ + aio_run_iocb(iocb); + if (__aio_put_req(ctx, iocb)) /* drop extra ref */ + put_ioctx(ctx); + count++; + } + aio_run++; + if (!list_empty(&ctx->run_list)) + return 1; + return 0; +} + +static void aio_queue_work(struct kioctx * ctx) +{ + unsigned long timeout; + /* + * if someone is waiting, get the work started right + * away, otherwise, use a longer delay + */ + smp_mb(); + if (waitqueue_active(&ctx->wait)) + timeout = 1; + else + timeout = HZ/10; + queue_delayed_work(aio_wq, &ctx->wq, timeout); +} + + +/* + * aio_run_iocbs: + * Process all pending retries queued on the ioctx + * run list. + * Assumes it is operating within the aio issuer's mm + * context. + */ +static inline void aio_run_iocbs(struct kioctx *ctx) +{ + int requeue; + + spin_lock_irq(&ctx->ctx_lock); + + requeue = __aio_run_iocbs(ctx); spin_unlock_irq(&ctx->ctx_lock); + if (requeue) + aio_queue_work(ctx); +} - unuse_mm(ctx->mm); +/* + * just like aio_run_iocbs, but keeps running them until + * the list stays empty + */ +static inline void aio_run_all_iocbs(struct kioctx *ctx) +{ + spin_lock_irq(&ctx->ctx_lock); + while (__aio_run_iocbs(ctx)) + ; + spin_unlock_irq(&ctx->ctx_lock); } -void fastcall kick_iocb(struct kiocb *iocb) +/* + * aio_kick_handler: + * Work queue handler triggered to process pending + * retries on an ioctx. Takes on the aio issuer's + * mm context before running the iocbs, so that + * copy_xxx_user operates on the issuer's address + * space. + * Run on aiod's context. + */ +static void aio_kick_handler(void *data) { - struct kioctx *ctx = iocb->ki_ctx; + struct kioctx *ctx = data; + mm_segment_t oldfs = get_fs(); + int requeue; + set_fs(USER_DS); + use_mm(ctx->mm); + spin_lock_irq(&ctx->ctx_lock); + requeue =__aio_run_iocbs(ctx); + unuse_mm(ctx->mm); + spin_unlock_irq(&ctx->ctx_lock); + set_fs(oldfs); + /* + * we're in a worker thread already, don't use queue_delayed_work, + */ + if (requeue) + queue_work(aio_wq, &ctx->wq); +} + + +/* + * Called by kick_iocb to queue the kiocb for retry + * and if required activate the aio work queue to process + * it + */ +void queue_kicked_iocb(struct kiocb *iocb) +{ + struct kioctx *ctx = iocb->ki_ctx; + unsigned long flags; + int run = 0; + + WARN_ON((!list_empty(&iocb->ki_wait.task_list))); + + spin_lock_irqsave(&ctx->ctx_lock, flags); + run = __queue_kicked_iocb(iocb); + spin_unlock_irqrestore(&ctx->ctx_lock, flags); + if (run) { + aio_queue_work(ctx); + aio_wakeups++; + } +} + +/* + * kick_iocb: + * Called typically from a wait queue callback context + * (aio_wake_function) to trigger a retry of the iocb. + * The retry is usually executed by aio workqueue + * threads (See aio_kick_handler). + */ +void fastcall kick_iocb(struct kiocb *iocb) +{ /* sync iocbs are easy: they can only ever be executing from a * single context. */ if (is_sync_kiocb(iocb)) { kiocbSetKicked(iocb); - wake_up_process(iocb->ki_obj.tsk); + wake_up_process(iocb->ki_obj.tsk); return; } + iocb->ki_kicked++; + /* If its already kicked we shouldn't queue it again */ if (!kiocbTryKick(iocb)) { - unsigned long flags; - spin_lock_irqsave(&ctx->ctx_lock, flags); - list_add_tail(&iocb->ki_run_list, &ctx->run_list); - spin_unlock_irqrestore(&ctx->ctx_lock, flags); - queue_work(aio_wq, &ctx->wq); + queue_kicked_iocb(iocb); } } EXPORT_SYMBOL(kick_iocb); @@ -675,6 +969,16 @@ int fastcall aio_complete(struct kiocb *iocb, long res, long res2) */ spin_lock_irqsave(&ctx->ctx_lock, flags); + if (iocb->ki_run_list.prev && !list_empty(&iocb->ki_run_list)) + list_del_init(&iocb->ki_run_list); + + /* + * cancelled requests don't get events, userland was given one + * when the event got cancelled. + */ + if (kiocbIsCancelled(iocb)) + goto put_rq; + ring = kmap_atomic(info->ring_pages[0], KM_IRQ1); tail = info->tail; @@ -703,6 +1007,11 @@ int fastcall aio_complete(struct kiocb *iocb, long res, long res2) pr_debug("added to ring %p at [%lu]\n", iocb, tail); + pr_debug("%ld retries: %d of %d (kicked %ld, Q %ld run %ld wake %ld)\n", + iocb->ki_retried, + iocb->ki_nbytes - iocb->ki_left, iocb->ki_nbytes, + iocb->ki_kicked, iocb->ki_queued, aio_run, aio_wakeups); +put_rq: /* everything turned out well, dispose of the aiocb. */ ret = __aio_put_req(ctx, iocb); @@ -759,7 +1068,7 @@ out: return ret; } -struct timeout { +struct aio_timeout { struct timer_list timer; int timed_out; struct task_struct *p; @@ -767,13 +1076,13 @@ struct timeout { static void timeout_func(unsigned long data) { - struct timeout *to = (struct timeout *)data; + struct aio_timeout *to = (struct aio_timeout *)data; to->timed_out = 1; wake_up_process(to->p); } -static inline void init_timeout(struct timeout *to) +static inline void init_timeout(struct aio_timeout *to) { init_timer(&to->timer); to->timer.data = (unsigned long)to; @@ -782,7 +1091,7 @@ static inline void init_timeout(struct timeout *to) to->p = current; } -static inline void set_timeout(long start_jiffies, struct timeout *to, +static inline void set_timeout(long start_jiffies, struct aio_timeout *to, const struct timespec *ts) { to->timer.expires = start_jiffies + timespec_to_jiffies(ts); @@ -792,7 +1101,7 @@ static inline void set_timeout(long start_jiffies, struct timeout *to, to->timed_out = 1; } -static inline void clear_timeout(struct timeout *to) +static inline void clear_timeout(struct aio_timeout *to) { del_singleshot_timer_sync(&to->timer); } @@ -808,14 +1117,16 @@ static int read_events(struct kioctx *ctx, int ret; int i = 0; struct io_event ent; - struct timeout to; + struct aio_timeout to; + int event_loop = 0; /* testing only */ + int retry = 0; /* needed to zero any padding within an entry (there shouldn't be * any, but C is fun! */ memset(&ent, 0, sizeof(ent)); +retry: ret = 0; - while (likely(i < nr)) { ret = aio_read_evt(ctx, &ent); if (unlikely(ret <= 0)) @@ -844,6 +1155,13 @@ static int read_events(struct kioctx *ctx, /* End fast path */ + /* racey check, but it gets redone */ + if (!retry && unlikely(!list_empty(&ctx->run_list))) { + retry = 1; + aio_run_all_iocbs(ctx); + goto retry; + } + init_timeout(&to); if (timeout) { struct timespec ts; @@ -858,7 +1176,6 @@ static int read_events(struct kioctx *ctx, add_wait_queue_exclusive(&ctx->wait, &wait); do { set_task_state(tsk, TASK_INTERRUPTIBLE); - ret = aio_read_evt(ctx, &ent); if (ret) break; @@ -868,6 +1185,7 @@ static int read_events(struct kioctx *ctx, if (to.timed_out) /* Only check after read evt */ break; schedule(); + event_loop++; if (signal_pending(tsk)) { ret = -EINTR; break; @@ -895,6 +1213,9 @@ static int read_events(struct kioctx *ctx, if (timeout) clear_timeout(&to); out: + pr_debug("event loop executed %d times\n", event_loop); + pr_debug("aio_run %ld\n", aio_run); + pr_debug("aio_wakeups %ld\n", aio_wakeups); return i ? i : ret; } @@ -962,7 +1283,7 @@ asmlinkage long sys_io_setup(unsigned nr_events, aio_context_t __user *ctxp) ret = put_user(ioctx->user_id, ctxp); if (!ret) return 0; - get_ioctx(ioctx); + io_destroy(ioctx); } @@ -987,13 +1308,181 @@ asmlinkage long sys_io_destroy(aio_context_t ctx) return -EINVAL; } +/* + * Default retry method for aio_read (also used for first time submit) + * Responsible for updating iocb state as retries progress + */ +static ssize_t aio_pread(struct kiocb *iocb) +{ + struct file *file = iocb->ki_filp; + struct address_space *mapping = file->f_mapping; + struct inode *inode = mapping->host; + ssize_t ret = 0; + + ret = file->f_op->aio_read(iocb, iocb->ki_buf, + iocb->ki_left, iocb->ki_pos); + + /* + * Can't just depend on iocb->ki_left to determine + * whether we are done. This may have been a short read. + */ + if (ret > 0) { + iocb->ki_buf += ret; + iocb->ki_left -= ret; + /* + * For pipes and sockets we return once we have + * some data; for regular files we retry till we + * complete the entire read or find that we can't + * read any more data (e.g short reads). + */ + if (!S_ISFIFO(inode->i_mode) && !S_ISSOCK(inode->i_mode)) + ret = -EIOCBRETRY; + } + + /* This means we must have transferred all that we could */ + /* No need to retry anymore */ + if ((ret == 0) || (iocb->ki_left == 0)) + ret = iocb->ki_nbytes - iocb->ki_left; + + return ret; +} + +/* + * Default retry method for aio_write (also used for first time submit) + * Responsible for updating iocb state as retries progress + */ +static ssize_t aio_pwrite(struct kiocb *iocb) +{ + struct file *file = iocb->ki_filp; + ssize_t ret = 0; + + ret = file->f_op->aio_write(iocb, iocb->ki_buf, + iocb->ki_left, iocb->ki_pos); + + if (ret > 0) { + iocb->ki_buf += ret; + iocb->ki_left -= ret; + + ret = -EIOCBRETRY; + } + + /* This means we must have transferred all that we could */ + /* No need to retry anymore */ + if ((ret == 0) || (iocb->ki_left == 0)) + ret = iocb->ki_nbytes - iocb->ki_left; + + return ret; +} + +static ssize_t aio_fdsync(struct kiocb *iocb) +{ + struct file *file = iocb->ki_filp; + ssize_t ret = -EINVAL; + + if (file->f_op->aio_fsync) + ret = file->f_op->aio_fsync(iocb, 1); + return ret; +} + +static ssize_t aio_fsync(struct kiocb *iocb) +{ + struct file *file = iocb->ki_filp; + ssize_t ret = -EINVAL; + + if (file->f_op->aio_fsync) + ret = file->f_op->aio_fsync(iocb, 0); + return ret; +} + +/* + * aio_setup_iocb: + * Performs the initial checks and aio retry method + * setup for the kiocb at the time of io submission. + */ +ssize_t aio_setup_iocb(struct kiocb *kiocb) +{ + struct file *file = kiocb->ki_filp; + ssize_t ret = 0; + + switch (kiocb->ki_opcode) { + case IOCB_CMD_PREAD: + ret = -EBADF; + if (unlikely(!(file->f_mode & FMODE_READ))) + break; + ret = -EFAULT; + if (unlikely(!access_ok(VERIFY_WRITE, kiocb->ki_buf, + kiocb->ki_left))) + break; + ret = -EINVAL; + if (file->f_op->aio_read) + kiocb->ki_retry = aio_pread; + break; + case IOCB_CMD_PWRITE: + ret = -EBADF; + if (unlikely(!(file->f_mode & FMODE_WRITE))) + break; + ret = -EFAULT; + if (unlikely(!access_ok(VERIFY_READ, kiocb->ki_buf, + kiocb->ki_left))) + break; + ret = -EINVAL; + if (file->f_op->aio_write) + kiocb->ki_retry = aio_pwrite; + break; + case IOCB_CMD_FDSYNC: + ret = -EINVAL; + if (file->f_op->aio_fsync) + kiocb->ki_retry = aio_fdsync; + break; + case IOCB_CMD_FSYNC: + ret = -EINVAL; + if (file->f_op->aio_fsync) + kiocb->ki_retry = aio_fsync; + break; + default: + dprintk("EINVAL: io_submit: no operation provided\n"); + ret = -EINVAL; + } + + if (!kiocb->ki_retry) + return ret; + + return 0; +} + +/* + * aio_wake_function: + * wait queue callback function for aio notification, + * Simply triggers a retry of the operation via kick_iocb. + * + * This callback is specified in the wait queue entry in + * a kiocb (current->io_wait points to this wait queue + * entry when an aio operation executes; it is used + * instead of a synchronous wait when an i/o blocking + * condition is encountered during aio). + * + * Note: + * This routine is executed with the wait queue lock held. + * Since kick_iocb acquires iocb->ctx->ctx_lock, it nests + * the ioctx lock inside the wait queue lock. This is safe + * because this callback isn't used for wait queues which + * are nested inside ioctx lock (i.e. ctx->wait) + */ +int aio_wake_function(wait_queue_t *wait, unsigned mode, int sync, void *key) +{ + struct kiocb *iocb = container_of(wait, struct kiocb, ki_wait); + + list_del_init(&wait->task_list); + kick_iocb(iocb); + return 1; +} + int fastcall io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb, struct iocb *iocb) { struct kiocb *req; struct file *file; ssize_t ret; - char __user *buf; /* enforce forwards compatibility on users */ if (unlikely(iocb->aio_reserved1 || iocb->aio_reserved2 || @@ -1034,58 +1523,31 @@ int fastcall io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb, req->ki_user_data = iocb->aio_data; req->ki_pos = iocb->aio_offset; - buf = (char __user *)(unsigned long)iocb->aio_buf; + req->ki_buf = (char __user *)(unsigned long)iocb->aio_buf; + req->ki_left = req->ki_nbytes = iocb->aio_nbytes; + req->ki_opcode = iocb->aio_lio_opcode; + init_waitqueue_func_entry(&req->ki_wait, aio_wake_function); + INIT_LIST_HEAD(&req->ki_wait.task_list); + req->ki_run_list.next = req->ki_run_list.prev = NULL; + req->ki_retry = NULL; + req->ki_retried = 0; + req->ki_kicked = 0; + req->ki_queued = 0; + aio_run = 0; + aio_wakeups = 0; - switch (iocb->aio_lio_opcode) { - case IOCB_CMD_PREAD: - ret = -EBADF; - if (unlikely(!(file->f_mode & FMODE_READ))) - goto out_put_req; - ret = -EFAULT; - if (unlikely(!access_ok(VERIFY_WRITE, buf, iocb->aio_nbytes))) - goto out_put_req; - ret = security_file_permission (file, MAY_READ); - if (ret) - goto out_put_req; - ret = -EINVAL; - if (file->f_op->aio_read) - ret = file->f_op->aio_read(req, buf, - iocb->aio_nbytes, req->ki_pos); - break; - case IOCB_CMD_PWRITE: - ret = -EBADF; - if (unlikely(!(file->f_mode & FMODE_WRITE))) - goto out_put_req; - ret = -EFAULT; - if (unlikely(!access_ok(VERIFY_READ, buf, iocb->aio_nbytes))) - goto out_put_req; - ret = security_file_permission (file, MAY_WRITE); - if (ret) - goto out_put_req; - ret = -EINVAL; - if (file->f_op->aio_write) - ret = file->f_op->aio_write(req, buf, - iocb->aio_nbytes, req->ki_pos); - break; - case IOCB_CMD_FDSYNC: - ret = -EINVAL; - if (file->f_op->aio_fsync) - ret = file->f_op->aio_fsync(req, 1); - break; - case IOCB_CMD_FSYNC: - ret = -EINVAL; - if (file->f_op->aio_fsync) - ret = file->f_op->aio_fsync(req, 0); - break; - default: - dprintk("EINVAL: io_submit: no operation provided\n"); - ret = -EINVAL; - } + ret = aio_setup_iocb(req); + if (ret) + goto out_put_req; + + spin_lock_irq(&ctx->ctx_lock); + list_add_tail(&req->ki_run_list, &ctx->run_list); + /* drain the run list */ + while (__aio_run_iocbs(ctx)) + ; + spin_unlock_irq(&ctx->ctx_lock); aio_put_req(req); /* drop extra ref to req */ - if (likely(-EIOCBQUEUED == ret)) - return 0; - aio_complete(req, ret, 0); /* will drop i/o ref to req */ return 0; out_put_req: @@ -1201,6 +1663,7 @@ asmlinkage long sys_io_cancel(aio_context_t ctx_id, struct iocb __user *iocb, if (kiocb && kiocb->ki_cancel) { cancel = kiocb->ki_cancel; kiocb->ki_users ++; + kiocbSetCancelled(kiocb); } else cancel = NULL; spin_unlock_irq(&ctx->ctx_lock);