From: Trond Myklebust <trond.myklebust@fys.uio.no>

RPCSEC_GSS: More fixes to the upcall mechanism.



---

 net/sunrpc/auth_gss/auth_gss.c |  116 +++++++++++++++++++----------------------
 1 files changed, 55 insertions(+), 61 deletions(-)

diff -puN net/sunrpc/auth_gss/auth_gss.c~nfs-02-auth_gss net/sunrpc/auth_gss/auth_gss.c
--- 25/net/sunrpc/auth_gss/auth_gss.c~nfs-02-auth_gss	2004-01-09 22:16:09.000000000 -0800
+++ 25-akpm/net/sunrpc/auth_gss/auth_gss.c	2004-01-09 22:16:09.000000000 -0800
@@ -156,17 +156,17 @@ gss_cred_set_ctx(struct rpc_cred *cred, 
 		gss_put_ctx(old);
 }
 
-static struct gss_cl_ctx *
-gss_cred_get_uptodate_ctx(struct rpc_cred *cred)
+static int
+gss_cred_is_uptodate_ctx(struct rpc_cred *cred)
 {
 	struct gss_cred *gss_cred = container_of(cred, struct gss_cred, gc_base);
-	struct gss_cl_ctx *ctx = NULL;
+	int res = 0;
 
 	read_lock(&gss_ctx_lock);
 	if ((cred->cr_flags & RPCAUTH_CRED_UPTODATE) && gss_cred->gc_ctx)
-		ctx = gss_get_ctx(gss_cred->gc_ctx);
+		res = 1;
 	read_unlock(&gss_ctx_lock);
-	return ctx;
+	return res;
 }
 
 static inline int
