From: OGAWA Hirofumi <hirofumi@mail.parknet.co.jp>

The current writev() on a pipe/FIFO can be interleaved with data from
other processes doing writes, even when the request size is <= PIPE_BUF.
Such writes should in fact be atomic.

The readv() side is also implemented, so that it behaves the same as
read().  It is also faster.

Results from a readv()/writev() version of bw_pipe in LMbench:

2.6.0-test9-bk12
hirofumi@devron (i686-pc-linux-gnu)[1010]$ ./bw_pipe -m 4096 -M 5
Pipe bandwidth: 45.53 MB/sec
hirofumi@devron (i686-pc-linux-gnu)[1009]$ ./bw_pipe -m 1024 -M 5
Pipe bandwidth: 20.08 MB/sec

2.6.0-test9-bk12 + patch
hirofumi@devron (i686-pc-linux-gnu)[1001]$ ./bw_pipe -m 4096 -M 5
Pipe bandwidth: 65.98 MB/sec
hirofumi@devron (i686-pc-linux-gnu)[1002]$ ./bw_pipe -m 1024 -M 5
Pipe bandwidth: 32.19 MB/sec




 fs/pipe.c |  114 ++++++++++++++++++++++++++++++++++++++++++++++++++------------
 1 files changed, 92 insertions(+), 22 deletions(-)

diff -puN fs/pipe.c~pipe-readv-writev fs/pipe.c
--- 25/fs/pipe.c~pipe-readv-writev	2003-11-09 11:09:41.000000000 -0800
+++ 25-akpm/fs/pipe.c	2003-11-09 11:09:41.000000000 -0800
@@ -13,6 +13,7 @@
 #include <linux/fs.h>
 #include <linux/mount.h>
 #include <linux/pipe_fs_i.h>
+#include <linux/uio.h>
 #include <asm/uaccess.h>
 #include <asm/ioctls.h>
 
@@ -43,19 +44,63 @@ void pipe_wait(struct inode * inode)
 	down(PIPE_SEM(*inode));
 }
 
+static inline int
+pipe_iov_copy_from_user(void *to, struct iovec *iov, unsigned long len)
+{
+	unsigned long copy;
+
+	while (len > 0) {
+		while (!iov->iov_len)
+			iov++;
+		copy = min_t(unsigned long, len, iov->iov_len);
+
+		if (copy_from_user(to, iov->iov_base, copy))
+			return -EFAULT;
+		to += copy;
+		len -= copy;
+		iov->iov_base += copy;
+		iov->iov_len -= copy;
+	}
+	return 0;
+}
+
+static inline int
+pipe_iov_copy_to_user(struct iovec *iov, const void *from, unsigned long len)
+{
+	unsigned long copy;
+
+	while (len > 0) {
+		while (!iov->iov_len)
+			iov++;
+		copy = min_t(unsigned long, len, iov->iov_len);
+
+		if (copy_to_user(iov->iov_base, from, copy))
+			return -EFAULT;
+		from += copy;
+		len -= copy;
+		iov->iov_base += copy;
+		iov->iov_len -= copy;
+	}
+	return 0;
+}
+
 static ssize_t
