From: Nick Piggin <nickpiggin@yahoo.com.au>

Move wakeups out from under the rwsem's wait_lock spinlock.

This reduces that lock's contention by a factor of around 10 on the NUMAQ
running volanomark; however, cacheline contention on the rwsem's "activity"
drowns out these small improvements when using the i386 "optimised" rwsem:

unpatched:
55802519 total                                     32.3097
23325323 default_idle                             364458.1719
22084349 .text.lock.futex                         82404.2873
2369107 queue_me                                 24678.1979
1875296 unqueue_me                               9767.1667
1202258 .text.lock.rwsem                         46240.6923
941801 finish_task_switch                       7357.8203
787101 __wake_up                                12298.4531
645252 drop_key_refs                            13442.7500
362789 futex_wait                               839.7894
333294 futex_wake                               1487.9196
146797 rwsem_down_read_failed                   436.8958
 82788 .text.lock.dev                           221.3583
 81221 try_to_wake_up                           133.5872

+rwsem-scale:
58120260 total                                     33.6458
25482132 default_idle                             398158.3125
22774675 .text.lock.futex                         84980.1306
2517797 queue_me                                 26227.0521
1953424 unqueue_me                               10174.0833
1063068 finish_task_switch                       8305.2188
834793 __wake_up                                13043.6406
674570 drop_key_refs                            14053.5417
371811 futex_wait                               860.6736
343398 futex_wake                               1533.0268
155419 try_to_wake_up                           255.6234
114704 .text.lock.rwsem                         4411.6923

The rwsem-spinlock implementation, however, is improved significantly more,
and gets volanomark performance similar to the optimised rwsem.

Although most users of the generic implementation probably aren't highly
parallel systems, it appears this is the only implementation capable of
protecting a writer from more than 32 768 readers, so it might become
more relevant.

unpatched:
30850964 total                                     18.1787
18986006 default_idle                             296656.3438
3989183 .text.lock.rwsem_spinlock                40294.7778
2990161 .text.lock.futex                         32501.7500
549707 finish_task_switch                       4294.5859
535327 __down_read                              3717.5486
452721 queue_me                                 4715.8438
439725 __up_read                                9160.9375
396273 __wake_up                                6191.7656
326595 unqueue_me                               1701.0156

+rwsem-scale:
25378268 total                                     14.9537
13325514 default_idle                             208211.1562
3675634 .text.lock.futex                         39952.5435
2908629 .text.lock.rwsem_spinlock                28239.1165
628115 __down_read                              4361.9097
607417 finish_task_switch                       4745.4453
588031 queue_me                                 6125.3229
571169 __up_read                                11899.3542
436795 __wake_up                                6824.9219
416788 unqueue_me                               2170.7708



---

 25-akpm/lib/rwsem-spinlock.c |   35 +++++++++++++++++++++--------------
 25-akpm/lib/rwsem.c          |   40 +++++++++++++++++++---------------------
 2 files changed, 40 insertions(+), 35 deletions(-)

