From: Alasdair G Kergon <agk@redhat.com>

Add kcopyd daemon

Signed-off-by: Andrew Morton <akpm@osdl.org>
---

 25-akpm/drivers/md/Makefile |    2 
 25-akpm/drivers/md/dm.c     |    1 
 25-akpm/drivers/md/dm.h     |    3 
 25-akpm/drivers/md/kcopyd.c |  667 ++++++++++++++++++++++++++++++++++++++++++++
 25-akpm/drivers/md/kcopyd.h |   41 ++
 5 files changed, 713 insertions(+), 1 deletion(-)

diff -puN drivers/md/dm.c~2-5-device-mapper-kcopyd drivers/md/dm.c
--- 25/drivers/md/dm.c~2-5-device-mapper-kcopyd	Wed Jun  2 14:56:41 2004
+++ 25-akpm/drivers/md/dm.c	Wed Jun  2 14:56:41 2004
@@ -153,6 +153,7 @@ static struct {
 	xx(dm_target)
 	xx(dm_linear)
 	xx(dm_stripe)
+	xx(kcopyd)
 	xx(dm_interface)
 #undef xx
 };
diff -puN drivers/md/dm.h~2-5-device-mapper-kcopyd drivers/md/dm.h
--- 25/drivers/md/dm.h~2-5-device-mapper-kcopyd	Wed Jun  2 14:56:41 2004
+++ 25-akpm/drivers/md/dm.h	Wed Jun  2 14:56:41 2004
@@ -178,6 +178,9 @@ void dm_linear_exit(void);
 int dm_stripe_init(void);
 void dm_stripe_exit(void);
 
+int kcopyd_init(void);
+void kcopyd_exit(void);
+
 void *dm_vcalloc(unsigned long nmemb, unsigned long elem_size);
 
 #endif
