From: Trond Myklebust <trond.myklebust@fys.uio.no>

RPC: Add support for sharing the same RPC transport and credential caches
between different mountpoints by allowing cloning of the rpc_client struct.


---

 include/linux/sunrpc/auth.h |    1 
 include/linux/sunrpc/clnt.h |   23 ++++++++++------
 net/sunrpc/auth.c           |   13 +++++++--
 net/sunrpc/clnt.c           |   61 +++++++++++++++++++++++++++++++++++++++++---
 net/sunrpc/pmap_clnt.c      |   17 ++++++------
 net/sunrpc/sunrpc_syms.c    |    2 +
 net/sunrpc/xprt.c           |    8 ++---
 7 files changed, 98 insertions(+), 27 deletions(-)

diff -puN include/linux/sunrpc/auth.h~nfs-16-rpc_clones include/linux/sunrpc/auth.h
--- 25/include/linux/sunrpc/auth.h~nfs-16-rpc_clones	2004-01-09 22:16:17.000000000 -0800
+++ 25-akpm/include/linux/sunrpc/auth.h	2004-01-09 22:16:17.000000000 -0800
@@ -73,6 +73,7 @@ struct rpc_auth {
 						 * differ from the flavor in
 						 * au_ops->au_flavor in gss
 						 * case) */
+	atomic_t		au_count;	/* Reference counter */
 
 	/* per-flavor data */
 };
diff -puN include/linux/sunrpc/clnt.h~nfs-16-rpc_clones include/linux/sunrpc/clnt.h
--- 25/include/linux/sunrpc/clnt.h~nfs-16-rpc_clones	2004-01-09 22:16:17.000000000 -0800
+++ 25-akpm/include/linux/sunrpc/clnt.h	2004-01-09 22:16:17.000000000 -0800
@@ -26,6 +26,8 @@ struct rpc_portmap {
 	__u32			pm_vers;
 	__u32			pm_prot;
 	__u16			pm_port;
+	unsigned char		pm_binding : 1;	/* doing a getport() */
+	struct rpc_wait_queue	pm_bindwait;	/* waiting on getport() */
 };
 
 struct rpc_inode;
