From: Suzuki <suzuki@in.ibm.com>

I found some races in the AIO callback path which are the root cause of a
few problems in the AIO code.  The race is between the AIO callback path
(executed by softirqs) and aio_run_iocb().

(1) Suppose a retry returned -EIOCBRETRY and a wait has been queued.

Whenever the wait event completes, aio_wake_function() is invoked from
softirq context.  aio_wake_function() deletes the wait entry
(iocb->ki_wait.task_list) and invokes kick_iocb() to kick the iocb and
queue it back on the run_list.
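
For reference, the current callback reads as follows (signature
reconstructed from the wake-function prototype; it is truncated in the
diff below).  The list_del_init() here is exactly the line the patch
removes from aio_wake_function():

	static int aio_wake_function(wait_queue_t *wait, unsigned mode,
				     int sync, void *key)
	{
		struct kiocb *iocb = container_of(wait, struct kiocb, ki_wait);

		list_del_init(&wait->task_list);
		kick_iocb(iocb);
		return 1;
	}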

Consider an interleaving like the following:

SOFTIRQ:
	list_del_init(&wait->task_list);
	## SOFTIRQ gets scheduled, or PROCESS executes on another CPU ##
	kick_iocb(iocb);
PROCESS:
	/*
	 * Issue an additional retry to avoid waiting forever if
	 * no waits were queued (e.g. in case of a short read).
	 */
	if (list_empty(&iocb->ki_wait.task_list))
		kiocbSetKicked(iocb);

So PROCESS may observe the deleted (empty) wait list entry
(iocb->ki_wait.task_list), kick the iocb, and queue it up to be retried
again.  If that next retry happens before SOFTIRQ gets a chance to run
kick_iocb(), we run into problems.

There are two possibilities:

(a) The iocb may get completed.

    If this happens before SOFTIRQ enters kick_iocb(iocb), we end up
    kicking an iocb which is no longer valid.  This may cause a kernel
    oops on

	spin_lock_irqsave(&ctx->ctx_lock, flags);

    since iocb->ki_ctx may be invalid.

(b) The iocb may encounter another wait condition.

    In this case the iocb is queued again, waiting for some event.  The
    late-running SOFTIRQ then hits

	WARN_ON((!list_empty(&iocb->ki_wait.task_list)));

    in queue_kicked_iocb().
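
For reference, the current queue_kicked_iocb() asserts that the wait
entry has already been removed before it takes the lock (as can be seen
in the old side of the diff below):

	static void queue_kicked_iocb(struct kiocb *iocb)
	{
		struct kioctx	*ctx = iocb->ki_ctx;
		unsigned long flags;
		int run = 0;

		/* fires if the iocb is still on a wait queue */
		WARN_ON((!list_empty(&iocb->ki_wait.task_list)));

		spin_lock_irqsave(&ctx->ctx_lock, flags);
		run = __queue_kicked_iocb(iocb);
		spin_unlock_irqrestore(&ctx->ctx_lock, flags);
		if (run)
			aio_queue_work(ctx);
	}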

(2) A similar race occurs when kicking the iocb.

i.e.:

SOFTIRQ: [in kick_iocb()]
		if (!kiocbTryKick(iocb)) {
	## SOFTIRQ gets scheduled here, and PROCESS proceeds as below ##
			queue_kicked_iocb(iocb);
PROCESS: [in aio_run_iocb()]
		/* will make __queue_kicked_iocb succeed from here on */
		INIT_LIST_HEAD(&iocb->ki_run_list);
		/* we must queue the next iteration ourselves, if it
		 * has already been kicked */
		if (kiocbIsKicked(iocb)) {	/* true: kicked by kick_iocb() */
			__queue_kicked_iocb(iocb);
		}

Thus the iocb gets queued and may be retried, which again can cause the
problems described above.

Conclusion
===========

From this I conclude that the following operations in the callback path
for async iocbs must be performed atomically:

(*) Deletion of the wait queue entry.
(*) Kicking the iocb.
(*) Queueing it back on the run_list (already atomic, done with ctx_lock
    held).

Also, the check in aio_run_iocb() for whether any waits were queued
should be done with the ctx_lock held.

Solution
========

I have created a patch which performs the above operations with
iocb->ki_ctx->ctx_lock held.  It also moves the check

	if (list_empty(&iocb->ki_wait.task_list))
		kiocbSetKicked(iocb);

so that it too is done under the ctx_lock, as shown below.
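
After the change, the retry bookkeeping in aio_run_iocb() happens
entirely inside the locked region (abridged from the patch below):

	out:
		spin_lock_irq(&ctx->ctx_lock);
		...
		/*
		 * Issue an additional retry to avoid waiting forever if
		 * no waits were queued (e.g. in case of a short read).
		 */
		if (list_empty(&iocb->ki_wait.task_list))
			kiocbSetKicked(iocb);
		/* will make queue_kicked_iocb succeed from here on */
		INIT_LIST_HEAD(&iocb->ki_run_list);
		if (kiocbIsKicked(iocb))
			queue_kicked_iocb(iocb);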

Since the ctx_lock is now held before the iocb is queued, the old
locking wrapper queue_kicked_iocb() is no longer needed; its role is
taken over by the new kick_async_iocb() helper, and __queue_kicked_iocb()
has been renamed to queue_kicked_iocb().
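
Putting it together, the async callback path collapses into a single
critical section (the kick_async_iocb() helper added by the patch):

	static void kick_async_iocb(struct kiocb *iocb)
	{
		struct kioctx	*ctx = iocb->ki_ctx;
		unsigned long flags;
		int run = 0;

		spin_lock_irqsave(&ctx->ctx_lock, flags);
		list_del_init(&iocb->ki_wait.task_list);
		/* If it's already kicked we shouldn't queue it again */
		if (!kiocbTryKick(iocb)) {
			run = queue_kicked_iocb(iocb);
		}
		spin_unlock_irqrestore(&ctx->ctx_lock, flags);

		/* Activate the aio work queue if we have successfully
		 * queued the iocb, so that it can be processed
		 */
		if (run)
			aio_queue_work(ctx);
	}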

Testing
========

I tested the proposed patch on 2.6.13-rc3-mm1 on a 2-way Intel Xeon
(with HT) with the aio-stress and fsx-linux test cases.  The tests ran
fine.

Cc: Suparna Bhattacharya <suparna@in.ibm.com>
Cc: Benjamin LaHaise <bcrl@kvack.org>
Signed-off-by: Andrew Morton <akpm@osdl.org>
---

 fs/aio.c |   54 ++++++++++++++++++++++++++++++------------------------
 1 files changed, 30 insertions(+), 24 deletions(-)

diff -puN fs/aio.c~aio-fix-races-in-callback-path fs/aio.c
--- devel/fs/aio.c~aio-fix-races-in-callback-path	2005-07-27 12:43:19.000000000 -0700
+++ devel-akpm/fs/aio.c	2005-07-27 12:43:19.000000000 -0700
@@ -608,7 +608,7 @@ static void unuse_mm(struct mm_struct *m
  * Should be called with the spin lock iocb->ki_ctx->ctx_lock
  * held
  */
-static inline int __queue_kicked_iocb(struct kiocb *iocb)
+static inline int queue_kicked_iocb(struct kiocb *iocb)
 {
 	struct kioctx *ctx = iocb->ki_ctx;
 
@@ -723,13 +723,6 @@ static ssize_t aio_run_iocb(struct kiocb
 			aio_complete(iocb, ret, 0);
 			/* must not access the iocb after this */
 		}
-	} else {
-		/*
-		 * Issue an additional retry to avoid waiting forever if
-		 * no waits were queued (e.g. in case of a short read).
-		 */
-		if (list_empty(&iocb->ki_wait.task_list))
-			kiocbSetKicked(iocb);
 	}
 out:
 	spin_lock_irq(&ctx->ctx_lock);
@@ -740,17 +733,23 @@ out:
 		 * and know that there is more left to go,
 		 * this is where we let go so that a subsequent
 		 * "kick" can start the next iteration
+		 *
+		 * Issue an additional retry to avoid waiting forever if
+		 * no waits were queued (e.g. in case of a short read).
+		 * (Should be done with ctx_lock held.)
 		 */
 
-		/* will make __queue_kicked_iocb succeed from here on */
+		if (list_empty(&iocb->ki_wait.task_list))
+			kiocbSetKicked(iocb);
+		/* will make queue_kicked_iocb succeed from here on */
 		INIT_LIST_HEAD(&iocb->ki_run_list);
 		/* we must queue the next iteration ourselves, if it
 		 * has already been kicked */
 		if (kiocbIsKicked(iocb)) {
-			__queue_kicked_iocb(iocb);
+			queue_kicked_iocb(iocb);
 
 			/*
-			 * __queue_kicked_iocb will always return 1 here, because
+			 * queue_kicked_iocb will always return 1 here, because
 			 * iocb->ki_run_list is empty at this point so it should
 			 * be safe to unconditionally queue the context into the
 			 * work queue.
@@ -869,21 +868,31 @@ static void aio_kick_handler(void *data)
 
 
 /*
- * Called by kick_iocb to queue the kiocb for retry
- * and if required activate the aio work queue to process
- * it
+ * Kicking an async iocb.
+ * The following operations for the async iocbs should be atomic
+ * to avoid races with the aio_run_iocb() code.
+ * (1) Deleting the wait queue entry.
+ * (2) Kicking the iocb.
+ * (3) Queue the iocb back to run_list.
+ * Holds the ctx->ctx_lock to avoid races.
  */
-static void queue_kicked_iocb(struct kiocb *iocb)
+static void kick_async_iocb(struct kiocb *iocb)
 {
  	struct kioctx	*ctx = iocb->ki_ctx;
 	unsigned long flags;
 	int run = 0;
 
-	WARN_ON((!list_empty(&iocb->ki_wait.task_list)));
-
 	spin_lock_irqsave(&ctx->ctx_lock, flags);
-	run = __queue_kicked_iocb(iocb);
+	list_del_init(&iocb->ki_wait.task_list);
+	/* If its already kicked we shouldn't queue it again */
+	if (!kiocbTryKick(iocb)) {
+		run = queue_kicked_iocb(iocb);
+	}
 	spin_unlock_irqrestore(&ctx->ctx_lock, flags);
+
+	/* Activate the aio worker queue if we have successfully queued
+	 * the iocb, so that it can be processed
+	 */
 	if (run)
 		aio_queue_work(ctx);
 }
@@ -900,15 +909,13 @@ void fastcall kick_iocb(struct kiocb *io
 	/* sync iocbs are easy: they can only ever be executing from a 
 	 * single context. */
 	if (is_sync_kiocb(iocb)) {
+		list_del_init(&iocb->ki_wait.task_list);
 		kiocbSetKicked(iocb);
 	        wake_up_process(iocb->ki_obj.tsk);
 		return;
-	}
+	} else
+		kick_async_iocb(iocb);
 
-	/* If its already kicked we shouldn't queue it again */
-	if (!kiocbTryKick(iocb)) {
-		queue_kicked_iocb(iocb);
-	}
 }
 EXPORT_SYMBOL(kick_iocb);
 
@@ -1460,7 +1467,6 @@ static int aio_wake_function(wait_queue_
 {
 	struct kiocb *iocb = container_of(wait, struct kiocb, ki_wait);
 
-	list_del_init(&wait->task_list);
 	kick_iocb(iocb);
 	return 1;
 }
_