Async buffer wait
DESC
lock_buffer_wq fix
EDESC
From: Suparna Bhattacharya, Hugh Dickins.

Adds wait-queue aware variants of the buffer wait primitives for AIO.
__wait_on_buffer_wq() and lock_buffer_wq() take an explicit
wait_queue_t: if it specifies an async i/o notification callback, the
callback is queued on the buffer's waitqueue and -EIOCBRETRY is
returned instead of blocking, so the caller can return and retry once
it is notified.  A NULL wait queue gives the old synchronous behaviour.
lock_page(), wait_on_page_locked() and wait_on_page_writeback() become
wrappers around the corresponding _wq versions (the now-unused
synchronous wait_on_page_bit() goes away), and AIO fput completions
move to a separate aio_fput_wq workqueue.

The folded fix: make lock_buffer_wq() actually lock the buffer (loop on
test_set_buffer_locked()) rather than just waiting for it to come
unlocked.

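For illustration, a minimal sketch of the intended calling pattern on
the buffer side.  Only lock_buffer_wq()/wait_on_buffer_wq() come from
this patch; the iocb->ki_wait field (an async wait queue entry carried
in the kiocb) is an assumption from the wider AIO retry patchset, and
sync_dirty_buffer_wq() is a hypothetical helper named for the example:

	#include <linux/aio.h>
	#include <linux/buffer_head.h>

	/* Async-aware take on the sync_dirty_buffer() pattern (sketch) */
	static int sync_dirty_buffer_wq(struct kiocb *iocb,
					struct buffer_head *bh)
	{
		int ret;

		/*
		 * With an async wait this queues iocb->ki_wait on the
		 * buffer's waitqueue and returns -EIOCBRETRY instead of
		 * sleeping; the caller returns and is retried when the
		 * callback is notified.
		 */
		ret = lock_buffer_wq(bh, &iocb->ki_wait);
		if (ret)
			return ret;

		if (test_clear_buffer_dirty(bh)) {
			get_bh(bh);
			bh->b_end_io = end_buffer_write_sync;
			submit_bh(WRITE, bh);
			/* same deal: -EIOCBRETRY for async waiters */
			return wait_on_buffer_wq(bh, &iocb->ki_wait);
		}
		unlock_buffer(bh);
		return 0;
	}

On a retry the function simply runs again: -EIOCBRETRY repeats while
the buffer stays locked by the in-flight write, and once
end_buffer_write_sync() unlocks it the dirty bit is clear and the call
returns 0.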

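The page-side analogue, using the lock_page_wq() and
wait_on_page_writeback_wq() wrappers (same caveat about iocb->ki_wait;
wait_page_stable_wq() is likewise a made-up name for the sketch):

	#include <linux/aio.h>
	#include <linux/pagemap.h>

	/* Sketch: wait for writeback against a page to finish without
	 * blocking an async caller */
	static int wait_page_stable_wq(struct kiocb *iocb, struct page *page)
	{
		int ret;

		ret = lock_page_wq(page, &iocb->ki_wait);
		if (ret)
			return ret;	/* -EIOCBRETRY, come back later */

		ret = wait_on_page_writeback_wq(page, &iocb->ki_wait);
		/*
		 * Drop the page lock whether we completed or need a
		 * retry; a retry relocks from scratch.
		 */
		unlock_page(page);
		return ret;
	}

A NULL wait pointer makes all of these behave exactly like their
synchronous counterparts, which is how lock_buffer(), lock_page(),
wait_on_page_locked() and wait_on_page_writeback() are implemented
after this patch.
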
 fs/aio.c                    |    4 +++-
 fs/buffer.c                 |   33 ++++++++++++++++++++++++++++-----
 include/linux/buffer_head.h |   27 +++++++++++++++++++++++----
 include/linux/pagemap.h     |   35 ++++++++++++++++++++---------------
 mm/filemap.c                |    7 -------
 5 files changed, 74 insertions(+), 32 deletions(-)

diff -puN fs/buffer.c~aio-04-buffer_wq fs/buffer.c
--- 25/fs/buffer.c~aio-04-buffer_wq	2004-01-05 18:58:01.000000000 -0800
+++ 25-akpm/fs/buffer.c	2004-01-05 18:58:01.000000000 -0800
@@ -116,27 +116,50 @@ void unlock_buffer(struct buffer_head *b
 }
 
 /*
- * Block until a buffer comes unlocked.  This doesn't stop it
+ * Wait until a buffer comes unlocked.  This doesn't stop it
  * from becoming locked again - you have to lock it yourself
  * if you want to preserve its state.
+ * If the wait queue parameter specifies an async i/o callback,
+ * then instead of blocking, we just queue up the callback
+ * on the wait queue for async notification when the buffer gets
+ * unlocked.
+ * A NULL wait queue parameter defaults to synchronous behaviour.
  */
-void __wait_on_buffer(struct buffer_head * bh)
+int __wait_on_buffer_wq(struct buffer_head * bh, wait_queue_t *wait)
 {
 	wait_queue_head_t *wqh = bh_waitq_head(bh);
-	DEFINE_WAIT(wait);
+	DEFINE_WAIT(local_wait);
+
+	if (!wait)
+		wait = &local_wait;
 
 	if (atomic_read(&bh->b_count) == 0 &&
 			(!bh->b_page || !PageLocked(bh->b_page)))
 		buffer_error();
 
 	do {
-		prepare_to_wait(wqh, &wait, TASK_UNINTERRUPTIBLE);
+		prepare_to_wait(wqh, wait, TASK_UNINTERRUPTIBLE);
 		if (buffer_locked(bh)) {
 			blk_run_queues();
+			if (!is_sync_wait(wait)) {
+				/*
+				 * if we've queued an async wait queue
+				 * callback do not block; just tell the
+				 * caller to return and retry later when
+				 * the callback is notified
+				 */
+				return -EIOCBRETRY;
+			}
 			io_schedule();
 		}
 	} while (buffer_locked(bh));
-	finish_wait(wqh, &wait);
+	finish_wait(wqh, wait);
+	return 0;
+}
+
+void __wait_on_buffer(struct buffer_head * bh)
+{
+	__wait_on_buffer_wq(bh, NULL);
 }
 
 static void
diff -puN include/linux/buffer_head.h~aio-04-buffer_wq include/linux/buffer_head.h
--- 25/include/linux/buffer_head.h~aio-04-buffer_wq	2004-01-05 18:58:01.000000000 -0800
+++ 25-akpm/include/linux/buffer_head.h	2004-01-05 18:58:01.000000000 -0800
@@ -162,6 +162,7 @@ void mark_buffer_async_write(struct buff
 void invalidate_bdev(struct block_device *, int);
 int sync_blockdev(struct block_device *bdev);
 void __wait_on_buffer(struct buffer_head *);
+int __wait_on_buffer_wq(struct buffer_head *, wait_queue_t *wait);
 wait_queue_head_t *bh_waitq_head(struct buffer_head *bh);
 void wake_up_buffer(struct buffer_head *bh);
 int fsync_bdev(struct block_device *);
@@ -277,16 +278,34 @@ map_bh(struct buffer_head *bh, struct su
  * __wait_on_buffer() just to trip a debug check.  Because debug code in inline
  * functions is bloaty.
  */
-static inline void wait_on_buffer(struct buffer_head *bh)
+
+static inline int wait_on_buffer_wq(struct buffer_head *bh, wait_queue_t *wait)
 {
 	if (buffer_locked(bh) || atomic_read(&bh->b_count) == 0)
-		__wait_on_buffer(bh);
+		return __wait_on_buffer_wq(bh, wait);
+
+	return 0;
+}
+
+static inline void wait_on_buffer(struct buffer_head *bh)
+{
+	wait_on_buffer_wq(bh, NULL);
+}
+
+static inline int lock_buffer_wq(struct buffer_head *bh, wait_queue_t *wait)
+{
+	while (test_set_buffer_locked(bh)) {
+		int ret = __wait_on_buffer_wq(bh, wait);
+		if (ret)
+			return ret;
+	}
+
+	return 0;
 }
 
 static inline void lock_buffer(struct buffer_head *bh)
 {
-	while (test_set_buffer_locked(bh))
-		__wait_on_buffer(bh);
+	lock_buffer_wq(bh, NULL);
 }
 
 #endif /* _LINUX_BUFFER_HEAD_H */
diff -puN fs/aio.c~aio-04-buffer_wq fs/aio.c
--- 25/fs/aio.c~aio-04-buffer_wq	2004-01-05 18:58:01.000000000 -0800
+++ 25-akpm/fs/aio.c	2004-01-05 18:58:01.000000000 -0800
@@ -50,6 +50,7 @@ static kmem_cache_t	*kiocb_cachep;
 static kmem_cache_t	*kioctx_cachep;
 
 static struct workqueue_struct *aio_wq;
+static struct workqueue_struct *aio_fput_wq;
 
 /* Used for rare fput completion. */
 static void aio_fput_routine(void *);
@@ -77,6 +78,7 @@ static int __init aio_setup(void)
 		panic("unable to create kioctx cache");
 
 	aio_wq = create_workqueue("aio");
+	aio_fput_wq = create_workqueue("aio_fput");
 
 	pr_debug("aio_setup: sizeof(struct page) = %d\n", (int)sizeof(struct page));
 
@@ -509,7 +511,7 @@ static int __aio_put_req(struct kioctx *
 		spin_lock(&fput_lock);
 		list_add(&req->ki_list, &fput_head);
 		spin_unlock(&fput_lock);
-		queue_work(aio_wq, &fput_work);
+		queue_work(aio_fput_wq, &fput_work);
 	} else
 		really_put_req(ctx, req);
 	return 1;
diff -puN include/linux/pagemap.h~aio-04-buffer_wq include/linux/pagemap.h
--- 25/include/linux/pagemap.h~aio-04-buffer_wq	2004-01-05 18:58:01.000000000 -0800
+++ 25-akpm/include/linux/pagemap.h	2004-01-05 18:58:01.000000000 -0800
@@ -152,11 +152,6 @@ static inline void ___add_to_page_cache(
 extern void FASTCALL(__lock_page(struct page *page));
 extern void FASTCALL(unlock_page(struct page *page));
 
-static inline void lock_page(struct page *page)
-{
-	if (TestSetPageLocked(page))
-		__lock_page(page);
-}
 
 extern int FASTCALL(__lock_page_wq(struct page *page, wait_queue_t *wait));
 static inline int lock_page_wq(struct page *page, wait_queue_t *wait)
@@ -167,12 +162,17 @@ static inline int lock_page_wq(struct pa
 		return 0;
 }
 
+static inline void lock_page(struct page *page)
+{
+	lock_page_wq(page, NULL);
+}
 	
 /*
  * This is exported only for wait_on_page_locked/wait_on_page_writeback.
  * Never use this directly!
  */
-extern void FASTCALL(wait_on_page_bit(struct page *page, int bit_nr));
+extern int FASTCALL(wait_on_page_bit_wq(struct page *page, int bit_nr,
+	wait_queue_t *wait));
 
 /* 
  * Wait for a page to be unlocked.
@@ -181,28 +181,33 @@ extern void FASTCALL(wait_on_page_bit(st
  * ie with increased "page->count" so that the page won't
  * go away during the wait..
  */
-static inline void wait_on_page_locked(struct page *page)
+static inline int wait_on_page_locked_wq(struct page *page, wait_queue_t *wait)
 {
 	if (PageLocked(page))
-		wait_on_page_bit(page, PG_locked);
+		return wait_on_page_bit_wq(page, PG_locked, wait);
+	return 0;
 }
 
-extern int FASTCALL(wait_on_page_bit_wq(struct page *page, int bit_nr,
-	wait_queue_t *wait));
-static inline int wait_on_page_locked_wq(struct page *page, wait_queue_t *wait)
+static inline int wait_on_page_writeback_wq(struct page *page,
+						wait_queue_t *wait)
 {
-	if (PageLocked(page))
-		return wait_on_page_bit_wq(page, PG_locked, wait);
+	if (PageWriteback(page))
+		return wait_on_page_bit_wq(page, PG_writeback, wait);
 	return 0;
 }
 
+static inline void wait_on_page_locked(struct page *page)
+{
+	wait_on_page_locked_wq(page, NULL);
+}
+
 /* 
  * Wait for a page to complete writeback
  */
+
 static inline void wait_on_page_writeback(struct page *page)
 {
-	if (PageWriteback(page))
-		wait_on_page_bit(page, PG_writeback);
+	wait_on_page_writeback_wq(page, NULL);
 }
 
 extern void end_page_writeback(struct page *page);
diff -puN mm/filemap.c~aio-04-buffer_wq mm/filemap.c
--- 25/mm/filemap.c~aio-04-buffer_wq	2004-01-05 18:58:01.000000000 -0800
+++ 25-akpm/mm/filemap.c	2004-01-05 18:58:01.000000000 -0800
@@ -345,13 +345,6 @@ int wait_on_page_bit_wq(struct page *pag
 }
 EXPORT_SYMBOL(wait_on_page_bit_wq);
 
-void wait_on_page_bit(struct page *page, int bit_nr)
-{
-	wait_on_page_bit_wq(page, bit_nr, NULL);
-}
-
-EXPORT_SYMBOL(wait_on_page_bit);
-
 /**
  * unlock_page() - unlock a locked page
  *

_