From: Manfred Spraul <manfred@colorfullife.com>

SIGEV_THREAD means that a given callback should be called in the context on a
new thread.  This must be done by the C library.  The kernel must deliver a
notice of the event to the C library when the callback should be called.

This patch switches to a new, simpler interface: User space creates a socket
with socket(PF_NETLINK, SOCK_RAW,0) and passes the fd to the mq_notify call
together with a cookie.  When the mq_notify() condition is satisfied, the
kernel "writes" the cookie to the socket.  User space then reads the cookie
and calls the appropriate callback.


---

 25-akpm/include/linux/mqueue.h |   16 ++
 25-akpm/ipc/mqueue.c           |  254 +++++++++++++++++------------------------
 2 files changed, 123 insertions(+), 147 deletions(-)

diff -puN include/linux/mqueue.h~mq_notify-via-netlink include/linux/mqueue.h
--- 25/include/linux/mqueue.h~mq_notify-via-netlink	Wed Apr  7 11:38:36 2004
+++ 25-akpm/include/linux/mqueue.h	Wed Apr  7 11:38:36 2004
@@ -30,8 +30,24 @@ struct mq_attr {
 	long	__reserved[4];	/* ignored for input, zeroed for output */
 };
 
+/*
+ * SIGEV_THREAD implementation:
+ * SIGEV_THREAD must be implemented in user space. If SIGEV_THREAD is passed
+ * to mq_notify, then
+ * - sigev_signo must be the file descriptor of an AF_NETLINK socket. It's not
+ *   necessary that the socket is bound.
+ * - sigev_value.sival_ptr must point to a cookie that is NOTIFY_COOKIE_LEN
+ *   bytes long.
+ * If the notification is triggered, then the cookie is sent to the netlink
+ * socket. The last byte of the cookie is replaced with the NOTIFY_?? codes:
+ * NOTIFY_WOKENUP if the notification got triggered, NOTIFY_REMOVED if it was
+ * removed, either due to a close() on the message queue fd or due to a
+ * mq_notify() that removed the notification.
+ */
 #define NOTIFY_NONE	0
 #define NOTIFY_WOKENUP	1
 #define NOTIFY_REMOVED	2
 
+#define NOTIFY_COOKIE_LEN	32
+
 #endif
diff -puN ipc/mqueue.c~mq_notify-via-netlink ipc/mqueue.c
--- 25/ipc/mqueue.c~mq_notify-via-netlink	Wed Apr  7 11:38:36 2004
+++ 25-akpm/ipc/mqueue.c	Wed Apr  7 11:38:36 2004
@@ -20,6 +20,9 @@
 #include <linux/poll.h>
 #include <linux/mqueue.h>
 #include <linux/msg.h>
+#include <linux/skbuff.h>
+#include <linux/netlink.h>
+#include <net/sock.h>
 #include "util.h"
 
 #define MQUEUE_MAGIC	0x19800202
@@ -33,9 +36,6 @@
 #define STATE_PENDING	1
 #define STATE_READY	2
 
-#define NP_NONE		((void*)NOTIFY_NONE)
-#define NP_WOKENUP	((void*)NOTIFY_WOKENUP)
-#define NP_REMOVED	((void*)NOTIFY_REMOVED)
 /* used by sysctl */
 #define FS_MQUEUE 	1
 #define CTL_QUEUESMAX 	2
@@ -48,6 +48,8 @@
 #define HARD_MSGMAX 	(131072/sizeof(void*))
 #define DFLT_MSGSIZEMAX 16384	/* max message size */
 