@@ -34,6 +36,7 @@ struct rpc_inode;
  * The high-level client handle
  */
 struct rpc_clnt {
+	atomic_t		cl_count;	/* Number of clones */
 	atomic_t		cl_users;	/* number of references */
 	struct rpc_xprt *	cl_xprt;	/* transport */
 	struct rpc_procinfo *	cl_procinfo;	/* procedure info */
@@ -48,26 +51,27 @@ struct rpc_clnt {
 				cl_intr     : 1,/* interruptible */
 				cl_chatty   : 1,/* be verbose */
 				cl_autobind : 1,/* use getport() */
-				cl_binding  : 1,/* doing a getport() */
 				cl_droppriv : 1,/* enable NFS suid hack */
 				cl_oneshot  : 1,/* dispose after use */
 				cl_dead     : 1;/* abandoned */
 
-	struct rpc_rtt		cl_rtt;		/* RTO estimator data */
-
-	struct rpc_portmap	cl_pmap;	/* port mapping */
-	struct rpc_wait_queue	cl_bindwait;	/* waiting on getport() */
+	struct rpc_rtt *	cl_rtt;		/* RTO estimator data */
+	struct rpc_portmap *	cl_pmap;	/* port mapping */
 
 	int			cl_nodelen;	/* nodename length */
 	char 			cl_nodename[UNX_MAXNODENAME];
 	char			cl_pathname[30];/* Path in rpc_pipe_fs */
 	struct dentry *		cl_dentry;	/* inode */
+	struct rpc_clnt *	cl_parent;	/* Points to parent of clones */
+	struct rpc_rtt		cl_rtt_default;
+	struct rpc_portmap	cl_pmap_default;
+	char			cl_inline_name[32];
 };
 #define cl_timeout		cl_xprt->timeout
-#define cl_prog			cl_pmap.pm_prog
-#define cl_vers			cl_pmap.pm_vers
-#define cl_port			cl_pmap.pm_port
-#define cl_prot			cl_pmap.pm_prot
+#define cl_prog			cl_pmap->pm_prog
+#define cl_vers			cl_pmap->pm_vers
+#define cl_port			cl_pmap->pm_port
+#define cl_prot			cl_pmap->pm_prot
 
 /*
  * General RPC program info
@@ -108,6 +112,7 @@ struct rpc_procinfo {
 struct rpc_clnt *rpc_create_client(struct rpc_xprt *xprt, char *servname,
 				struct rpc_program *info,
 				u32 version, rpc_authflavor_t authflavor);
+struct rpc_clnt *rpc_clone_client(struct rpc_clnt *);
 int		rpc_shutdown_client(struct rpc_clnt *);
 int		rpc_destroy_client(struct rpc_clnt *);
 void		rpc_release_client(struct rpc_clnt *);
diff -puN net/sunrpc/auth.c~nfs-16-rpc_clones net/sunrpc/auth.c
--- 25/net/sunrpc/auth.c~nfs-16-rpc_clones	2004-01-09 22:16:17.000000000 -0800
+++ 25-akpm/net/sunrpc/auth.c	2004-01-09 22:16:17.000000000 -0800
@@ -61,6 +61,7 @@ rpcauth_unregister(struct rpc_authops *o
 struct rpc_auth *
 rpcauth_create(rpc_authflavor_t pseudoflavor, struct rpc_clnt *clnt)
 {
+	struct rpc_auth		*auth;
 	struct rpc_authops	*ops;
 	u32			flavor = pseudoflavor_to_flavor(pseudoflavor);
 
@@ -68,13 +69,21 @@ rpcauth_create(rpc_authflavor_t pseudofl
 		return NULL;
 	if (!try_module_get(ops->owner))
 		return NULL;
-	clnt->cl_auth = ops->create(clnt, pseudoflavor);
-	return clnt->cl_auth;
+	auth = ops->create(clnt, pseudoflavor);
+	if (!auth)
+		return NULL;
+	atomic_set(&auth->au_count, 1);
+	if (clnt->cl_auth)
+		rpcauth_destroy(clnt->cl_auth);
+	clnt->cl_auth = auth;
+	return auth;
 }
 
 void
 rpcauth_destroy(struct rpc_auth *auth)
 {
+	if (!atomic_dec_and_test(&auth->au_count))
+		return;
 	auth->au_ops->destroy(auth);
 	module_put(auth->au_ops->owner);
 	kfree(auth);
diff -puN net/sunrpc/clnt.c~nfs-16-rpc_clones net/sunrpc/clnt.c
--- 25/net/sunrpc/clnt.c~nfs-16-rpc_clones	2004-01-09 22:16:17.000000000 -0800
+++ 25-akpm/net/sunrpc/clnt.c	2004-01-09 22:16:17.000000000 -0800
@@ -102,6 +102,7 @@ rpc_create_client(struct rpc_xprt *xprt,
 {
 	struct rpc_version	*version;
 	struct rpc_clnt		*clnt = NULL;
+	int len;
 
 	dprintk("RPC: creating %s client for %s (xprt %p)\n",
 		program->name, servname, xprt);
@@ -116,23 +117,37 @@ rpc_create_client(struct rpc_xprt *xprt,
 		goto out_no_clnt;
 	memset(clnt, 0, sizeof(*clnt));
 	atomic_set(&clnt->cl_users, 0);
+	atomic_set(&clnt->cl_count, 1);
+	clnt->cl_parent = clnt;
+
+	clnt->cl_server = clnt->cl_inline_name;
+	len = strlen(servname) + 1;
+	if (len > sizeof(clnt->cl_inline_name)) {
+		char *buf = kmalloc(len, GFP_KERNEL);
+		if (buf != 0)
+			clnt->cl_server = buf;
+		else
+			len = sizeof(clnt->cl_inline_name);
+	}
+	strlcpy(clnt->cl_server, servname, len);
 
 	clnt->cl_xprt     = xprt;
 	clnt->cl_procinfo = version->procs;
 	clnt->cl_maxproc  = version->nrprocs;
-	clnt->cl_server   = servname;
 	clnt->cl_protname = program->name;
+	clnt->cl_pmap	  = &clnt->cl_pmap_default;
 	clnt->cl_port     = xprt->addr.sin_port;
 	clnt->cl_prog     = program->number;
 	clnt->cl_vers     = version->number;
 	clnt->cl_prot     = xprt->prot;
 	clnt->cl_stats    = program->stats;
-	INIT_RPC_WAITQ(&clnt->cl_bindwait, "bindwait");
+	INIT_RPC_WAITQ(&clnt->cl_pmap_default.pm_bindwait, "bindwait");
 
 	if (!clnt->cl_port)
 		clnt->cl_autobind = 1;
 
-	rpc_init_rtt(&clnt->cl_rtt, xprt->timeout.to_initval);
+	clnt->cl_rtt = &clnt->cl_rtt_default;
+	rpc_init_rtt(&clnt->cl_rtt_default, xprt->timeout.to_initval);
 
 	if (rpc_setup_pipedir(clnt, program->pipe_dir_name) < 0)
 		goto out_no_path;
@@ -157,12 +172,40 @@ out_no_clnt:
 out_no_auth:
 	rpc_rmdir(clnt->cl_pathname);
 out_no_path:
+	if (clnt->cl_server != clnt->cl_inline_name)
+		kfree(clnt->cl_server);
 	kfree(clnt);
 	clnt = NULL;
 	goto out;
 }
 
 /*
+ * This function clones the RPC client structure. It allows us to share the
+ * same transport while varying parameters such as the authentication
+ * flavour.
+ */
+struct rpc_clnt *
+rpc_clone_client(struct rpc_clnt *clnt)
+{
+	struct rpc_clnt *new;
+
+	new = (struct rpc_clnt *)kmalloc(sizeof(*new), GFP_KERNEL);
+	if (!new)
+		goto out_no_clnt;
+	memcpy(new, clnt, sizeof(*new));
+	atomic_set(&new->cl_count, 1);
+	atomic_set(&new->cl_users, 0);
+	atomic_inc(&new->cl_parent->cl_count);
+	if (new->cl_auth)
+		atomic_inc(&new->cl_auth->au_count);
+out:
+	return new;
+out_no_clnt:
+	printk(KERN_INFO "RPC: out of memory in %s\n", __FUNCTION__);
+	goto out;
+}
+
+/*
  * Properly shut down an RPC client, terminating all outstanding
  * requests. Note that we must be certain that cl_oneshot and
  * cl_dead are cleared, or else the client would be destroyed
@@ -201,19 +244,29 @@ rpc_shutdown_client(struct rpc_clnt *cln
 int
 rpc_destroy_client(struct rpc_clnt *clnt)
 {
+	if (!atomic_dec_and_test(&clnt->cl_count))
+		return 1;
+	BUG_ON(atomic_read(&clnt->cl_users) != 0);
+
 	dprintk("RPC: destroying %s client for %s\n",
 			clnt->cl_protname, clnt->cl_server);
-
 	if (clnt->cl_auth) {
 		rpcauth_destroy(clnt->cl_auth);
 		clnt->cl_auth = NULL;
 	}
+	if (clnt->cl_parent != clnt) {
+		rpc_destroy_client(clnt->cl_parent);
+		goto out_free;
+	}
 	if (clnt->cl_pathname[0])
 		rpc_rmdir(clnt->cl_pathname);
 	if (clnt->cl_xprt) {
 		xprt_destroy(clnt->cl_xprt);
 		clnt->cl_xprt = NULL;
 	}
+	if (clnt->cl_server != clnt->cl_inline_name)
+		kfree(clnt->cl_server);
+out_free:
 	kfree(clnt);
 	return 0;
 }
diff -puN net/sunrpc/pmap_clnt.c~nfs-16-rpc_clones net/sunrpc/pmap_clnt.c
--- 25/net/sunrpc/pmap_clnt.c~nfs-16-rpc_clones	2004-01-09 22:16:17.000000000 -0800
+++ 25-akpm/net/sunrpc/pmap_clnt.c	2004-01-09 22:16:17.000000000 -0800
@@ -41,7 +41,7 @@ static spinlock_t		pmap_lock = SPIN_LOCK
 void
 rpc_getport(struct rpc_task *task, struct rpc_clnt *clnt)
 {
-	struct rpc_portmap *map = &clnt->cl_pmap;
+	struct rpc_portmap *map = clnt->cl_pmap;
 	struct sockaddr_in *sap = &clnt->cl_xprt->addr;
 	struct rpc_message msg = {
 		.rpc_proc	= &pmap_procedures[PMAP_GETPORT],
@@ -57,12 +57,12 @@ rpc_getport(struct rpc_task *task, struc
 			map->pm_prog, map->pm_vers, map->pm_prot);
 
 	spin_lock(&pmap_lock);
-	if (clnt->cl_binding) {
-		rpc_sleep_on(&clnt->cl_bindwait, task, NULL, 0);
+	if (map->pm_binding) {
+		rpc_sleep_on(&map->pm_bindwait, task, NULL, 0);
 		spin_unlock(&pmap_lock);
 		return;
 	}
-	clnt->cl_binding = 1;
+	map->pm_binding = 1;
 	spin_unlock(&pmap_lock);
 
 	task->tk_status = -EACCES; /* why set this? returns -EIO below */
@@ -85,8 +85,8 @@ rpc_getport(struct rpc_task *task, struc
 
 bailout:
 	spin_lock(&pmap_lock);
-	clnt->cl_binding = 0;
-	rpc_wake_up(&clnt->cl_bindwait);
+	map->pm_binding = 0;
+	rpc_wake_up(&map->pm_bindwait);
 	spin_unlock(&pmap_lock);
 	task->tk_status = -EIO;
 	task->tk_action = NULL;
@@ -129,6 +129,7 @@ static void
 pmap_getport_done(struct rpc_task *task)
 {
 	struct rpc_clnt	*clnt = task->tk_client;
+	struct rpc_portmap *map = clnt->cl_pmap;
 
 	dprintk("RPC: %4d pmap_getport_done(status %d, port %d)\n",
 			task->tk_pid, task->tk_status, clnt->cl_port);
@@ -145,8 +146,8 @@ pmap_getport_done(struct rpc_task *task)
 		clnt->cl_xprt->addr.sin_port = clnt->cl_port;
 	}
 	spin_lock(&pmap_lock);
-	clnt->cl_binding = 0;
-	rpc_wake_up(&clnt->cl_bindwait);
+	map->pm_binding = 0;
+	rpc_wake_up(&map->pm_bindwait);
 	spin_unlock(&pmap_lock);
 }
 
diff -puN net/sunrpc/sunrpc_syms.c~nfs-16-rpc_clones net/sunrpc/sunrpc_syms.c
--- 25/net/sunrpc/sunrpc_syms.c~nfs-16-rpc_clones	2004-01-09 22:16:17.000000000 -0800
+++ 25-akpm/net/sunrpc/sunrpc_syms.c	2004-01-09 22:16:17.000000000 -0800
@@ -41,6 +41,7 @@ EXPORT_SYMBOL(rpc_release_task);
 
 /* RPC client functions */
 EXPORT_SYMBOL(rpc_create_client);
+EXPORT_SYMBOL(rpc_clone_client);
 EXPORT_SYMBOL(rpc_destroy_client);
 EXPORT_SYMBOL(rpc_shutdown_client);
 EXPORT_SYMBOL(rpc_release_client);
@@ -66,6 +67,7 @@ EXPORT_SYMBOL(xprt_set_timeout);
 /* Client credential cache */
 EXPORT_SYMBOL(rpcauth_register);
 EXPORT_SYMBOL(rpcauth_unregister);
+EXPORT_SYMBOL(rpcauth_create);
 EXPORT_SYMBOL(rpcauth_lookupcred);
 EXPORT_SYMBOL(rpcauth_lookup_credcache);
 EXPORT_SYMBOL(rpcauth_free_credcache);
diff -puN net/sunrpc/xprt.c~nfs-16-rpc_clones net/sunrpc/xprt.c
--- 25/net/sunrpc/xprt.c~nfs-16-rpc_clones	2004-01-09 22:16:17.000000000 -0800
+++ 25-akpm/net/sunrpc/xprt.c	2004-01-09 22:16:17.000000000 -0800
@@ -584,9 +584,9 @@ xprt_complete_rqst(struct rpc_xprt *xprt
 		__xprt_put_cong(xprt, req);
 		if (timer) {
 			if (req->rq_ntrans == 1)
-				rpc_update_rtt(&clnt->cl_rtt, timer,
+				rpc_update_rtt(clnt->cl_rtt, timer,
 						(long)jiffies - req->rq_xtime);
-			rpc_set_timeo(&clnt->cl_rtt, timer, req->rq_ntrans - 1);
+			rpc_set_timeo(clnt->cl_rtt, timer, req->rq_ntrans - 1);
 		}
 	}
 
@@ -1224,8 +1224,8 @@ xprt_transmit(struct rpc_task *task)
 	spin_lock_bh(&xprt->sock_lock);
 	if (!xprt->nocong) {
 		int timer = task->tk_msg.rpc_proc->p_timer;
-		task->tk_timeout = rpc_calc_rto(&clnt->cl_rtt, timer);
-		task->tk_timeout <<= rpc_ntimeo(&clnt->cl_rtt, timer);
+		task->tk_timeout = rpc_calc_rto(clnt->cl_rtt, timer);
+		task->tk_timeout <<= rpc_ntimeo(clnt->cl_rtt, timer);
 		task->tk_timeout <<= clnt->cl_timeout.to_retries
 			- req->rq_timeout.to_retries;
 		if (task->tk_timeout > req->rq_timeout.to_maxval)

_