/*
* An async IO implementation for Linux
- * Written by Benjamin LaHaise <bcrl@redhat.com>
+ * Written by Benjamin LaHaise <bcrl@kvack.org>
*
* Implements an efficient asynchronous io interface.
*
#include <linux/time.h>
#include <linux/aio_abi.h>
#include <linux/module.h>
+#include <linux/syscalls.h>
#define DEBUG 0
#endif
/*------ sysctl variables----*/
-atomic_t aio_nr = ATOMIC_INIT(0); /* current system wide number of aio requests */
-unsigned aio_max_nr = 0x10000; /* system wide maximum number of aio requests */
+static DEFINE_SPINLOCK(aio_nr_lock);
+unsigned long aio_nr; /* current system wide number of aio requests */
+unsigned long aio_max_nr = 0x10000; /* system wide maximum number of aio requests */
/*----end sysctl variables---*/
static kmem_cache_t *kiocb_cachep;
static void aio_fput_routine(void *);
static DECLARE_WORK(fput_work, aio_fput_routine, NULL);
-static spinlock_t fput_lock = SPIN_LOCK_UNLOCKED;
-LIST_HEAD(fput_head);
+static DEFINE_SPINLOCK(fput_lock);
+static LIST_HEAD(fput_head);
static void aio_kick_handler(void *);
+static void aio_queue_work(struct kioctx *);
/* aio_setup
* Creates the slab caches used by the aio routines, panic on
if (nr_pages < 0)
return -EINVAL;
- info->nr_pages = nr_pages;
-
nr_events = (PAGE_SIZE * nr_pages - sizeof(struct aio_ring)) / sizeof(struct io_event);
info->nr = 0;
info->ring_pages = info->internal_pages;
if (nr_pages > AIO_RING_PAGES) {
- info->ring_pages = kmalloc(sizeof(struct page *) * nr_pages, GFP_KERNEL);
+ info->ring_pages = kcalloc(nr_pages, sizeof(struct page *), GFP_KERNEL);
if (!info->ring_pages)
return -ENOMEM;
- memset(info->ring_pages, 0, sizeof(struct page *) * nr_pages);
}
info->mmap_size = nr_pages * PAGE_SIZE;
return ERR_PTR(-EINVAL);
}
- if (nr_events > aio_max_nr)
+ if ((unsigned long)nr_events > aio_max_nr)
return ERR_PTR(-EAGAIN);
ctx = kmem_cache_alloc(kioctx_cachep, GFP_KERNEL);
goto out_freectx;
/* limit the number of system wide aios */
- atomic_add(ctx->max_reqs, &aio_nr); /* undone by __put_ioctx */
- if (unlikely(atomic_read(&aio_nr) > aio_max_nr))
+ spin_lock(&aio_nr_lock);
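+ /* refuse the ctx if it would push aio_nr over aio_max_nr or wrap it;
+ * zeroing max_reqs also keeps __put_ioctx from returning requests we never charged */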
+ if (aio_nr + ctx->max_reqs > aio_max_nr ||
+ aio_nr + ctx->max_reqs < aio_nr)
+ ctx->max_reqs = 0;
+ else
+ aio_nr += ctx->max_reqs;
+ spin_unlock(&aio_nr_lock);
+ if (ctx->max_reqs == 0)
goto out_cleanup;
/* now link into global list. kludge. FIXME */
return ctx;
out_cleanup:
- atomic_sub(ctx->max_reqs, &aio_nr);
- ctx->max_reqs = 0; /* prevent __put_ioctx from sub'ing aio_nr */
__put_ioctx(ctx);
return ERR_PTR(-EAGAIN);
struct kiocb *iocb = list_kiocb(pos);
list_del_init(&iocb->ki_list);
cancel = iocb->ki_cancel;
+ kiocbSetCancelled(iocb);
if (cancel) {
iocb->ki_users++;
spin_unlock_irq(&ctx->ctx_lock);
spin_unlock_irq(&ctx->ctx_lock);
}
-void wait_for_all_aios(struct kioctx *ctx)
+static void wait_for_all_aios(struct kioctx *ctx)
{
struct task_struct *tsk = current;
DECLARE_WAITQUEUE(wait, tsk);
aio_cancel_all(ctx);
wait_for_all_aios(ctx);
+ /*
+ * this is overkill, but it ensures we don't leave
+ * the ctx on the aio_wq
+ */
+ flush_workqueue(aio_wq);
if (1 != atomic_read(&ctx->users))
printk(KERN_DEBUG
if (unlikely(ctx->reqs_active))
BUG();
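+ /* make sure no retry work for this ctx is still pending or running before it is freed */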
+ cancel_delayed_work(&ctx->wq);
+ flush_workqueue(aio_wq);
aio_free_ring(ctx);
mmdrop(ctx->mm);
ctx->mm = NULL;
pr_debug("__put_ioctx: freeing %p\n", ctx);
kmem_cache_free(kioctx_cachep, ctx);
- atomic_sub(nr_events, &aio_nr);
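+ /* return the requests to aio_nr; nothing to do if ioctx_alloc never charged them */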
+ if (nr_events) {
+ spin_lock(&aio_nr_lock);
+ BUG_ON(aio_nr - nr_events > aio_nr);
+ aio_nr -= nr_events;
+ spin_unlock(&aio_nr_lock);
+ }
}
/* aio_get_req
if (unlikely(!req))
return NULL;
- req->ki_flags = 1 << KIF_LOCKED;
+ req->ki_flags = 0;
req->ki_users = 2;
req->ki_key = 0;
req->ki_ctx = ctx;
req->ki_cancel = NULL;
req->ki_retry = NULL;
- req->ki_obj.user = NULL;
req->ki_dtor = NULL;
req->private = NULL;
+ INIT_LIST_HEAD(&req->ki_run_list);
/* Check if the completion queue has enough free space to
* accept an event from this io.
static inline void really_put_req(struct kioctx *ctx, struct kiocb *req)
{
+ assert_spin_locked(&ctx->ctx_lock);
+
if (req->ki_dtor)
req->ki_dtor(req);
- req->ki_ctx = NULL;
- req->ki_filp = NULL;
- req->ki_obj.user = NULL;
- req->ki_dtor = NULL;
- req->private = NULL;
kmem_cache_free(kiocb_cachep, req);
ctx->reqs_active--;
dprintk(KERN_DEBUG "aio_put(%p): f_count=%d\n",
req, atomic_read(&req->ki_filp->f_count));
+ assert_spin_locked(&ctx->ctx_lock);
+
req->ki_users --;
if (unlikely(req->ki_users < 0))
BUG();
return ioctx;
}
+/*
+ * use_mm
+ * Makes the calling kernel thread take on the specified
+ * mm context.
+ * Called by the retry thread to execute retries within the
+ * iocb issuer's mm context, so that copy_from/to_user
+ * operations work seamlessly for aio.
+ * (Note: this routine is intended to be called only
+ * from a kernel thread context)
+ */
static void use_mm(struct mm_struct *mm)
{
struct mm_struct *active_mm;
+ struct task_struct *tsk = current;
+ task_lock(tsk);
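+ /* mark the mm as borrowed; unuse_mm() clears this flag again */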
+ tsk->flags |= PF_BORROWED_MM;
+ active_mm = tsk->active_mm;
atomic_inc(&mm->mm_count);
- task_lock(current);
- active_mm = current->active_mm;
- current->mm = mm;
- if (mm != active_mm) {
- current->active_mm = mm;
- activate_mm(active_mm, mm);
- }
- task_unlock(current);
+ tsk->mm = mm;
+ tsk->active_mm = mm;
+ /*
+ * Note that on UML this *requires* PF_BORROWED_MM to be set, otherwise
+ * it won't work. Update it accordingly if you change it here
+ */
+ activate_mm(active_mm, mm);
+ task_unlock(tsk);
+
mmdrop(active_mm);
}
+/*
+ * unuse_mm
+ * Reverses the effect of use_mm, i.e. releases the
+ * specified mm context which was earlier taken on
+ * by the calling kernel thread
+ * (Note: this routine is intended to be called only
+ * from a kernel thread context)
+ *
+ * Comments: Called with ctx->ctx_lock held. This nests
+ * the task_lock inside the ctx_lock.
+ */
static void unuse_mm(struct mm_struct *mm)
{
- task_lock(current);
- current->mm = NULL;
- task_unlock(current);
+ struct task_struct *tsk = current;
+
+ task_lock(tsk);
+ tsk->flags &= ~PF_BORROWED_MM;
+ tsk->mm = NULL;
/* active_mm is still 'mm' */
- enter_lazy_tlb(mm, current);
+ enter_lazy_tlb(mm, tsk);
+ task_unlock(tsk);
}
-/* Run on kevent's context. FIXME: needs to be per-cpu and warn if an
- * operation blocks.
+/*
+ * Queue up a kiocb to be retried. Assumes that the kiocb
+ * has already been marked as kicked, and places it on
+ * the retry run list for the corresponding ioctx, if it
+ * isn't already queued. Returns 1 if it actually queued
+ * the kiocb (to tell the caller to activate the work
+ * queue to process it), or 0 if it found that it was
+ * already queued.
*/
-static void aio_kick_handler(void *data)
+static inline int __queue_kicked_iocb(struct kiocb *iocb)
{
- struct kioctx *ctx = data;
+ struct kioctx *ctx = iocb->ki_ctx;
- use_mm(ctx->mm);
+ assert_spin_locked(&ctx->ctx_lock);
+
+ if (list_empty(&iocb->ki_run_list)) {
+ list_add_tail(&iocb->ki_run_list,
+ &ctx->run_list);
+ return 1;
+ }
+ return 0;
+}
+
+/* aio_run_iocb
+ * This is the core aio execution routine. It is
+ * invoked both for initial i/o submission and
+ * subsequent retries via the aio_kick_handler.
+ * Expects to be invoked with iocb->ki_ctx->ctx_lock
+ * already held. The lock is released and reacquired
+ * as needed during processing.
+ *
+ * Calls the iocb retry method (already setup for the
+ * iocb on initial submission) for operation specific
+ * handling, but takes care of most of the common retry
+ * execution details for a given iocb. The retry method
+ * needs to be non-blocking as far as possible, to avoid
+ * holding up other iocbs waiting to be serviced by the
+ * retry kernel thread.
+ *
+ * The trickier parts in this code have to do with
+ * ensuring that only one retry instance is in progress
+ * for a given iocb at any time. Providing that guarantee
+ * simplifies the coding of individual aio operations as
+ * it avoids various potential races.
+ */
+static ssize_t aio_run_iocb(struct kiocb *iocb)
+{
+ struct kioctx *ctx = iocb->ki_ctx;
+ ssize_t (*retry)(struct kiocb *);
+ ssize_t ret;
+
+ if (iocb->ki_retried++ > 1024*1024) {
+ printk("Maximal retry count. Bytes done %Zd\n",
+ iocb->ki_nbytes - iocb->ki_left);
+ return -EAGAIN;
+ }
+
+ if (!(iocb->ki_retried & 0xff)) {
+ pr_debug("%ld retry: %d of %d\n", iocb->ki_retried,
+ iocb->ki_nbytes - iocb->ki_left, iocb->ki_nbytes);
+ }
+
+ if (!(retry = iocb->ki_retry)) {
+ printk("aio_run_iocb: iocb->ki_retry = NULL\n");
+ return 0;
+ }
+
+ /*
+ * We don't want the next retry iteration for this
+ * operation to start until this one has returned and
+ * updated the iocb state. However, wait_queue functions
+ * can trigger a kick_iocb from interrupt context in the
+ * meantime, indicating that data is available for the next
+ * iteration. We want to remember that and enable the
+ * next retry iteration _after_ we are through with
+ * this one.
+ *
+ * So, in order to be able to register a "kick", but
+ * prevent it from being queued now, we clear the kick
+ * flag, but make the kick code *think* that the iocb is
+ * still on the run list until we are actually done.
+ * When we are done with this iteration, we check if
+ * the iocb was kicked in the meantime and if so, queue
+ * it up afresh.
+ */
+
+ kiocbClearKicked(iocb);
+
+ /*
+ * This is so that aio_complete knows it doesn't need to
+ * pull the iocb off the run list (We can't just call
+ * INIT_LIST_HEAD because we don't want a kick_iocb to
+ * queue this on the run list yet)
+ */
+ iocb->ki_run_list.next = iocb->ki_run_list.prev = NULL;
+ spin_unlock_irq(&ctx->ctx_lock);
+
+ /* Quit retrying if the i/o has been cancelled */
+ if (kiocbIsCancelled(iocb)) {
+ ret = -EINTR;
+ aio_complete(iocb, ret, 0);
+ /* must not access the iocb after this */
+ goto out;
+ }
+ /*
+ * Now we are all set to call the retry method in async
+ * context. By setting this thread's io_wait context
+ * to point to the wait queue entry inside the currently
+ * running iocb for the duration of the retry, we ensure
+ * that async notification wakeups are queued by the
+ * operation instead of blocking waits, and when notified,
+ * cause the iocb to be kicked for continuation (through
+ * the aio_wake_function callback).
+ */
+ BUG_ON(current->io_wait != NULL);
+ current->io_wait = &iocb->ki_wait;
+ ret = retry(iocb);
+ current->io_wait = NULL;
+
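+ /* anything other than -EIOCBRETRY or -EIOCBQUEUED means the operation is done, so complete it */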
+ if (ret != -EIOCBRETRY && ret != -EIOCBQUEUED) {
+ BUG_ON(!list_empty(&iocb->ki_wait.task_list));
+ aio_complete(iocb, ret, 0);
+ }
+out:
spin_lock_irq(&ctx->ctx_lock);
- while (!list_empty(&ctx->run_list)) {
- struct kiocb *iocb;
- long ret;
- iocb = list_entry(ctx->run_list.next, struct kiocb,
- ki_run_list);
+ if (-EIOCBRETRY == ret) {
+ /*
+ * OK, now that we are done with this iteration
+ * and know that there is more left to go,
+ * this is where we let go so that a subsequent
+ * "kick" can start the next iteration
+ */
+
+ /* will make __queue_kicked_iocb succeed from here on */
+ INIT_LIST_HEAD(&iocb->ki_run_list);
+ /* we must queue the next iteration ourselves, if it
+ * has already been kicked */
+ if (kiocbIsKicked(iocb)) {
+ __queue_kicked_iocb(iocb);
+
+ /*
+ * __queue_kicked_iocb will always return 1 here, because
+ * iocb->ki_run_list is empty at this point so it should
+ * be safe to unconditionally queue the context into the
+ * work queue.
+ */
+ aio_queue_work(ctx);
+ }
+ }
+ return ret;
+}
+
+/*
+ * __aio_run_iocbs:
+ * Process all pending retries queued on the ioctx
+ * run list.
+ * Assumes it is operating within the aio issuer's mm
+ * context.
+ */
+static int __aio_run_iocbs(struct kioctx *ctx)
+{
+ struct kiocb *iocb;
+ LIST_HEAD(run_list);
+
+ assert_spin_locked(&ctx->ctx_lock);
+
+ list_splice_init(&ctx->run_list, &run_list);
+ while (!list_empty(&run_list)) {
+ iocb = list_entry(run_list.next, struct kiocb,
+ ki_run_list);
list_del(&iocb->ki_run_list);
- iocb->ki_users ++;
- spin_unlock_irq(&ctx->ctx_lock);
+ /*
+ * Hold an extra reference while retrying i/o.
+ */
+ iocb->ki_users++; /* grab extra reference */
+ aio_run_iocb(iocb);
+ if (__aio_put_req(ctx, iocb)) /* drop extra ref */
+ put_ioctx(ctx);
+ }
+ if (!list_empty(&ctx->run_list))
+ return 1;
+ return 0;
+}
- kiocbClearKicked(iocb);
- ret = iocb->ki_retry(iocb);
- if (-EIOCBQUEUED != ret) {
- aio_complete(iocb, ret, 0);
- iocb = NULL;
- }
+static void aio_queue_work(struct kioctx *ctx)
+{
+ unsigned long timeout;
+ /*
+ * if someone is waiting, get the work started right
+ * away, otherwise, use a longer delay
+ */
+ smp_mb();
+ if (waitqueue_active(&ctx->wait))
+ timeout = 1;
+ else
+ timeout = HZ/10;
+ queue_delayed_work(aio_wq, &ctx->wq, timeout);
+}
- spin_lock_irq(&ctx->ctx_lock);
- if (NULL != iocb)
- __aio_put_req(ctx, iocb);
- }
+
+/*
+ * aio_run_iocbs:
+ * Process all pending retries queued on the ioctx
+ * run list.
+ * Assumes it is operating within the aio issuer's mm
+ * context.
+ */
+static inline void aio_run_iocbs(struct kioctx *ctx)
+{
+ int requeue;
+
+ spin_lock_irq(&ctx->ctx_lock);
+
+ requeue = __aio_run_iocbs(ctx);
spin_unlock_irq(&ctx->ctx_lock);
+ if (requeue)
+ aio_queue_work(ctx);
+}
- unuse_mm(ctx->mm);
+/*
+ * just like aio_run_iocbs, but keeps running them until
+ * the list stays empty
+ */
+static inline void aio_run_all_iocbs(struct kioctx *ctx)
+{
+ spin_lock_irq(&ctx->ctx_lock);
+ while (__aio_run_iocbs(ctx))
+ ;
+ spin_unlock_irq(&ctx->ctx_lock);
}
-void fastcall kick_iocb(struct kiocb *iocb)
+/*
+ * aio_kick_handler:
+ * Work queue handler triggered to process pending
+ * retries on an ioctx. Takes on the aio issuer's
+ * mm context before running the iocbs, so that
+ * copy_xxx_user operates on the issuer's address
+ * space.
+ * Run on aiod's context.
+ */
+static void aio_kick_handler(void *data)
{
- struct kioctx *ctx = iocb->ki_ctx;
+ struct kioctx *ctx = data;
+ mm_segment_t oldfs = get_fs();
+ int requeue;
+
+ set_fs(USER_DS);
+ use_mm(ctx->mm);
+ spin_lock_irq(&ctx->ctx_lock);
+ requeue = __aio_run_iocbs(ctx);
+ unuse_mm(ctx->mm);
+ spin_unlock_irq(&ctx->ctx_lock);
+ set_fs(oldfs);
+ /*
+ * we're in a worker thread already; don't use queue_delayed_work.
+ */
+ if (requeue)
+ queue_work(aio_wq, &ctx->wq);
+}
+
+
+/*
+ * Called by kick_iocb to queue the kiocb for retry
+ * and if required activate the aio work queue to process
+ * it
+ */
+static void try_queue_kicked_iocb(struct kiocb *iocb)
+{
+ struct kioctx *ctx = iocb->ki_ctx;
+ unsigned long flags;
+ int run = 0;
+
+ /* We're supposed to be the only path putting the iocb back on the run
+ * list. If we find that the iocb is *back* on a wait queue already
+ * then a retry has happened before we could queue the iocb. This also
+ * means that the retry could have completed and freed our iocb, no
+ * good. */
+ BUG_ON((!list_empty(&iocb->ki_wait.task_list)));
+
+ spin_lock_irqsave(&ctx->ctx_lock, flags);
+ /* set this inside the lock so that we can't race with aio_run_iocb()
+ * testing it and putting the iocb on the run list under the lock */
+ if (!kiocbTryKick(iocb))
+ run = __queue_kicked_iocb(iocb);
+ spin_unlock_irqrestore(&ctx->ctx_lock, flags);
+ if (run)
+ aio_queue_work(ctx);
+}
+/*
+ * kick_iocb:
+ * Called typically from a wait queue callback context
+ * (aio_wake_function) to trigger a retry of the iocb.
+ * The retry is usually executed by aio workqueue
+ * threads (See aio_kick_handler).
+ */
+void fastcall kick_iocb(struct kiocb *iocb)
+{
/* sync iocbs are easy: they can only ever be executing from a
* single context. */
if (is_sync_kiocb(iocb)) {
kiocbSetKicked(iocb);
- wake_up_process(iocb->ki_obj.tsk);
+ wake_up_process(iocb->ki_obj.tsk);
return;
}
- if (!kiocbTryKick(iocb)) {
- unsigned long flags;
- spin_lock_irqsave(&ctx->ctx_lock, flags);
- list_add_tail(&iocb->ki_run_list, &ctx->run_list);
- spin_unlock_irqrestore(&ctx->ctx_lock, flags);
- queue_work(aio_wq, &ctx->wq);
- }
+ try_queue_kicked_iocb(iocb);
}
EXPORT_SYMBOL(kick_iocb);
unsigned long tail;
int ret;
- /* Special case handling for sync iocbs: events go directly
- * into the iocb for fast handling. Note that this will not
- * work if we allow sync kiocbs to be cancelled. in which
- * case the usage count checks will have to move under ctx_lock
- * for all cases.
+ /*
+ * Special case handling for sync iocbs:
+ * - events go directly into the iocb for fast handling
+ * - the sync task with the iocb in its stack holds the single iocb
+ * ref, no other paths have a way to get another ref
+ * - the sync task helpfully left a reference to itself in the iocb
*/
if (is_sync_kiocb(iocb)) {
- int ret;
-
+ BUG_ON(iocb->ki_users != 1);
iocb->ki_user_data = res;
- if (iocb->ki_users == 1) {
- iocb->ki_users = 0;
- ret = 1;
- } else {
- spin_lock_irq(&ctx->ctx_lock);
- iocb->ki_users--;
- ret = (0 == iocb->ki_users);
- spin_unlock_irq(&ctx->ctx_lock);
- }
- /* sync iocbs put the task here for us */
+ iocb->ki_users = 0;
wake_up_process(iocb->ki_obj.tsk);
- return ret;
+ return 1;
}
info = &ctx->ring_info;
*/
spin_lock_irqsave(&ctx->ctx_lock, flags);
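+ /* ki_run_list.prev is NULL while aio_run_iocb owns the iocb (see aio_run_iocb),
+ * so only unlink entries that are really on the run list */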
+ if (iocb->ki_run_list.prev && !list_empty(&iocb->ki_run_list))
+ list_del_init(&iocb->ki_run_list);
+
+ /*
+ * cancelled requests don't get events, userland was given one
+ * when the event got cancelled.
+ */
+ if (kiocbIsCancelled(iocb))
+ goto put_rq;
+
ring = kmap_atomic(info->ring_pages[0], KM_IRQ1);
tail = info->tail;
event = aio_ring_event(info, tail, KM_IRQ0);
- tail = (tail + 1) % info->nr;
+ if (++tail >= info->nr)
+ tail = 0;
event->obj = (u64)(unsigned long)iocb->ki_obj.user;
event->data = iocb->ki_user_data;
pr_debug("added to ring %p at [%lu]\n", iocb, tail);
+ pr_debug("%ld retries: %d of %d\n", iocb->ki_retried,
+ iocb->ki_nbytes - iocb->ki_left, iocb->ki_nbytes);
+put_rq:
/* everything turned out well, dispose of the aiocb. */
ret = __aio_put_req(ctx, iocb);
return ret;
}
-struct timeout {
+struct aio_timeout {
struct timer_list timer;
int timed_out;
struct task_struct *p;
static void timeout_func(unsigned long data)
{
- struct timeout *to = (struct timeout *)data;
+ struct aio_timeout *to = (struct aio_timeout *)data;
to->timed_out = 1;
wake_up_process(to->p);
}
-static inline void init_timeout(struct timeout *to)
+static inline void init_timeout(struct aio_timeout *to)
{
init_timer(&to->timer);
to->timer.data = (unsigned long)to;
to->p = current;
}
-static inline void set_timeout(long start_jiffies, struct timeout *to,
+static inline void set_timeout(long start_jiffies, struct aio_timeout *to,
const struct timespec *ts)
{
to->timer.expires = start_jiffies + timespec_to_jiffies(ts);
to->timed_out = 1;
}
-static inline void clear_timeout(struct timeout *to)
+static inline void clear_timeout(struct aio_timeout *to)
{
del_singleshot_timer_sync(&to->timer);
}
int ret;
int i = 0;
struct io_event ent;
- struct timeout to;
+ struct aio_timeout to;
+ int retry = 0;
/* needed to zero any padding within an entry (there shouldn't be
* any, but C is fun!
*/
memset(&ent, 0, sizeof(ent));
+retry:
ret = 0;
-
while (likely(i < nr)) {
ret = aio_read_evt(ctx, &ent);
if (unlikely(ret <= 0))
/* End fast path */
+ /* racey check, but it gets redone */
+ if (!retry && unlikely(!list_empty(&ctx->run_list))) {
+ retry = 1;
+ aio_run_all_iocbs(ctx);
+ goto retry;
+ }
+
init_timeout(&to);
if (timeout) {
struct timespec ts;
add_wait_queue_exclusive(&ctx->wait, &wait);
do {
set_task_state(tsk, TASK_INTERRUPTIBLE);
-
ret = aio_read_evt(ctx, &ent);
if (ret)
break;
goto out;
ret = -EINVAL;
- if (unlikely(ctx || (int)nr_events <= 0)) {
- pr_debug("EINVAL: io_setup: ctx or nr_events > max\n");
+ if (unlikely(ctx || nr_events == 0)) {
+ pr_debug("EINVAL: io_setup: ctx %lu nr_events %u\n",
+ ctx, nr_events);
goto out;
}
ret = put_user(ioctx->user_id, ctxp);
if (!ret)
return 0;
- get_ioctx(ioctx);
+
+ get_ioctx(ioctx); /* io_destroy() expects us to hold a ref */
io_destroy(ioctx);
}
return -EINVAL;
}
+/*
+ * aio_p{read,write} are the default ki_retry methods for
+ * IOCB_CMD_P{READ,WRITE}. They maintain kiocb retry state around potentially
+ * multiple calls to f_op->aio_read()/aio_write(). They loop around partial progress
+ * instead of returning -EIOCBRETRY because they don't have the means to call
+ * kick_iocb().
+ */
+static ssize_t aio_pread(struct kiocb *iocb)
+{
+ struct file *file = iocb->ki_filp;
+ struct address_space *mapping = file->f_mapping;
+ struct inode *inode = mapping->host;
+ ssize_t ret = 0;
+
+ do {
+ ret = file->f_op->aio_read(iocb, iocb->ki_buf,
+ iocb->ki_left, iocb->ki_pos);
+ /*
+ * Can't just depend on iocb->ki_left to determine
+ * whether we are done. This may have been a short read.
+ */
+ if (ret > 0) {
+ iocb->ki_buf += ret;
+ iocb->ki_left -= ret;
+ }
+
+ /*
+ * For pipes and sockets we return once we have some data; for
+ * regular files we retry till we complete the entire read or
+ * find that we can't read any more data (e.g. short reads).
+ */
+ } while (ret > 0 && iocb->ki_left > 0 &&
+ !S_ISFIFO(inode->i_mode) && !S_ISSOCK(inode->i_mode));
+
+ /* This means we must have transferred all that we could */
+ /* No need to retry anymore */
+ if ((ret == 0) || (iocb->ki_left == 0))
+ ret = iocb->ki_nbytes - iocb->ki_left;
+
+ return ret;
+}
+
+/* see aio_pread() */
+static ssize_t aio_pwrite(struct kiocb *iocb)
+{
+ struct file *file = iocb->ki_filp;
+ ssize_t ret = 0;
+
+ do {
+ ret = file->f_op->aio_write(iocb, iocb->ki_buf,
+ iocb->ki_left, iocb->ki_pos);
+ if (ret > 0) {
+ iocb->ki_buf += ret;
+ iocb->ki_left -= ret;
+ }
+ } while (ret > 0 && iocb->ki_left > 0);
+
+ if ((ret == 0) || (iocb->ki_left == 0))
+ ret = iocb->ki_nbytes - iocb->ki_left;
+
+ return ret;
+}
+
+static ssize_t aio_fdsync(struct kiocb *iocb)
+{
+ struct file *file = iocb->ki_filp;
+ ssize_t ret = -EINVAL;
+
+ if (file->f_op->aio_fsync)
+ ret = file->f_op->aio_fsync(iocb, 1);
+ return ret;
+}
+
+static ssize_t aio_fsync(struct kiocb *iocb)
+{
+ struct file *file = iocb->ki_filp;
+ ssize_t ret = -EINVAL;
+
+ if (file->f_op->aio_fsync)
+ ret = file->f_op->aio_fsync(iocb, 0);
+ return ret;
+}
+
+/*
+ * aio_setup_iocb:
+ * Performs the initial checks and aio retry method
+ * setup for the kiocb at the time of io submission.
+ */
+static ssize_t aio_setup_iocb(struct kiocb *kiocb)
+{
+ struct file *file = kiocb->ki_filp;
+ ssize_t ret = 0;
+
+ switch (kiocb->ki_opcode) {
+ case IOCB_CMD_PREAD:
+ ret = -EBADF;
+ if (unlikely(!(file->f_mode & FMODE_READ)))
+ break;
+ ret = -EFAULT;
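+ /* a read stores into the user buffer, so it must be writable */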
+ if (unlikely(!access_ok(VERIFY_WRITE, kiocb->ki_buf,
+ kiocb->ki_left)))
+ break;
+ ret = security_file_permission(file, MAY_READ);
+ if (unlikely(ret))
+ break;
+ ret = -EINVAL;
+ if (file->f_op->aio_read)
+ kiocb->ki_retry = aio_pread;
+ break;
+ case IOCB_CMD_PWRITE:
+ ret = -EBADF;
+ if (unlikely(!(file->f_mode & FMODE_WRITE)))
+ break;
+ ret = -EFAULT;
+ if (unlikely(!access_ok(VERIFY_READ, kiocb->ki_buf,
+ kiocb->ki_left)))
+ break;
+ ret = security_file_permission(file, MAY_WRITE);
+ if (unlikely(ret))
+ break;
+ ret = -EINVAL;
+ if (file->f_op->aio_write)
+ kiocb->ki_retry = aio_pwrite;
+ break;
+ case IOCB_CMD_FDSYNC:
+ ret = -EINVAL;
+ if (file->f_op->aio_fsync)
+ kiocb->ki_retry = aio_fdsync;
+ break;
+ case IOCB_CMD_FSYNC:
+ ret = -EINVAL;
+ if (file->f_op->aio_fsync)
+ kiocb->ki_retry = aio_fsync;
+ break;
+ default:
+ dprintk("EINVAL: io_submit: no operation provided\n");
+ ret = -EINVAL;
+ }
+
+ if (!kiocb->ki_retry)
+ return ret;
+
+ return 0;
+}
+
+/*
+ * aio_wake_function:
+ * wait queue callback function for aio notification,
+ * Simply triggers a retry of the operation via kick_iocb.
+ *
+ * This callback is specified in the wait queue entry in
+ * a kiocb (current->io_wait points to this wait queue
+ * entry when an aio operation executes; it is used
+ * instead of a synchronous wait when an i/o blocking
+ * condition is encountered during aio).
+ *
+ * Note:
+ * This routine is executed with the wait queue lock held.
+ * Since kick_iocb acquires iocb->ki_ctx->ctx_lock, it nests
+ * the ioctx lock inside the wait queue lock. This is safe
+ * because this callback isn't used for wait queues which
+ * are nested inside ioctx lock (i.e. ctx->wait)
+ */
+static int aio_wake_function(wait_queue_t *wait, unsigned mode,
+ int sync, void *key)
+{
+ struct kiocb *iocb = container_of(wait, struct kiocb, ki_wait);
+
+ list_del_init(&wait->task_list);
+ kick_iocb(iocb);
+ return 1;
+}
+
int fastcall io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
struct iocb *iocb)
{
struct kiocb *req;
struct file *file;
ssize_t ret;
- char __user *buf;
/* enforce forwards compatibility on users */
if (unlikely(iocb->aio_reserved1 || iocb->aio_reserved2 ||
}
req->ki_filp = file;
- iocb->aio_key = req->ki_key;
- ret = put_user(iocb->aio_key, &user_iocb->aio_key);
+ ret = put_user(req->ki_key, &user_iocb->aio_key);
if (unlikely(ret)) {
dprintk("EFAULT: aio_key\n");
goto out_put_req;
req->ki_user_data = iocb->aio_data;
req->ki_pos = iocb->aio_offset;
- buf = (char __user *)(unsigned long)iocb->aio_buf;
+ req->ki_buf = (char __user *)(unsigned long)iocb->aio_buf;
+ req->ki_left = req->ki_nbytes = iocb->aio_nbytes;
+ req->ki_opcode = iocb->aio_lio_opcode;
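+ /* blocking points in the retry path kick the iocb via aio_wake_function instead of sleeping */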
+ init_waitqueue_func_entry(&req->ki_wait, aio_wake_function);
+ INIT_LIST_HEAD(&req->ki_wait.task_list);
+ req->ki_retried = 0;
- switch (iocb->aio_lio_opcode) {
- case IOCB_CMD_PREAD:
- ret = -EBADF;
- if (unlikely(!(file->f_mode & FMODE_READ)))
- goto out_put_req;
- ret = -EFAULT;
- if (unlikely(!access_ok(VERIFY_WRITE, buf, iocb->aio_nbytes)))
- goto out_put_req;
- ret = security_file_permission (file, MAY_READ);
- if (ret)
- goto out_put_req;
- ret = -EINVAL;
- if (file->f_op->aio_read)
- ret = file->f_op->aio_read(req, buf,
- iocb->aio_nbytes, req->ki_pos);
- break;
- case IOCB_CMD_PWRITE:
- ret = -EBADF;
- if (unlikely(!(file->f_mode & FMODE_WRITE)))
- goto out_put_req;
- ret = -EFAULT;
- if (unlikely(!access_ok(VERIFY_READ, buf, iocb->aio_nbytes)))
- goto out_put_req;
- ret = security_file_permission (file, MAY_WRITE);
- if (ret)
- goto out_put_req;
- ret = -EINVAL;
- if (file->f_op->aio_write)
- ret = file->f_op->aio_write(req, buf,
- iocb->aio_nbytes, req->ki_pos);
- break;
- case IOCB_CMD_FDSYNC:
- ret = -EINVAL;
- if (file->f_op->aio_fsync)
- ret = file->f_op->aio_fsync(req, 1);
- break;
- case IOCB_CMD_FSYNC:
- ret = -EINVAL;
- if (file->f_op->aio_fsync)
- ret = file->f_op->aio_fsync(req, 0);
- break;
- default:
- dprintk("EINVAL: io_submit: no operation provided\n");
- ret = -EINVAL;
- }
+ ret = aio_setup_iocb(req);
+
+ if (ret)
+ goto out_put_req;
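+ /* run the iocb once right away in the submitter's context; aio_run_iocb
+ * expects ctx_lock held and may drop and retake it */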
+ spin_lock_irq(&ctx->ctx_lock);
+ aio_run_iocb(req);
+ if (!list_empty(&ctx->run_list)) {
+ /* drain the run list */
+ while (__aio_run_iocbs(ctx))
+ ;
+ }
+ spin_unlock_irq(&ctx->ctx_lock);
aio_put_req(req); /* drop extra ref to req */
- if (likely(-EIOCBQUEUED == ret))
- return 0;
- aio_complete(req, ret, 0); /* will drop i/o ref to req */
return 0;
out_put_req:
/* lookup_kiocb
* Finds a given iocb for cancellation.
- * MUST be called with ctx->ctx_lock held.
*/
-struct kiocb *lookup_kiocb(struct kioctx *ctx, struct iocb __user *iocb, u32 key)
+static struct kiocb *lookup_kiocb(struct kioctx *ctx, struct iocb __user *iocb,
+ u32 key)
{
struct list_head *pos;
+
+ assert_spin_locked(&ctx->ctx_lock);
+
/* TODO: use a hash or array, this sucks. */
list_for_each(pos, &ctx->active_reqs) {
struct kiocb *kiocb = list_kiocb(pos);
if (kiocb && kiocb->ki_cancel) {
cancel = kiocb->ki_cancel;
kiocb->ki_users ++;
+ kiocbSetCancelled(kiocb);
} else
cancel = NULL;
spin_unlock_irq(&ctx->ctx_lock);
ret = -EFAULT;
}
} else
- printk(KERN_DEBUG "iocb has no cancel operation\n");
+ ret = -EINVAL;
put_ioctx(ctx);