-pipe_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos)
+pipe_readv(struct file *filp, const struct iovec *_iov,
+	   unsigned long nr_segs, loff_t *ppos)
 {
 	struct inode *inode = filp->f_dentry->d_inode;
 	int do_wakeup;
 	ssize_t ret;
+	struct iovec *iov = (struct iovec *)_iov;
+	size_t total_len;
 
 	/* pread is not allowed on pipes. */
 	if (unlikely(ppos != &filp->f_pos))
 		return -ESPIPE;
-	
+
+	total_len = iov_length(iov, nr_segs);
 	/* Null read succeeds. */
-	if (unlikely(count == 0))
+	if (unlikely(total_len == 0))
 		return 0;
 
 	do_wakeup = 0;
@@ -67,12 +112,12 @@ pipe_read(struct file *filp, char __user
 			char *pipebuf = PIPE_BASE(*inode) + PIPE_START(*inode);
 			ssize_t chars = PIPE_MAX_RCHUNK(*inode);
 
-			if (chars > count)
-				chars = count;
+			if (chars > total_len)
+				chars = total_len;
 			if (chars > size)
 				chars = size;
 
-			if (copy_to_user(buf, pipebuf, chars)) {
+			if (pipe_iov_copy_to_user(iov, pipebuf, chars)) {
 				if (!ret) ret = -EFAULT;
 				break;
 			}
@@ -81,12 +126,11 @@ pipe_read(struct file *filp, char __user
 			PIPE_START(*inode) += chars;
 			PIPE_START(*inode) &= (PIPE_SIZE - 1);
 			PIPE_LEN(*inode) -= chars;
-			count -= chars;
-			buf += chars;
+			total_len -= chars;
 			do_wakeup = 1;
+			if (!total_len)
+				break;	/* common path: read succeeded */
 		}
-		if (!count)
-			break;	/* common path: read succeeded */
 		if (PIPE_LEN(*inode)) /* test for cyclic buffers */
 			continue;
 		if (!PIPE_WRITERS(*inode))
@@ -126,24 +170,35 @@ pipe_read(struct file *filp, char __user
 }
 
 static ssize_t
-pipe_write(struct file *filp, const char __user *buf, size_t count, loff_t *ppos)
+pipe_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos)
+{
+	struct iovec iov = { .iov_base = buf, .iov_len = count };
+	return pipe_readv(filp, &iov, 1, ppos);
+}
+
+static ssize_t
+pipe_writev(struct file *filp, const struct iovec *_iov,
+	    unsigned long nr_segs, loff_t *ppos)
 {
 	struct inode *inode = filp->f_dentry->d_inode;
 	ssize_t ret;
 	size_t min;
 	int do_wakeup;
+	struct iovec *iov = (struct iovec *)_iov;
+	size_t total_len;
 
 	/* pwrite is not allowed on pipes. */
 	if (unlikely(ppos != &filp->f_pos))
 		return -ESPIPE;
-	
+
+	total_len = iov_length(iov, nr_segs);
 	/* Null write succeeds. */
-	if (unlikely(count == 0))
+	if (unlikely(total_len == 0))
 		return 0;
 
 	do_wakeup = 0;
 	ret = 0;
-	min = count;
+	min = total_len;
 	if (min > PIPE_BUF)
 		min = 1;
 	down(PIPE_SEM(*inode));
@@ -164,23 +219,22 @@ pipe_write(struct file *filp, const char
 			 * syscall merging.
 			 */
 			do_wakeup = 1;
-			if (chars > count)
-				chars = count;
+			if (chars > total_len)
+				chars = total_len;
 			if (chars > free)
 				chars = free;
 
-			if (copy_from_user(pipebuf, buf, chars)) {
+			if (pipe_iov_copy_from_user(pipebuf, iov, chars)) {
 				if (!ret) ret = -EFAULT;
 				break;
 			}
-
 			ret += chars;
+
 			PIPE_LEN(*inode) += chars;
-			count -= chars;
-			buf += chars;
+			total_len -= chars;
+			if (!total_len)
+				break;
 		}
-		if (!count)
-			break;
 		if (PIPE_FREE(*inode) && ret) {
 			/* handle cyclic data buffers */
 			min = 1;
@@ -214,6 +268,14 @@ pipe_write(struct file *filp, const char
 }
 
 static ssize_t
+pipe_write(struct file *filp, const char __user *buf,
+	   size_t count, loff_t *ppos)
+{
+	struct iovec iov = { .iov_base = (void __user *)buf, .iov_len = count };
+	return pipe_writev(filp, &iov, 1, ppos);
+}
+
+static ssize_t
 bad_pipe_r(struct file *filp, char __user *buf, size_t count, loff_t *ppos)
 {
 	return -EBADF;
@@ -405,6 +467,7 @@ pipe_rdwr_open(struct inode *inode, stru
 struct file_operations read_fifo_fops = {
 	.llseek		= no_llseek,
 	.read		= pipe_read,
+	.readv		= pipe_readv,
 	.write		= bad_pipe_w,
 	.poll		= fifo_poll,
 	.ioctl		= pipe_ioctl,
@@ -417,6 +480,7 @@ struct file_operations write_fifo_fops =
 	.llseek		= no_llseek,
 	.read		= bad_pipe_r,
 	.write		= pipe_write,
+	.writev		= pipe_writev,
 	.poll		= fifo_poll,
 	.ioctl		= pipe_ioctl,
 	.open		= pipe_write_open,
@@ -427,7 +491,9 @@ struct file_operations write_fifo_fops =
 struct file_operations rdwr_fifo_fops = {
 	.llseek		= no_llseek,
 	.read		= pipe_read,
+	.readv		= pipe_readv,
 	.write		= pipe_write,
+	.writev		= pipe_writev,
 	.poll		= fifo_poll,
 	.ioctl		= pipe_ioctl,
 	.open		= pipe_rdwr_open,
@@ -438,6 +504,7 @@ struct file_operations rdwr_fifo_fops = 
 struct file_operations read_pipe_fops = {
 	.llseek		= no_llseek,
 	.read		= pipe_read,
+	.readv		= pipe_readv,
 	.write		= bad_pipe_w,
 	.poll		= pipe_poll,
 	.ioctl		= pipe_ioctl,
@@ -450,6 +517,7 @@ struct file_operations write_pipe_fops =
 	.llseek		= no_llseek,
 	.read		= bad_pipe_r,
 	.write		= pipe_write,
+	.writev		= pipe_writev,
 	.poll		= pipe_poll,
 	.ioctl		= pipe_ioctl,
 	.open		= pipe_write_open,
@@ -460,7 +528,9 @@ struct file_operations write_pipe_fops =
 struct file_operations rdwr_pipe_fops = {
 	.llseek		= no_llseek,
 	.read		= pipe_read,
+	.readv		= pipe_readv,
 	.write		= pipe_write,
+	.writev		= pipe_writev,
 	.poll		= pipe_poll,
 	.ioctl		= pipe_ioctl,
 	.open		= pipe_rdwr_open,

_