+#define NOTIFY_COOKIE_LEN	32
+
 struct ext_wait_queue {		/* queue of sleeping tasks */
 	struct task_struct *task;
 	struct list_head list;
@@ -56,25 +58,26 @@ struct ext_wait_queue {		/* queue of sle
 };
 
 struct mqueue_inode_info {
-	struct mq_attr attr;
+	spinlock_t lock;
+	struct inode vfs_inode;
+	wait_queue_head_t wait_q;
+
 	struct msg_msg **messages;
+	struct mq_attr attr;
 
-	pid_t notify_owner;	/* != 0 means notification registered */
-	struct sigevent notify;
-	struct file *notify_filp;
+	struct sigevent notify; /* notify.sigev_notify == SIGEV_NONE means */
+	pid_t notify_owner;	/*           no notification registered */
+	struct sock *notify_sock;
+	struct sk_buff *notify_cookie;
 
 	/* for tasks waiting for free space and messages, respectively */
 	struct ext_wait_queue e_wait_q[2];
-	wait_queue_head_t wait_q;
 
 	unsigned long qsize; /* size of queue in memory (sum of all msgs) */
-	spinlock_t lock;
-	struct inode vfs_inode;
 };
 
 static struct inode_operations mqueue_dir_inode_operations;
 static struct file_operations mqueue_file_operations;
-static struct file_operations mqueue_notify_fops;
 static struct super_operations mqueue_super_ops;
 static void remove_notification(struct mqueue_inode_info *info);
 
@@ -119,7 +122,7 @@ static struct inode *mqueue_get_inode(st
 			init_waitqueue_head(&info->wait_q);
 			INIT_LIST_HEAD(&info->e_wait_q[0].list);
 			INIT_LIST_HEAD(&info->e_wait_q[1].list);
-			info->notify_owner = 0;
+			info->notify.sigev_notify = SIGEV_NONE;
 			info->qsize = 0;
 			memset(&info->attr, 0, sizeof(info->attr));
 			info->attr.mq_maxmsg = DFLT_MSGMAX;
@@ -283,10 +286,11 @@ static ssize_t mqueue_read_file(struct f
 	snprintf(buffer, sizeof(buffer),
 			"QSIZE:%-10lu NOTIFY:%-5d SIGNO:%-5d NOTIFY_PID:%-6d\n",
 			info->qsize,
-			info->notify_owner ? info->notify.sigev_notify : SIGEV_NONE,
-			(info->notify_owner && info->notify.sigev_notify == SIGEV_SIGNAL ) ?
+			info->notify.sigev_notify,
+			(info->notify.sigev_notify == SIGEV_SIGNAL ) ?
 				info->notify.sigev_signo : 0,
-			info->notify_owner);
+			(info->notify.sigev_notify != SIGEV_NONE) ?
+				info->notify_owner : 0);
 	spin_unlock(&info->lock);
 	buffer[sizeof(buffer)-1] = '\0';
 	slen = strlen(buffer)+1;
@@ -299,7 +303,7 @@ static ssize_t mqueue_read_file(struct f
 		count = slen - o;
 
 	if (copy_to_user(u_data, buffer + o, count))
-       		return -EFAULT;
+		return -EFAULT;
 
 	*off = o + count;
 	filp->f_dentry->d_inode->i_atime = filp->f_dentry->d_inode->i_ctime = CURRENT_TIME;
@@ -311,7 +315,8 @@ static int mqueue_flush_file(struct file
 	struct mqueue_inode_info *info = MQUEUE_I(filp->f_dentry->d_inode);
 
 	spin_lock(&info->lock);
-	if (current->tgid == info->notify_owner)
+	if (info->notify.sigev_notify != SIGEV_NONE &&
+			current->tgid == info->notify_owner)
 		remove_notification(info);
 
 	spin_unlock(&info->lock);
@@ -435,6 +440,11 @@ static inline struct msg_msg *msg_get(st
 	return info->messages[info->attr.mq_curmsgs];
 }
 
+static inline void set_cookie(struct sk_buff *skb, char code)
+{
+	((char*)skb->data)[NOTIFY_COOKIE_LEN-1] = code;
+}
+
 /*
  * The next function is only to split too long sys_mq_timedsend
  */
@@ -445,7 +455,8 @@ static void __do_notify(struct mqueue_in
 	 * waiting synchronously for message AND state of queue changed from
 	 * empty to not empty. Here we are sure that no one is waiting
 	 * synchronously. */
-	if (info->notify_owner && info->attr.mq_curmsgs == 1) {
+	if (info->notify.sigev_notify != SIGEV_NONE &&
+			info->attr.mq_curmsgs == 1) {
 		/* sends signal */
 		if (info->notify.sigev_notify == SIGEV_SIGNAL) {
 			struct siginfo sig_i;
@@ -460,10 +471,12 @@ static void __do_notify(struct mqueue_in
 			kill_proc_info(info->notify.sigev_signo,
 				       &sig_i, info->notify_owner);
 		} else if (info->notify.sigev_notify == SIGEV_THREAD) {
-			info->notify_filp->private_data = (void*)NP_WOKENUP;
+			set_cookie(info->notify_cookie, NOTIFY_WOKENUP);
+			netlink_sendskb(info->notify_sock,
+					info->notify_cookie, 0);
 		}
 		/* after notification unregisters process */
-		info->notify_owner = 0;
+		info->notify.sigev_notify = SIGEV_NONE;
 	}
 	wake_up(&info->wait_q);
 }
@@ -499,90 +512,13 @@ static long prepare_timeout(const struct
 	return timeout;
 }
 
-/*
- * File descriptor based notification, intended to be used to implement
- * SIGEV_THREAD:
- * SIGEV_THREAD means that a notification function should be called in the
- * context of a new thread. The kernel can't do that. Therefore mq_notify
- * calls with SIGEV_THREAD return a new file descriptor. A user space helper
- * must create a new thread and then read from the given file descriptor.
- * The read always returns one byte. If it's NOTIFY_WOKENUP, then it must
- * call the notification function. If it's NOTIFY_REMOVED, then the
- * notification was removed. The file descriptor supports poll, thus one
- * supervisor thread can manage multiple message queue notifications.
- *
- * The implementation must support multiple outstanding notifications:
- * It's possible that a new notification is added and signaled before user
- * space calls mqueue_notify_read for the previous notification.
- * Therefore the notification state is stored in the private_data field of
- * the file descriptor.
- */
-static unsigned int mqueue_notify_poll(struct file *filp,
-					struct poll_table_struct *poll_tab)
-{
-	struct mqueue_inode_info *info = MQUEUE_I(filp->f_dentry->d_inode);
-	int retval;
-
-	poll_wait(filp, &info->wait_q, poll_tab);
-
-	if (filp->private_data == NP_NONE)
-		retval = 0;
-	else
-		retval = POLLIN | POLLRDNORM;
-	return retval;
-}
-
-static ssize_t mqueue_notify_read(struct file *filp, char __user *buf,
-					size_t count, loff_t *ppos)
-{
-	struct mqueue_inode_info *info = MQUEUE_I(filp->f_dentry->d_inode);
-	char result;
-
-	if (!count)
-		return 0;
-	if (*ppos != 0)
-		return 0;
-	spin_lock(&info->lock);
-	while (filp->private_data == NP_NONE) {
-		DEFINE_WAIT(wait);
-		if (filp->f_flags & O_NONBLOCK) {
-			spin_unlock(&info->lock);
-			return -EAGAIN;
-		}
-		prepare_to_wait(&info->wait_q, &wait, TASK_INTERRUPTIBLE);
-		spin_unlock(&info->lock);
-		schedule();
-		finish_wait(&info->wait_q, &wait);
-		spin_lock(&info->lock);
-	}
-	spin_unlock(&info->lock);
-	result = (char)(unsigned long)filp->private_data;
-	if (put_user(result, buf))
-		return -EFAULT;
-	*ppos = 1;
-	return 1;
-}
-
-static int mqueue_notify_release(struct inode *inode, struct file *filp)
-{
-	struct mqueue_inode_info *info = MQUEUE_I(filp->f_dentry->d_inode);
-
-	spin_lock(&info->lock);
-	if (info->notify_owner && info->notify_filp == filp)
-		info->notify_owner = 0;
-	filp->private_data = NP_REMOVED;
-	spin_unlock(&info->lock);
-
-	return 0;
-}
-
 static void remove_notification(struct mqueue_inode_info *info)
 {
 	if (info->notify.sigev_notify == SIGEV_THREAD) {
-		info->notify_filp->private_data = NP_REMOVED;
-		wake_up(&info->wait_q);
+		set_cookie(info->notify_cookie, NOTIFY_REMOVED);
+		netlink_sendskb(info->notify_sock, info->notify_cookie, 0);
 	}
-	info->notify_owner = 0;
+	info->notify.sigev_notify = SIGEV_NONE;
 }
 
 /*
@@ -780,7 +716,8 @@ out_unlock:
  */
 
 /* pipelined_send() - send a message directly to the task waiting in
- * sys_mq_timedreceive() (without inserting message into a queue). */
+ * sys_mq_timedreceive() (without inserting message into a queue).
+ */
 static inline void pipelined_send(struct mqueue_inode_info *info,
 				  struct msg_msg *message,
 				  struct ext_wait_queue *receiver)
@@ -974,12 +911,16 @@ out:
 asmlinkage long sys_mq_notify(mqd_t mqdes,
 				const struct sigevent __user *u_notification)
 {
-	int ret, fd;
-	struct file *filp, *nfilp;
+	int ret;
+	struct file *filp;
+	struct sock *sock;
 	struct inode *inode;
 	struct sigevent notification;
 	struct mqueue_inode_info *info;
+	struct sk_buff *nc;
 
+	nc = NULL;
+	sock = NULL;
 	if (u_notification == NULL) {
 		notification.sigev_notify = SIGEV_NONE;
 	} else {
@@ -996,6 +937,44 @@ asmlinkage long sys_mq_notify(mqd_t mqde
 			 notification.sigev_signo > _NSIG)) {
 			return -EINVAL;
 		}
+		if (notification.sigev_notify == SIGEV_THREAD) {
+			/* create the notify skb */
+			nc = alloc_skb(NOTIFY_COOKIE_LEN, GFP_KERNEL);
+			ret = -ENOMEM;
+			if (!nc)
+				goto out;
+			ret = -EFAULT;
+			if (copy_from_user(nc->data,
+					notification.sigev_value.sival_ptr,
+					NOTIFY_COOKIE_LEN)) {
+				goto out;
+			}
+
+			/* TODO: add a header? */
+			skb_put(nc, NOTIFY_COOKIE_LEN);
+			/* and attach it to the socket */
+retry:
+			filp = fget(notification.sigev_signo);
+			ret = -EBADF;
+			if (!filp)
+				goto out;
+			sock = netlink_getsockbyfilp(filp);
+			fput(filp);
+			if (IS_ERR(sock)) {
+				ret = PTR_ERR(sock);
+				sock = NULL;
+				goto out;
+			}
+
+			ret = netlink_attachskb(sock, nc, 0, MAX_SCHEDULE_TIMEOUT);
+			if (ret == 1)
+		       		goto retry;
+			if (ret) {
+				sock = NULL;
+				nc = NULL;
+				goto out;
+			}
+		}
 	}
 
 	ret = -EBADF;
@@ -1009,47 +988,33 @@ asmlinkage long sys_mq_notify(mqd_t mqde
 	info = MQUEUE_I(inode);
 
 	ret = 0;
-	if (notification.sigev_notify == SIGEV_THREAD) {
-		ret = get_unused_fd();
-		if (ret < 0)
-			goto out_fput;
-		fd = ret;
-		nfilp = get_empty_filp();
-		if (!nfilp) {
-			ret = -ENFILE;
-			goto out_dropfd;
-		}
-		nfilp->private_data = NP_NONE;
-		nfilp->f_op = &mqueue_notify_fops;
-		nfilp->f_vfsmnt = mntget(mqueue_mnt);
-		nfilp->f_dentry = dget(filp->f_dentry);
-		nfilp->f_mapping = filp->f_dentry->d_inode->i_mapping;
-		nfilp->f_flags = O_RDONLY;
-		nfilp->f_mode = FMODE_READ;
-	} else {
-		nfilp = NULL;
-		fd = -1;
-	}
-
 	spin_lock(&info->lock);
-
-	if (notification.sigev_notify == SIGEV_NONE) {
-		if (info->notify_owner == current->tgid) {
+	switch (notification.sigev_notify) {
+	case SIGEV_NONE:
+		if (info->notify.sigev_notify != SIGEV_NONE &&
+				info->notify_owner == current->tgid) {
 			remove_notification(info);
 			inode->i_atime = inode->i_ctime = CURRENT_TIME;
 		}
-	} else if (info->notify_owner) {
-		ret = -EBUSY;
-	} else if (notification.sigev_notify == SIGEV_THREAD) {
-		info->notify_filp = nfilp;
-		fd_install(fd, nfilp);
-		ret = fd;
-		fd = -1;
-		nfilp = NULL;
+		break;
+	case SIGEV_THREAD:
+		if (info->notify.sigev_notify != SIGEV_NONE) {
+			ret = -EBUSY;
+			break;
+		}
+		info->notify_sock = sock;
+		info->notify_cookie = nc;
+		sock = NULL;
+		nc = NULL;
 		info->notify.sigev_notify = SIGEV_THREAD;
 		info->notify_owner = current->tgid;
 		inode->i_atime = inode->i_ctime = CURRENT_TIME;
-	}  else {
+		break;
+	case SIGEV_SIGNAL:
+		if (info->notify.sigev_notify != SIGEV_NONE) {
+			ret = -EBUSY;
+			break;
+		}
 		info->notify.sigev_signo = notification.sigev_signo;
 		info->notify.sigev_value = notification.sigev_value;
 		info->notify.sigev_notify = SIGEV_SIGNAL;
@@ -1057,12 +1022,14 @@ asmlinkage long sys_mq_notify(mqd_t mqde
 		inode->i_atime = inode->i_ctime = CURRENT_TIME;
 	}
 	spin_unlock(&info->lock);
-out_dropfd:
-	if (fd != -1)
-		put_unused_fd(fd);
 out_fput:
 	fput(filp);
 out:
+	if (sock) {
+		netlink_detachskb(sock, nc);
+	} else if (nc) {
+		dev_kfree_skb(nc);
+	}
 	return ret;
 }
 
@@ -1131,13 +1098,6 @@ static struct file_operations mqueue_fil
 	.read = mqueue_read_file,
 };
 
-static struct file_operations mqueue_notify_fops = {
-	.poll = mqueue_notify_poll,
-	.read = mqueue_notify_read,
-	.release = mqueue_notify_release,
-};
-
-
 static struct super_operations mqueue_super_ops = {
 	.alloc_inode = mqueue_alloc_inode,
 	.destroy_inode = mqueue_destroy_inode,

_