From: Jamie Lokier <jamie@shareable.org>

I had an inkling when I sent out the recent futex hashed-lock fix that
there was a race condition, but I was too tired to be sure or to work
out a solution.

This patch fixes it.  In unqueue_me(), q->lock_ptr can be changed by
another task between our reading it and calling spin_lock(), so we can
end up taking the wrong spinlock.  I realised that yesterday, but
proving that the fix works to my satisfaction required a state of higher
consciousness not normally attained until 5am (and soon lost...).
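
For the curious, the bad interleaving looks something like this (a
sketch only: I am taking a requeue to another hash bucket as the
example mover, and the bucket names are illustrative):

	/*
	 * Waiter (unqueue_me)            Another task (requeue)
	 * -------------------            ----------------------
	 * lock_ptr = q->lock_ptr;
	 *   (reads &bucket_A->lock)
	 *                                spin_lock(&bucket_A->lock);
	 *                                (moves q to bucket B's list)
	 *                                q->lock_ptr = &bucket_B->lock;
	 *                                spin_unlock(&bucket_A->lock);
	 * spin_lock(lock_ptr);
	 *   (we hold bucket A's lock, but q->list is now protected
	 *    by bucket B's lock: the wrong spinlock)
	 */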

The form of the fix is inspired by an idea from Rusty.

Please, folks, take a look at the fix in unqueue_me().  It looks
simple, but the proof isn't obvious.
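
In isolation, the retry pattern is just this (condensed from the patch
below; the comments are my own annotation of why one comparison under
the lock is decisive):

	retry:
		lock_ptr = q->lock_ptr;
		if (lock_ptr != 0) {
			spin_lock(lock_ptr);
			if (lock_ptr != q->lock_ptr) {
				/*
				 * We took a stale lock.  q->lock_ptr only
				 * changes while the lock it currently
				 * points to is held, and it cannot change
				 * back to lock_ptr while we hold lock_ptr,
				 * so mismatch here means retry is needed.
				 */
				spin_unlock(lock_ptr);
				goto retry;
			}
			/*
			 * Equal while we hold the lock: q->lock_ptr is now
			 * stable, since changing it would require the very
			 * lock we hold.
			 */
			...
		}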

Oh, and some comment formatting requested by Andrew Morton. :)



 kernel/futex.c |   77 +++++++++++++++++++++++++++++++++++++++------------------
 1 files changed, 53 insertions(+), 24 deletions(-)

diff -puN kernel/futex.c~futex-locking-fix-fix kernel/futex.c
--- 25/kernel/futex.c~futex-locking-fix-fix	2003-10-03 01:38:04.000000000 -0700
+++ 25-akpm/kernel/futex.c	2003-10-03 01:38:04.000000000 -0700
@@ -256,18 +256,24 @@ static inline void drop_key_refs(union f
 	}
 }
 
-/* The hash bucket lock must be held when this is called.
-   Afterwards, the futex_q must not be accessed. */
+/*
+ * The hash bucket lock must be held when this is called.
+ * Afterwards, the futex_q must not be accessed.
+ */
 static inline void wake_futex(struct futex_q *q)
 {
 	list_del_init(&q->list);
 	if (q->filp)
 		send_sigio(&q->filp->f_owner, q->fd, POLL_IN);
-	/* The lock in wake_up_all() is a crucial memory barrier after the
-	   list_del_init() and also before assigning to q->lock_ptr. */
+	/*
+	 * The lock in wake_up_all() is a crucial memory barrier after the
+	 * list_del_init() and also before assigning to q->lock_ptr.
+	 */
 	wake_up_all(&q->waiters);
-	/* The waiting task can free the futex_q as soon as this is written,
-	   without taking any locks.  This must come last. */
+	/*
+	 * The waiting task can free the futex_q as soon as this is written,
+	 * without taking any locks.  This must come last.
+	 */
 	q->lock_ptr = 0;
 }
 
@@ -397,15 +403,34 @@ static inline void queue_me(struct futex
 }
 
 /* Return 1 if we were still queued (ie. 0 means we were woken) */
-static inline int unqueue_me(struct futex_q *q)
+static int unqueue_me(struct futex_q *q)
 {
 	int ret = 0;
-	spinlock_t *lock_ptr = q->lock_ptr;
+	spinlock_t *lock_ptr;
 
 	/* In the common case we don't take the spinlock, which is nice. */
+ retry:
+	lock_ptr = q->lock_ptr;
 	if (lock_ptr != 0) {
 		spin_lock(lock_ptr);
-		if (!list_empty(&q->list)) {
+		/*
+		 * q->lock_ptr can change between reading it and
+		 * spin_lock(), causing us to take the wrong lock.  This
+		 * corrects the race condition.
+		 *
+		 * Reasoning goes like this: if we have the wrong lock,
+		 * q->lock_ptr must have changed (maybe several times)
+		 * between reading it and the spin_lock().  It can
+		 * change again after the spin_lock() but only if it was
+		 * already changed before the spin_lock().  It cannot,
+		 * however, change back to the original value.  Therefore
+		 * we can detect whether we acquired the correct lock.
+		 */
+		if (unlikely(lock_ptr != q->lock_ptr)) {
+			spin_unlock(lock_ptr);
+			goto retry;
+		}
+		if (likely(!list_empty(&q->list))) {
 			list_del(&q->list);
 			ret = 1;
 		}
@@ -462,8 +487,10 @@ static int futex_wait(unsigned long uadd
 	/* add_wait_queue is the barrier after __set_current_state. */
 	__set_current_state(TASK_INTERRUPTIBLE);
 	add_wait_queue(&q.waiters, &wait);
-	/* !list_empty() is safe here without any lock.
-	   q.lock_ptr != 0 is not safe, because of ordering against wakeup. */
+	/*
+	 * !list_empty() is safe here without any lock.
+	 * q.lock_ptr != 0 is not safe, because of ordering against wakeup.
+	 */
 	if (likely(!list_empty(&q.list)))
 		time = schedule_timeout(time);
 	__set_current_state(TASK_RUNNING);
@@ -505,18 +532,16 @@ static unsigned int futex_poll(struct fi
 			       struct poll_table_struct *wait)
 {
 	struct futex_q *q = filp->private_data;
-	spinlock_t *lock_ptr;
-	int ret = POLLIN | POLLRDNORM;
+	int ret = 0;
 
 	poll_wait(filp, &q->waiters, wait);
 
-	lock_ptr = q->lock_ptr;
-	if (lock_ptr != 0) {
-		spin_lock(lock_ptr);
-		if (!list_empty(&q->list))
-			ret = 0;
-		spin_unlock(lock_ptr);
-	}
+	/*
+	 * list_empty() is safe here without any lock.
+	 * q->lock_ptr != 0 is not safe, because of ordering against wakeup.
+	 */
+	if (list_empty(&q->list))
+		ret = POLLIN | POLLRDNORM;
 
 	return ret;
 }
@@ -526,8 +551,10 @@ static struct file_operations futex_fops
 	.poll		= futex_poll,
 };
 
-/* Signal allows caller to avoid the race which would occur if they
-   set the sigio stuff up afterwards. */
+/*
+ * Signal allows caller to avoid the race which would occur if they
+ * set the sigio stuff up afterwards.
+ */
 static int futex_fd(unsigned long uaddr, int signal)
 {
 	struct futex_q *q;
@@ -583,8 +610,10 @@ static int futex_fd(unsigned long uaddr,
 		return err;
 	}
 
-	/* queue_me() must be called before releasing mmap_sem, because
-	   key->shared.inode needs to be referenced while holding it. */
+	/*
+	 * queue_me() must be called before releasing mmap_sem, because
+	 * key->shared.inode needs to be referenced while holding it.
+	 */
 	filp->private_data = q;
 
 	queue_me(q, ret, filp);

_