diff -puN lib/rwsem.c~scale-rwsem-take-2 lib/rwsem.c
--- 25/lib/rwsem.c~scale-rwsem-take-2	2004-04-14 19:39:34.188290888 -0700
+++ 25-akpm/lib/rwsem.c	2004-04-14 19:39:34.194289976 -0700
@@ -42,14 +42,16 @@ void rwsemtrace(struct rw_semaphore *sem
  * - woken process blocks are discarded from the list after having flags
  *   zeroised
  * - writers are only woken if wakewrite is non-zero
+ *
+ * The spinlock will be dropped by this function.
  */
 static inline struct rw_semaphore *
 __rwsem_do_wake(struct rw_semaphore *sem, int wakewrite)
 {
+	LIST_HEAD(wake_list);
 	struct rwsem_waiter *waiter;
-	struct list_head *next;
 	signed long oldcount;
-	int woken, loop;
+	int woken;
 
 	rwsemtrace(sem, "Entering __rwsem_do_wake");
 
@@ -73,8 +75,7 @@ try_again:
 	if (!(waiter->flags & RWSEM_WAITING_FOR_WRITE))
 		goto readers_only;
 
-	list_del(&waiter->list);
-	complete(&waiter->granted);
+	list_move_tail(&waiter->list, &wake_list);
 	goto out;
 
 	/* don't want to wake any writers */
@@ -91,32 +92,29 @@ dont_wake_writers:
 readers_only:
 	woken = 0;
 	do {
+		list_move_tail(&waiter->list, &wake_list);
 		woken++;
 
-		if (waiter->list.next == &sem->wait_list)
+		if (list_empty(&sem->wait_list))
 			break;
 
-		waiter = list_entry(waiter->list.next,
-					struct rwsem_waiter, list);
+		waiter = list_entry(sem->wait_list.next,
+				struct rwsem_waiter, list);
 
 	} while (waiter->flags & RWSEM_WAITING_FOR_READ);
 
-	loop = woken;
 	woken *= RWSEM_ACTIVE_BIAS - RWSEM_WAITING_BIAS;
 	woken -= RWSEM_ACTIVE_BIAS;
 	rwsem_atomic_add(woken, sem);
 
-	next = sem->wait_list.next;
-	for (; loop > 0; loop--) {
-		waiter = list_entry(next, struct rwsem_waiter, list);
-		next = waiter->list.next;
+out:
+	spin_unlock(&sem->wait_lock);
+	while (!list_empty(&wake_list)) {
+		waiter = list_entry(wake_list.next, struct rwsem_waiter, list);
+		list_del(&waiter->list);
 		complete(&waiter->granted);
 	}
 
-	sem->wait_list.next = next;
-	next->prev = &sem->wait_list;
-
-out:
 	rwsemtrace(sem, "Leaving __rwsem_do_wake");
 	return sem;
 
@@ -138,10 +136,10 @@ rwsem_down_failed_common(struct rw_semap
 	signed long count;
 
 	/* set up my own style of waitqueue */
-	spin_lock(&sem->wait_lock);
 	waiter->task = tsk;
 	init_completion(&waiter->granted);
 
+	spin_lock(&sem->wait_lock);
 	list_add_tail(&waiter->list, &sem->wait_list);
 
 	/* note that we're now waiting on the lock, but no longer actively
@@ -154,8 +152,8 @@ rwsem_down_failed_common(struct rw_semap
 	 */
 	if (!(count & RWSEM_ACTIVE_MASK))
 		sem = __rwsem_do_wake(sem, 1);
-
-	spin_unlock(&sem->wait_lock);
+	else
+		spin_unlock(&sem->wait_lock);
 
 	/* wait to be given the lock */
 	wait_for_completion(&waiter->granted);
@@ -211,8 +209,8 @@ struct rw_semaphore fastcall *rwsem_wake
 	/* do nothing if list empty */
 	if (!list_empty(&sem->wait_list))
 		sem = __rwsem_do_wake(sem, 1);
-
-	spin_unlock(&sem->wait_lock);
+	else
+		spin_unlock(&sem->wait_lock);
 
 	rwsemtrace(sem, "Leaving rwsem_wake");
 
diff -puN lib/rwsem-spinlock.c~scale-rwsem-take-2 lib/rwsem-spinlock.c
--- 25/lib/rwsem-spinlock.c~scale-rwsem-take-2	2004-04-14 19:39:34.189290736 -0700
+++ 25-akpm/lib/rwsem-spinlock.c	2004-04-14 19:39:34.195289824 -0700
@@ -55,6 +55,7 @@ void fastcall init_rwsem(struct rw_semap
 static inline struct rw_semaphore *
 __rwsem_do_wake(struct rw_semaphore *sem, int wakewrite)
 {
+	LIST_HEAD(wake_list);
 	struct rwsem_waiter *waiter;
 	int woken;
 
@@ -74,8 +75,7 @@ __rwsem_do_wake(struct rw_semaphore *sem
 	 */
 	if (waiter->flags & RWSEM_WAITING_FOR_WRITE) {
 		sem->activity = -1;
-		list_del(&waiter->list);
-		complete(&waiter->granted);
+		list_move_tail(&waiter->list, &wake_list);
 		goto out;
 	}
 
@@ -84,25 +84,31 @@ __rwsem_do_wake(struct rw_semaphore *sem
 dont_wake_writers:
 	woken = 0;
 	while (waiter->flags & RWSEM_WAITING_FOR_READ) {
-		struct list_head *next = waiter->list.next;
-
-		list_del(&waiter->list);
-		complete(&waiter->granted);
+		list_move_tail(&waiter->list, &wake_list);
 		woken++;
 		if (list_empty(&sem->wait_list))
 			break;
-		waiter = list_entry(next, struct rwsem_waiter, list);
+		waiter = list_entry(sem->wait_list.next,
+					struct rwsem_waiter, list);
 	}
 
 	sem->activity += woken;
 
 out:
+	spin_unlock(&sem->wait_lock);
+	while (!list_empty(&wake_list)) {
+		waiter = list_entry(wake_list.next, struct rwsem_waiter, list);
+		list_del(&waiter->list);
+		complete(&waiter->granted);
+	}
+
 	rwsemtrace(sem, "Leaving __rwsem_do_wake");
 	return sem;
 }
 
 /*
- * wake a single writer
+ * wake a single writer.
+ * called with wait_lock locked and unlocks it in the process.
  */
 static inline struct rw_semaphore *
 __rwsem_wake_one_writer(struct rw_semaphore *sem)
@@ -113,6 +119,7 @@ __rwsem_wake_one_writer(struct rw_semaph
 
 	waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
 	list_del(&waiter->list);
+	spin_unlock(&sem->wait_lock);
 
 	complete(&waiter->granted);
 	return sem;
@@ -242,8 +249,8 @@ void fastcall __up_read(struct rw_semaph
 
 	if (--sem->activity == 0 && !list_empty(&sem->wait_list))
 		sem = __rwsem_wake_one_writer(sem);
-
-	spin_unlock(&sem->wait_lock);
+	else
+		spin_unlock(&sem->wait_lock);
 
 	rwsemtrace(sem, "Leaving __up_read");
 }
@@ -260,8 +267,8 @@ void fastcall __up_write(struct rw_semap
 	sem->activity = 0;
 	if (!list_empty(&sem->wait_list))
 		sem = __rwsem_do_wake(sem, 1);
-
-	spin_unlock(&sem->wait_lock);
+	else
+		spin_unlock(&sem->wait_lock);
 
 	rwsemtrace(sem, "Leaving __up_write");
 }
@@ -279,8 +286,8 @@ void fastcall __downgrade_write(struct r
 	sem->activity = 1;
 	if (!list_empty(&sem->wait_list))
 		sem = __rwsem_do_wake(sem, 0);
-
-	spin_unlock(&sem->wait_lock);
+	else
+		spin_unlock(&sem->wait_lock);
 
 	rwsemtrace(sem, "Leaving __downgrade_write");
 }

_