@@ -293,13 +293,9 @@ struct gss_upcall_msg {
 static void
 gss_release_msg(struct gss_upcall_msg *gss_msg)
 {
-	struct gss_auth *gss_auth = gss_msg->auth;
-
-	if (!atomic_dec_and_lock(&gss_msg->count, &gss_auth->lock))
+	if (!atomic_dec_and_test(&gss_msg->count))
 		return;
-	if (!list_empty(&gss_msg->list))
-		list_del(&gss_msg->list);
-	spin_unlock(&gss_auth->lock);
+	BUG_ON(!list_empty(&gss_msg->list));
 	kfree(gss_msg);
 }
 
@@ -316,24 +312,17 @@ __gss_find_upcall(struct gss_auth *gss_a
 	return NULL;
 }
 
-static struct gss_upcall_msg *
-gss_find_upcall(struct gss_auth *gss_auth, uid_t uid)
-{
-	struct gss_upcall_msg *gss_msg;
-
-	spin_lock(&gss_auth->lock);
-	gss_msg = __gss_find_upcall(gss_auth, uid);
-	spin_unlock(&gss_auth->lock);
-	return gss_msg;
-}
-
 static void
 __gss_unhash_msg(struct gss_upcall_msg *gss_msg)
 {
 	if (list_empty(&gss_msg->list))
 		return;
 	list_del_init(&gss_msg->list);
-	rpc_wake_up(&gss_msg->waitq);
+	if (gss_msg->msg.errno < 0)
+		rpc_wake_up_status(&gss_msg->waitq, gss_msg->msg.errno);
+	else
+		rpc_wake_up(&gss_msg->waitq);
+	atomic_dec(&gss_msg->count);
 }
 
 static void
@@ -346,40 +335,27 @@ gss_unhash_msg(struct gss_upcall_msg *gs
 	spin_unlock(&gss_auth->lock);
 }
 
-static void
-gss_release_callback(struct rpc_task *task)
-{
-	struct rpc_clnt *clnt = task->tk_client;
-	struct gss_auth *gss_auth = container_of(clnt->cl_auth,
-			struct gss_auth, rpc_auth);
-	struct gss_upcall_msg *gss_msg;
-
-	gss_msg = gss_find_upcall(gss_auth, task->tk_msg.rpc_cred->cr_uid);
-	BUG_ON(!gss_msg);
-	atomic_dec(&gss_msg->count);
-	gss_release_msg(gss_msg);
-}
-
 static int
-gss_upcall(struct rpc_clnt *clnt, struct rpc_task *task, uid_t uid)
+gss_upcall(struct rpc_clnt *clnt, struct rpc_task *task, struct rpc_cred *cred)
 {
 	struct gss_auth *gss_auth = container_of(clnt->cl_auth,
 			struct gss_auth, rpc_auth);
 	struct gss_upcall_msg *gss_msg, *gss_new = NULL;
 	struct rpc_pipe_msg *msg;
 	struct dentry *dentry = gss_auth->dentry;
-	int res;
+	uid_t uid = cred->cr_uid;
+	int res = 0;
 
 retry:
+	spin_lock(&gss_auth->lock);
 	gss_msg = __gss_find_upcall(gss_auth, uid);
 	if (gss_msg)
 		goto out_sleep;
 	if (gss_new == NULL) {
 		spin_unlock(&gss_auth->lock);
 		gss_new = kmalloc(sizeof(*gss_new), GFP_KERNEL);
-		if (gss_new)
+		if (!gss_new)
 			return -ENOMEM;
-		spin_lock(&gss_auth->lock);
 		goto retry;
 	}
 	gss_msg = gss_new;
@@ -394,20 +370,34 @@ retry:
 	gss_new->auth = gss_auth;
 	list_add(&gss_new->list, &gss_auth->upcalls);
 	gss_new = NULL;
-	task->tk_timeout = 5 * HZ;
-	rpc_sleep_on(&gss_msg->waitq, task, gss_release_callback, NULL);
-	spin_unlock(&gss_auth->lock);
-	res = rpc_queue_upcall(dentry->d_inode, msg);
-	if (res) {
-		gss_unhash_msg(gss_msg);
-		gss_release_msg(gss_msg);
+	/* Has someone updated the credential behind our back? */
+	if (!gss_cred_is_uptodate_ctx(cred)) {
+		/* No, so do upcall and sleep */
+		task->tk_timeout = 0;
+		rpc_sleep_on(&gss_msg->waitq, task, NULL, NULL);
+		spin_unlock(&gss_auth->lock);
+		res = rpc_queue_upcall(dentry->d_inode, msg);
+		if (res)
+			gss_unhash_msg(gss_msg);
+	} else {
+		/* Yes, so cancel upcall */
+		__gss_unhash_msg(gss_msg);
+		spin_unlock(&gss_auth->lock);
 	}
+	gss_release_msg(gss_msg);
 	return res;
 out_sleep:
-	rpc_sleep_on(&gss_msg->waitq, task, gss_release_callback, NULL);
+	/* Sleep forever */
+	task->tk_timeout = 0;
+	rpc_sleep_on(&gss_msg->waitq, task, NULL, NULL);
 	spin_unlock(&gss_auth->lock);
 	if (gss_new)
 		kfree(gss_new);
+	/* Note: we drop the reference here: we are automatically removed
+	 * from the queue when we're woken up, and we should in any case
+	 * have no further responsabilities w.r.t. the upcall.
+	 */
+	gss_release_msg(gss_msg);
 	return 0;
 }
 
@@ -496,10 +486,21 @@ void
 gss_pipe_destroy_msg(struct rpc_pipe_msg *msg)
 {
 	struct gss_upcall_msg *gss_msg = container_of(msg, struct gss_upcall_msg, msg);
+	static unsigned long ratelimit;
 
-	if (msg->errno < 0)
+	if (msg->errno < 0) {
+		atomic_inc(&gss_msg->count);
 		gss_unhash_msg(gss_msg);
-	gss_release_msg(gss_msg);
+		if (msg->errno == -ETIMEDOUT || msg->errno == -EPIPE) {
+			unsigned long now = jiffies;
+			if (time_after(now, ratelimit)) {
+				printk(KERN_WARNING "RPC: AUTH_GSS upcall timed out.\n"
+						    "Please check user daemon is running!\n");
+				ratelimit = now + 15*HZ;
+			}
+		}
+		gss_release_msg(gss_msg);
+	}
 }
 
 /* 
@@ -705,20 +706,13 @@ static int
 gss_refresh(struct rpc_task *task)
 {
 	struct rpc_clnt *clnt = task->tk_client;
-	struct gss_auth *gss_auth = container_of(clnt->cl_auth,
-			struct gss_auth, rpc_auth);
 	struct rpc_xprt *xprt = task->tk_xprt;
 	struct rpc_cred *cred = task->tk_msg.rpc_cred;
-	int err = 0;
 
 	task->tk_timeout = xprt->timeout.to_current;
-	spin_lock(&gss_auth->lock);
-	if (gss_cred_get_uptodate_ctx(cred))
-		goto out;
-	err = gss_upcall(clnt, task, cred->cr_uid);
-out:
-	spin_unlock(&gss_auth->lock);
-	return err;
+	if (!gss_cred_is_uptodate_ctx(cred))
+		return gss_upcall(clnt, task, cred);
+	return 0;
 }
 
 static u32 *

_