diff -puN /dev/null drivers/md/kcopyd.c
--- /dev/null	Thu Apr 11 07:25:15 2002
+++ 25-akpm/drivers/md/kcopyd.c	Wed Jun  2 14:56:41 2004
@@ -0,0 +1,667 @@
+/*
+ * Copyright (C) 2002 Sistina Software (UK) Limited.
+ *
+ * This file is released under the GPL.
+ */
+
+#include <asm/atomic.h>
+
+#include <linux/blkdev.h>
+#include <linux/config.h>
+#include <linux/fs.h>
+#include <linux/init.h>
+#include <linux/list.h>
+#include <linux/mempool.h>
+#include <linux/module.h>
+#include <linux/pagemap.h>
+#include <linux/slab.h>
+#include <linux/vmalloc.h>
+#include <linux/workqueue.h>
+
+#include "kcopyd.h"
+
+/* FIXME: this is only needed for the DMERR macros */
+#include "dm.h"
+
+static struct workqueue_struct *_kcopyd_wq;
+static struct work_struct _kcopyd_work;
+
+static inline void wake(void)
+{
+	queue_work(_kcopyd_wq, &_kcopyd_work);
+}
+
+/*-----------------------------------------------------------------
+ * Each kcopyd client has its own little pool of preallocated
+ * pages for kcopyd io.
+ *---------------------------------------------------------------*/
+struct kcopyd_client {
+	struct list_head list;
+
+	spinlock_t lock;
+	struct page_list *pages;
+	unsigned int nr_pages;
+	unsigned int nr_free_pages;
+};
+
+static struct page_list *alloc_pl(void)
+{
+	struct page_list *pl;
+
+	pl = kmalloc(sizeof(*pl), GFP_KERNEL);
+	if (!pl)
+		return NULL;
+
+	pl->page = alloc_page(GFP_KERNEL);
+	if (!pl->page) {
+		kfree(pl);
+		return NULL;
+	}
+
+	SetPageLocked(pl->page);
+	return pl;
+}
+
+static void free_pl(struct page_list *pl)
+{
+	ClearPageLocked(pl->page);
+	__free_page(pl->page);
+	kfree(pl);
+}
+
+static int kcopyd_get_pages(struct kcopyd_client *kc,
+			    unsigned int nr, struct page_list **pages)
+{
+	struct page_list *pl;
+
+	spin_lock(&kc->lock);
+	if (kc->nr_free_pages < nr) {
+		spin_unlock(&kc->lock);
+		return -ENOMEM;
+	}
+
+	kc->nr_free_pages -= nr;
+	for (*pages = pl = kc->pages; --nr; pl = pl->next)
+		;
+
+	kc->pages = pl->next;
+	pl->next = 0;
+
+	spin_unlock(&kc->lock);
+
+	return 0;
+}
+
+static void kcopyd_put_pages(struct kcopyd_client *kc, struct page_list *pl)
+{
+	struct page_list *cursor;
+
+	spin_lock(&kc->lock);
+	for (cursor = pl; cursor->next; cursor = cursor->next)
+		kc->nr_free_pages++;
+
+	kc->nr_free_pages++;
+	cursor->next = kc->pages;
+	kc->pages = pl;
+	spin_unlock(&kc->lock);
+}
+
+/*
+ * These three functions resize the page pool.
+ */
+static void drop_pages(struct page_list *pl)
+{
+	struct page_list *next;
+
+	while (pl) {
+		next = pl->next;
+		free_pl(pl);
+		pl = next;
+	}
+}
+
+static int client_alloc_pages(struct kcopyd_client *kc, unsigned int nr)
+{
+	unsigned int i;
+	struct page_list *pl = NULL, *next;
+
+	for (i = 0; i < nr; i++) {
+		next = alloc_pl();
+		if (!next) {
+			if (pl)
+				drop_pages(pl);
+			return -ENOMEM;
+		}
+		next->next = pl;
+		pl = next;
+	}
+
+	kcopyd_put_pages(kc, pl);
+	kc->nr_pages += nr;
+	return 0;
+}
+
+static void client_free_pages(struct kcopyd_client *kc)
+{
+	BUG_ON(kc->nr_free_pages != kc->nr_pages);
+	drop_pages(kc->pages);
+	kc->pages = NULL;
+	kc->nr_free_pages = kc->nr_pages = 0;
+}
+
+/*-----------------------------------------------------------------
+ * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
+ * for this reason we use a mempool to prevent the client from
+ * ever having to do io (which could cause a deadlock).
+ *---------------------------------------------------------------*/
+struct kcopyd_job {
+	struct kcopyd_client *kc;
+	struct list_head list;
+	unsigned long flags;
+
+	/*
+	 * Error state of the job.
+	 */
+	int read_err;
+	unsigned int write_err;
+
+	/*
+	 * Either READ or WRITE
+	 */
+	int rw;
+	struct io_region source;
+
+	/*
+	 * The destinations for the transfer.
+	 */
+	unsigned int num_dests;
+	struct io_region dests[KCOPYD_MAX_REGIONS];
+
+	sector_t offset;
+	unsigned int nr_pages;
+	struct page_list *pages;
+
+	/*
+	 * Set this to ensure you are notified when the job has
+	 * completed.  'context' is for callback to use.
+	 */
+	kcopyd_notify_fn fn;
+	void *context;
+
+	/*
+	 * These fields are only used if the job has been split
+	 * into more manageable parts.
+	 */
+	struct semaphore lock;
+	atomic_t sub_jobs;
+	sector_t progress;
+};
+
+/* FIXME: this should scale with the number of pages */
+#define MIN_JOBS 512
+
+static kmem_cache_t *_job_cache;
+static mempool_t *_job_pool;
+
+/*
+ * We maintain three lists of jobs:
+ *
+ * i)   jobs waiting for pages
+ * ii)  jobs that have pages, and are waiting for the io to be issued.
+ * iii) jobs that have completed.
+ *
+ * All three of these are protected by job_lock.
+ */
+static spinlock_t _job_lock = SPIN_LOCK_UNLOCKED;
+
+static LIST_HEAD(_complete_jobs);
+static LIST_HEAD(_io_jobs);
+static LIST_HEAD(_pages_jobs);
+
+static int __init jobs_init(void)
+{
+	INIT_LIST_HEAD(&_complete_jobs);
+	INIT_LIST_HEAD(&_io_jobs);
+	INIT_LIST_HEAD(&_pages_jobs);
+
+	_job_cache = kmem_cache_create("kcopyd-jobs",
+				       sizeof(struct kcopyd_job),
+				       __alignof__(struct kcopyd_job),
+				       0, NULL, NULL);
+	if (!_job_cache)
+		return -ENOMEM;
+
+	_job_pool = mempool_create(MIN_JOBS, mempool_alloc_slab,
+				   mempool_free_slab, _job_cache);
+	if (!_job_pool) {
+		kmem_cache_destroy(_job_cache);
+		return -ENOMEM;
+	}
+
+	return 0;
+}
+
+static void jobs_exit(void)
+{
+	BUG_ON(!list_empty(&_complete_jobs));
+	BUG_ON(!list_empty(&_io_jobs));
+	BUG_ON(!list_empty(&_pages_jobs));
+
+	mempool_destroy(_job_pool);
+	kmem_cache_destroy(_job_cache);
+}
+
+/*
+ * Functions to push and pop a job onto the head of a given job
+ * list.
+ */
+static inline struct kcopyd_job *pop(struct list_head *jobs)
+{
+	struct kcopyd_job *job = NULL;
+	unsigned long flags;
+
+	spin_lock_irqsave(&_job_lock, flags);
+
+	if (!list_empty(jobs)) {
+		job = list_entry(jobs->next, struct kcopyd_job, list);
+		list_del(&job->list);
+	}
+	spin_unlock_irqrestore(&_job_lock, flags);
+
+	return job;
+}
+
+static inline void push(struct list_head *jobs, struct kcopyd_job *job)
+{
+	unsigned long flags;
+
+	spin_lock_irqsave(&_job_lock, flags);
+	list_add_tail(&job->list, jobs);
+	spin_unlock_irqrestore(&_job_lock, flags);
+}
+
+/*
+ * These three functions process 1 item from the corresponding
+ * job list.
+ *
+ * They return:
+ * < 0: error
+ *   0: success
+ * > 0: can't process yet.
+ */
+static int run_complete_job(struct kcopyd_job *job)
+{
+	void *context = job->context;
+	int read_err = job->read_err;
+	unsigned int write_err = job->write_err;
+	kcopyd_notify_fn fn = job->fn;
+
+	kcopyd_put_pages(job->kc, job->pages);
+	mempool_free(job, _job_pool);
+	fn(read_err, write_err, context);
+	return 0;
+}
+
+static void complete_io(unsigned long error, void *context)
+{
+	struct kcopyd_job *job = (struct kcopyd_job *) context;
+
+	if (error) {
+		if (job->rw == WRITE)
+			job->write_err &= error;
+		else
+			job->read_err = 1;
+
+		if (!test_bit(KCOPYD_IGNORE_ERROR, &job->flags)) {
+			push(&_complete_jobs, job);
+			wake();
+			return;
+		}
+	}
+
+	if (job->rw == WRITE)
+		push(&_complete_jobs, job);
+
+	else {
+		job->rw = WRITE;
+		push(&_io_jobs, job);
+	}
+
+	wake();
+}
+
+/*
+ * Request io on as many buffer heads as we can currently get for
+ * a particular job.
+ */
+static int run_io_job(struct kcopyd_job *job)
+{
+	int r;
+
+	if (job->rw == READ)
+		r = dm_io_async(1, &job->source, job->rw,
+				job->pages,
+				job->offset, complete_io, job);
+
+	else
+		r = dm_io_async(job->num_dests, job->dests, job->rw,
+				job->pages,
+				job->offset, complete_io, job);
+
+	return r;
+}
+
+static int run_pages_job(struct kcopyd_job *job)
+{
+	int r;
+
+	job->nr_pages = dm_div_up(job->dests[0].count + job->offset,
+				  PAGE_SIZE >> 9);
+	r = kcopyd_get_pages(job->kc, job->nr_pages, &job->pages);
+	if (!r) {
+		/* this job is ready for io */
+		push(&_io_jobs, job);
+		return 0;
+	}
+
+	if (r == -ENOMEM)
+		/* can't complete now */
+		return 1;
+
+	return r;
+}
+
+/*
+ * Run through a list for as long as possible.  Returns the count
+ * of successful jobs.
+ */
+static int process_jobs(struct list_head *jobs, int (*fn) (struct kcopyd_job *))
+{
+	struct kcopyd_job *job;
+	int r, count = 0;
+
+	while ((job = pop(jobs))) {
+
+		r = fn(job);
+
+		if (r < 0) {
+			/* error this rogue job */
+			if (job->rw == WRITE)
+				job->write_err = (unsigned int) -1;
+			else
+				job->read_err = 1;
+			push(&_complete_jobs, job);
+			break;
+		}
+
+		if (r > 0) {
+			/*
+			 * We couldn't service this job ATM, so
+			 * push this job back onto the list.
+			 */
+			push(jobs, job);
+			break;
+		}
+
+		count++;
+	}
+
+	return count;
+}
+
+/*
+ * kcopyd does this every time it's woken up.
+ */
+static void do_work(void *ignored)
+{
+	/*
+	 * The order that these are called is *very* important.
+	 * complete jobs can free some pages for pages jobs.
+	 * Pages jobs when successful will jump onto the io jobs
+	 * list.  io jobs call wake when they complete and it all
+	 * starts again.
+	 */
+	process_jobs(&_complete_jobs, run_complete_job);
+	process_jobs(&_pages_jobs, run_pages_job);
+	process_jobs(&_io_jobs, run_io_job);
+}
+
+/*
+ * If we are copying a small region we just dispatch a single job
+ * to do the copy, otherwise the io has to be split up into many
+ * jobs.
+ */
+static void dispatch_job(struct kcopyd_job *job)
+{
+	push(&_pages_jobs, job);
+	wake();
+}
+
+#define SUB_JOB_SIZE 128
+static void segment_complete(int read_err,
+			     unsigned int write_err, void *context)
+{
+	/* FIXME: tidy this function */
+	sector_t progress = 0;
+	sector_t count = 0;
+	struct kcopyd_job *job = (struct kcopyd_job *) context;
+
+	down(&job->lock);
+
+	/* update the error */
+	if (read_err)
+		job->read_err = 1;
+
+	if (write_err)
+		job->write_err &= write_err;
+
+	/*
+	 * Only dispatch more work if there hasn't been an error.
+	 */
+	if ((!job->read_err && !job->write_err) ||
+	    test_bit(KCOPYD_IGNORE_ERROR, &job->flags)) {
+		/* get the next chunk of work */
+		progress = job->progress;
+		count = job->source.count - progress;
+		if (count) {
+			if (count > SUB_JOB_SIZE)
+				count = SUB_JOB_SIZE;
+
+			job->progress += count;
+		}
+	}
+	up(&job->lock);
+
+	if (count) {
+		int i;
+		struct kcopyd_job *sub_job = mempool_alloc(_job_pool, GFP_NOIO);
+
+		memcpy(sub_job, job, sizeof(*job));
+		sub_job->source.sector += progress;
+		sub_job->source.count = count;
+
+		for (i = 0; i < job->num_dests; i++) {
+			sub_job->dests[i].sector += progress;
+			sub_job->dests[i].count = count;
+		}
+
+		sub_job->fn = segment_complete;
+		sub_job->context = job;
+		dispatch_job(sub_job);
+
+	} else if (atomic_dec_and_test(&job->sub_jobs)) {
+
+		/*
+		 * To avoid a race we must keep the job around
+		 * until after the notify function has completed.
+		 * Otherwise the client may try and stop the job
+		 * after we've completed.
+		 */
+		job->fn(read_err, write_err, job->context);
+		mempool_free(job, _job_pool);
+	}
+}
+
+/*
+ * Create some little jobs that will do the move between
+ * them.
+ */
+#define SPLIT_COUNT 8
+static void split_job(struct kcopyd_job *job)
+{
+	int i;
+
+	atomic_set(&job->sub_jobs, SPLIT_COUNT);
+	for (i = 0; i < SPLIT_COUNT; i++)
+		segment_complete(0, 0u, job);
+}
+
+int kcopyd_copy(struct kcopyd_client *kc, struct io_region *from,
+		unsigned int num_dests, struct io_region *dests,
+		unsigned int flags, kcopyd_notify_fn fn, void *context)
+{
+	struct kcopyd_job *job;
+
+	/*
+	 * Allocate a new job.
+	 */
+	job = mempool_alloc(_job_pool, GFP_NOIO);
+
+	/*
+	 * set up for the read.
+	 */
+	job->kc = kc;
+	job->flags = flags;
+	job->read_err = 0;
+	job->write_err = 0;
+	job->rw = READ;
+
+	memcpy(&job->source, from, sizeof(*from));
+
+	job->num_dests = num_dests;
+	memcpy(&job->dests, dests, sizeof(*dests) * num_dests);
+
+	job->offset = 0;
+	job->nr_pages = 0;
+	job->pages = NULL;
+
+	job->fn = fn;
+	job->context = context;
+
+	if (job->source.count < SUB_JOB_SIZE)
+		dispatch_job(job);
+
+	else {
+		init_MUTEX(&job->lock);
+		job->progress = 0;
+		split_job(job);
+	}
+
+	return 0;
+}
+
+/*
+ * Cancels a kcopyd job, eg. someone might be deactivating a
+ * mirror.
+ */
+int kcopyd_cancel(struct kcopyd_job *job, int block)
+{
+	/* FIXME: finish */
+	return -1;
+}
+
+/*-----------------------------------------------------------------
+ * Unit setup
+ *---------------------------------------------------------------*/
+static DECLARE_MUTEX(_client_lock);
+static LIST_HEAD(_clients);
+
+static int client_add(struct kcopyd_client *kc)
+{
+	down(&_client_lock);
+	list_add(&kc->list, &_clients);
+	up(&_client_lock);
+	return 0;
+}
+
+static void client_del(struct kcopyd_client *kc)
+{
+	down(&_client_lock);
+	list_del(&kc->list);
+	up(&_client_lock);
+}
+
+int kcopyd_client_create(unsigned int nr_pages, struct kcopyd_client **result)
+{
+	int r = 0;
+	struct kcopyd_client *kc;
+
+	kc = kmalloc(sizeof(*kc), GFP_KERNEL);
+	if (!kc)
+		return -ENOMEM;
+
+	kc->lock = SPIN_LOCK_UNLOCKED;
+	kc->pages = NULL;
+	kc->nr_pages = kc->nr_free_pages = 0;
+	r = client_alloc_pages(kc, nr_pages);
+	if (r) {
+		kfree(kc);
+		return r;
+	}
+
+	r = dm_io_get(nr_pages);
+	if (r) {
+		client_free_pages(kc);
+		kfree(kc);
+		return r;
+	}
+
+	r = client_add(kc);
+	if (r) {
+		dm_io_put(nr_pages);
+		client_free_pages(kc);
+		kfree(kc);
+		return r;
+	}
+
+	*result = kc;
+	return 0;
+}
+
+void kcopyd_client_destroy(struct kcopyd_client *kc)
+{
+	dm_io_put(kc->nr_pages);
+	client_free_pages(kc);
+	client_del(kc);
+	kfree(kc);
+}
+
+
+int __init kcopyd_init(void)
+{
+	int r;
+
+	r = jobs_init();
+	if (r)
+		return r;
+
+	_kcopyd_wq = create_singlethread_workqueue("kcopyd");
+	if (!_kcopyd_wq) {
+		jobs_exit();
+		return -ENOMEM;
+	}
+
+	INIT_WORK(&_kcopyd_work, do_work, NULL);
+	return 0;
+}
+
+void kcopyd_exit(void)
+{
+	jobs_exit();
+	destroy_workqueue(_kcopyd_wq);
+}
+
+EXPORT_SYMBOL(kcopyd_client_create);
+EXPORT_SYMBOL(kcopyd_client_destroy);
+EXPORT_SYMBOL(kcopyd_copy);
+EXPORT_SYMBOL(kcopyd_cancel);
diff -puN /dev/null drivers/md/kcopyd.h
--- /dev/null	Thu Apr 11 07:25:15 2002
+++ 25-akpm/drivers/md/kcopyd.h	Wed Jun  2 14:56:41 2004
@@ -0,0 +1,41 @@
+/*
+ * Copyright (C) 2001 Sistina Software
+ *
+ * This file is released under the GPL.
+ */
+
+#ifndef DM_KCOPYD_H
+#define DM_KCOPYD_H
+
+#include "dm-io.h"
+
+int kcopyd_init(void);
+void kcopyd_exit(void);
+
+/* FIXME: make this configurable */
+#define KCOPYD_MAX_REGIONS 8
+
+#define KCOPYD_IGNORE_ERROR 1
+
+/*
+ * To use kcopyd you must first create a kcopyd client object.
+ */
+struct kcopyd_client;
+int kcopyd_client_create(unsigned int num_pages, struct kcopyd_client **result);
+void kcopyd_client_destroy(struct kcopyd_client *kc);
+
+/*
+ * Submit a copy job to kcopyd.  This is built on top of the
+ * previous three fns.
+ *
+ * read_err is a boolean,
+ * write_err is a bitset, with 1 bit for each destination region
+ */
+typedef void (*kcopyd_notify_fn)(int read_err,
+				 unsigned int write_err, void *context);
+
+int kcopyd_copy(struct kcopyd_client *kc, struct io_region *from,
+		unsigned int num_dests, struct io_region *dests,
+		unsigned int flags, kcopyd_notify_fn fn, void *context);
+
+#endif
diff -puN drivers/md/Makefile~2-5-device-mapper-kcopyd drivers/md/Makefile
--- 25/drivers/md/Makefile~2-5-device-mapper-kcopyd	Wed Jun  2 14:56:41 2004
+++ 25-akpm/drivers/md/Makefile	Wed Jun  2 14:56:41 2004
@@ -3,7 +3,7 @@
 #
 
 dm-mod-objs	:= dm.o dm-table.o dm-target.o dm-linear.o dm-stripe.o \
-		   dm-ioctl.o dm-io.o
+		   dm-ioctl.o dm-io.o kcopyd.o
 raid6-objs	:= raid6main.o raid6algos.o raid6recov.o raid6tables.o \
 		   raid6int1.o raid6int2.o raid6int4.o \
 		   raid6int8.o raid6int16.o raid6int32.o \
_