From: Manfred Spraul <manfred@colorfullife.com>

Step two for reducing cacheline trashing within rcupdate.c:

rcu_process_callbacks always acquires rcu_ctrlblk.state.mutex and calls
rcu_start_batch, even if the batch is already running or already scheduled to
run.

This can be avoided with a sequence lock: A sequence lock allows to read the
current batch number and next_pending atomically.  If next_pending is already
set, then there is no need to acquire the global mutex.

This means that for each grace period, there will be

- one write access to the rcu_ctrlblk.batch cacheline

- lots of read accesses to rcu_ctrlblk.batch (3-10*cpus_online()).  Behavior
  similar to the jiffies cacheline, shouldn't be a problem.

- cpus_online()+1 write accesses to rcu_ctrlblk.state, all of them starting
  with spin_lock(&rcu_ctrlblk.state.mutex).

  For large enough cpus_online() this will be a problem, but all except two
  of the spin_lock calls only protect the rcu_cpu_mask bitmap, thus a
  hierarchical bitmap would allow to split the write accesses to multiple
  cachelines.

Tested on an 8-way with reaim.  Unfortunately it probably won't help with Jack
Steiner's 'ls' test since in this test only one cpu generates rcu entries.

Signed-off-by: Manfred Spraul <manfred@colorfullife.com>
Signed-off-by:  Andrew Morton <akpm@osdl.org>
---

 25-akpm/include/linux/rcupdate.h |    6 +++++-
 25-akpm/kernel/rcupdate.c        |   29 +++++++++++++++++++++--------
 2 files changed, 26 insertions(+), 9 deletions(-)

diff -puN include/linux/rcupdate.h~rcu-lock-update-use-a-sequence-lock-for-starting-batches include/linux/rcupdate.h
--- 25/include/linux/rcupdate.h~rcu-lock-update-use-a-sequence-lock-for-starting-batches	Wed May 26 15:56:11 2004
+++ 25-akpm/include/linux/rcupdate.h	Wed May 26 15:56:11 2004
@@ -41,6 +41,7 @@
 #include <linux/threads.h>
 #include <linux/percpu.h>
 #include <linux/cpumask.h>
+#include <linux/seqlock.h>
 
 /**
  * struct rcu_head - callback structure for use with RCU
@@ -69,11 +70,14 @@ struct rcu_ctrlblk {
 	struct {
 		long	cur;		/* Current batch number.	      */
 		long	completed;	/* Number of the last completed batch */
+		int	next_pending;	/* Is the next batch already waiting? */
+		seqcount_t lock;	/* for atomically reading cur and     */
+		                        /* next_pending. Spinlock not used,   */
+		                        /* protected by state.mutex           */
 	} batch ____cacheline_maxaligned_in_smp;
 	/* remaining members: bookkeeping of the progress of the grace period */
 	struct {
 		spinlock_t	mutex;	/* Guard this struct                  */
-		int	next_pending;	/* Is the next batch already waiting? */
 		cpumask_t	rcu_cpu_mask; 	/* CPUs that need to switch   */
 				/* in order for current batch to proceed.     */
 	} state ____cacheline_maxaligned_in_smp;
diff -puN kernel/rcupdate.c~rcu-lock-update-use-a-sequence-lock-for-starting-batches kernel/rcupdate.c
--- 25/kernel/rcupdate.c~rcu-lock-update-use-a-sequence-lock-for-starting-batches	Wed May 26 15:56:11 2004
+++ 25-akpm/kernel/rcupdate.c	Wed May 26 15:56:11 2004
@@ -47,7 +47,7 @@
 
 /* Definition for rcupdate control block. */
 struct rcu_ctrlblk rcu_ctrlblk = 
-	{ .batch = { .cur = -300, .completed = -300 },
+	{ .batch = { .cur = -300, .completed = -300 , .lock = SEQCNT_ZERO },
 	  .state = {.mutex = SPIN_LOCK_UNLOCKED, .rcu_cpu_mask = CPU_MASK_NONE } };
 DEFINE_PER_CPU(struct rcu_data, rcu_data) = { 0L };
 
@@ -124,16 +124,18 @@ static void rcu_start_batch(int next_pen
 	cpumask_t active;
 
 	if (next_pending)
-		rcu_ctrlblk.state.next_pending = 1;
+		rcu_ctrlblk.batch.next_pending = 1;
 
-	if (rcu_ctrlblk.state.next_pending &&
+	if (rcu_ctrlblk.batch.next_pending &&
 			rcu_ctrlblk.batch.completed == rcu_ctrlblk.batch.cur) {
-		rcu_ctrlblk.state.next_pending = 0;
 		/* Can't change, since spin lock held. */
 		active = nohz_cpu_mask;
 		cpus_complement(active);
 		cpus_and(rcu_ctrlblk.state.rcu_cpu_mask, cpu_online_map, active);
+		write_seqcount_begin(&rcu_ctrlblk.batch.lock);
+		rcu_ctrlblk.batch.next_pending = 0;
 		rcu_ctrlblk.batch.cur++;
+		write_seqcount_end(&rcu_ctrlblk.batch.lock);
 	}
 }
 
@@ -261,6 +263,8 @@ static void rcu_process_callbacks(unsign
 
 	local_irq_disable();
 	if (!list_empty(&RCU_nxtlist(cpu)) && list_empty(&RCU_curlist(cpu))) {
+		int next_pending, seq;
+
 		__list_splice(&RCU_nxtlist(cpu), &RCU_curlist(cpu));
 		INIT_LIST_HEAD(&RCU_nxtlist(cpu));
 		local_irq_enable();
@@ -268,10 +272,19 @@ static void rcu_process_callbacks(unsign
 		/*
 		 * start the next batch of callbacks
 		 */
-		spin_lock(&rcu_ctrlblk.state.mutex);
-		RCU_batch(cpu) = rcu_ctrlblk.batch.cur + 1;
-		rcu_start_batch(1);
-		spin_unlock(&rcu_ctrlblk.state.mutex);
+		do {
+			seq = read_seqcount_begin(&rcu_ctrlblk.batch.lock);
+			/* determine batch number */
+			RCU_batch(cpu) = rcu_ctrlblk.batch.cur + 1;
+			next_pending = rcu_ctrlblk.batch.next_pending;
+		} while (read_seqcount_retry(&rcu_ctrlblk.batch.lock, seq));
+
+		if (!next_pending) {
+			/* and start it/schedule start if it's a new batch */
+			spin_lock(&rcu_ctrlblk.state.mutex);
+			rcu_start_batch(1);
+			spin_unlock(&rcu_ctrlblk.state.mutex);
+		}
 	} else {
 		local_irq_enable();
 	}
_