patch-2.4.20 linux-2.4.20/net/sunrpc/xprt.c
- Lines: 1053
- Date: Thu Nov 28 15:53:16 2002
- Orig file: linux-2.4.19/net/sunrpc/xprt.c
- Orig date: Fri Aug 2 17:39:46 2002
diff -urN linux-2.4.19/net/sunrpc/xprt.c linux-2.4.20/net/sunrpc/xprt.c
@@ -67,8 +67,6 @@
#include <asm/uaccess.h>
-extern spinlock_t rpc_queue_lock;
-
/*
* Local variables
*/
@@ -78,6 +76,8 @@
# define RPCDBG_FACILITY RPCDBG_XPRT
#endif
+#define XPRT_MAX_BACKOFF (8)
+
/*
* Local functions
*/
@@ -88,6 +88,7 @@
static void xprt_reconn_status(struct rpc_task *task);
static struct socket *xprt_create_socket(int, struct rpc_timeout *);
static int xprt_bind_socket(struct rpc_xprt *, struct socket *);
+static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
#ifdef RPC_DEBUG_DATA
/*
@@ -130,75 +131,74 @@
}
/*
- * Adjust the iovec to move on 'n' bytes
- */
-
-extern inline void
-xprt_move_iov(struct msghdr *msg, struct iovec *niv, unsigned amount)
-{
- struct iovec *iv=msg->msg_iov;
- int i;
-
- /*
- * Eat any sent iovecs
- */
- while (iv->iov_len <= amount) {
- amount -= iv->iov_len;
- iv++;
- msg->msg_iovlen--;
- }
-
- /*
- * And chew down the partial one
- */
- niv[0].iov_len = iv->iov_len-amount;
- niv[0].iov_base =((unsigned char *)iv->iov_base)+amount;
- iv++;
-
- /*
- * And copy any others
- */
- for(i = 1; i < msg->msg_iovlen; i++)
- niv[i]=*iv++;
-
- msg->msg_iov=niv;
-}
-
-/*
* Serialize write access to sockets, in order to prevent different
* requests from interfering with each other.
* Also prevents TCP socket reconnections from colliding with writes.
*/
static int
-xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
+__xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
- int retval;
- spin_lock_bh(&xprt->sock_lock);
- if (!xprt->snd_task)
- xprt->snd_task = task;
- else if (xprt->snd_task != task) {
- dprintk("RPC: %4d TCP write queue full (task %d)\n",
- task->tk_pid, xprt->snd_task->tk_pid);
+ if (!xprt->snd_task) {
+ if (xprt->nocong || __xprt_get_cong(xprt, task))
+ xprt->snd_task = task;
+ }
+ if (xprt->snd_task != task) {
+ dprintk("RPC: %4d TCP write queue full\n", task->tk_pid);
task->tk_timeout = 0;
task->tk_status = -EAGAIN;
- rpc_sleep_on(&xprt->sending, task, NULL, NULL);
+ if (task->tk_rqstp && task->tk_rqstp->rq_nresend)
+ rpc_sleep_on(&xprt->resend, task, NULL, NULL);
+ else
+ rpc_sleep_on(&xprt->sending, task, NULL, NULL);
}
- retval = xprt->snd_task == task;
+ return xprt->snd_task == task;
+}
+
+static inline int
+xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
+{
+ int retval;
+ spin_lock_bh(&xprt->sock_lock);
+ retval = __xprt_lock_write(xprt, task);
spin_unlock_bh(&xprt->sock_lock);
return retval;
}
+static void
+__xprt_lock_write_next(struct rpc_xprt *xprt)
+{
+ struct rpc_task *task;
+
+ if (xprt->snd_task)
+ return;
+ task = rpc_wake_up_next(&xprt->resend);
+ if (!task) {
+ if (!xprt->nocong && RPCXPRT_CONGESTED(xprt))
+ return;
+ task = rpc_wake_up_next(&xprt->sending);
+ if (!task)
+ return;
+ }
+ if (xprt->nocong || __xprt_get_cong(xprt, task))
+ xprt->snd_task = task;
+}
+
/*
* Releases the socket for use by other requests.
*/
static void
+__xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
+{
+ if (xprt->snd_task == task)
+ xprt->snd_task = NULL;
+ __xprt_lock_write_next(xprt);
+}
+
+static inline void
xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
spin_lock_bh(&xprt->sock_lock);
- if (xprt->snd_task == task) {
- xprt->snd_task = NULL;
- rpc_wake_up_next(&xprt->sending);
- }
+ __xprt_release_write(xprt, task);
spin_unlock_bh(&xprt->sock_lock);
}
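The hunk above replaces the old xprt_lock_write() with a congestion-aware variant: only one task may write to the socket at a time, and a task may only take the lock while the congestion window has room. Below is a minimal user-space sketch of that handoff with the spinlocks, wait queues and RPC task machinery stripped out; struct fake_xprt, lock_write() and release_write() are illustrative names, not the kernel's types.

/* Userspace sketch (not kernel code) of the sender-lock handoff pattern
 * used by __xprt_lock_write()/__xprt_release_write(), with the congestion
 * check reduced to a scaled counter as in __xprt_get_cong(). */
#include <stdio.h>

#define CWNDSCALE 256L

struct fake_xprt {
	int  snd_task;	/* id of the task holding the write lock, 0 = free */
	long cong;	/* scaled count of requests currently "on the wire" */
	long cwnd;	/* current congestion window, scaled by CWNDSCALE   */
};

static int lock_write(struct fake_xprt *x, int task)
{
	if (x->snd_task == 0 && x->cong < x->cwnd) {
		x->cong += CWNDSCALE;	/* charge one slot, like __xprt_get_cong() */
		x->snd_task = task;
	}
	return x->snd_task == task;	/* caller must retry/sleep on failure */
}

static void release_write(struct fake_xprt *x, int task)
{
	/* cong is *not* given back here; the real code returns it in
	 * __xprt_put_cong() when the reply arrives or the request dies,
	 * and then wakes the next waiter. */
	if (x->snd_task == task)
		x->snd_task = 0;
}

int main(void)
{
	struct fake_xprt x = { 0, 0, 2 * CWNDSCALE };

	printf("task 1 got lock: %d\n", lock_write(&x, 1));	/* 1 */
	printf("task 2 got lock: %d\n", lock_write(&x, 2));	/* 0: lock held */
	release_write(&x, 1);
	printf("task 2 got lock: %d\n", lock_write(&x, 2));	/* 1 */
	return 0;
}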
@@ -210,13 +210,11 @@
{
struct socket *sock = xprt->sock;
struct msghdr msg;
+ struct xdr_buf *xdr = &req->rq_snd_buf;
+ struct iovec niv[MAX_IOVEC];
+ unsigned int niov, slen, skip;
mm_segment_t oldfs;
int result;
- int slen = req->rq_slen - req->rq_bytes_sent;
- struct iovec niv[MAX_IOVEC];
-
- if (slen <= 0)
- return 0;
if (!sock)
return -ENOTCONN;
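xprt_sendmsg() is being reworked here: instead of the removed xprt_move_iov() shuffling iovecs by hand to skip req->rq_bytes_sent, the next hunk maps the XDR buffer with xdr_kmap(niv, xdr, skip) so already-transmitted bytes are never resent. A stand-alone sketch of that skip-and-resume idea on a plain struct iovec array follows; skip_iov() is an invented helper for illustration, not a kernel function.

/* Userspace sketch: advance a struct iovec array past 'skip' bytes that
 * were already sent, which is what the old xprt_move_iov() did. */
#include <stdio.h>
#include <string.h>
#include <sys/uio.h>

static int skip_iov(struct iovec *iov, int niov, size_t skip)
{
	int i = 0;

	while (i < niov && skip >= iov[i].iov_len)	/* drop fully-sent entries */
		skip -= iov[i++].iov_len;
	if (i < niov && skip) {				/* trim the partial entry  */
		iov[i].iov_base = (char *)iov[i].iov_base + skip;
		iov[i].iov_len -= skip;
	}
	return i;					/* index of first live entry */
}

int main(void)
{
	char a[] = "hello ", b[] = "world\n";
	struct iovec iov[2] = {
		{ a, strlen(a) },
		{ b, strlen(b) },
	};
	int first = skip_iov(iov, 2, 8);	/* pretend 8 bytes already went out */

	writev(1, iov + first, 2 - first);	/* prints "rld" and a newline */
	return 0;
}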
@@ -225,22 +223,26 @@
req->rq_svec->iov_base,
req->rq_svec->iov_len);
+ /* Dont repeat bytes */
+ skip = req->rq_bytes_sent;
+ slen = xdr->len - skip;
+ niov = xdr_kmap(niv, xdr, skip);
+
msg.msg_flags = MSG_DONTWAIT|MSG_NOSIGNAL;
- msg.msg_iov = req->rq_svec;
- msg.msg_iovlen = req->rq_snr;
+ msg.msg_iov = niv;
+ msg.msg_iovlen = niov;
msg.msg_name = (struct sockaddr *) &xprt->addr;
msg.msg_namelen = sizeof(xprt->addr);
msg.msg_control = NULL;
msg.msg_controllen = 0;
- /* Dont repeat bytes */
- if (req->rq_bytes_sent)
- xprt_move_iov(&msg, niv, req->rq_bytes_sent);
-
oldfs = get_fs(); set_fs(get_ds());
+ clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
result = sock_sendmsg(sock, &msg, slen);
set_fs(oldfs);
+ xdr_kunmap(xdr, skip);
+
dprintk("RPC: xprt_sendmsg(%d) = %d\n", slen, result);
if (result >= 0)
@@ -251,10 +253,7 @@
/* When the server has died, an ICMP port unreachable message
* prompts ECONNREFUSED.
*/
- break;
case -EAGAIN:
- if (test_bit(SOCK_NOSPACE, &sock->flags))
- result = -ENOMEM;
break;
case -ENOTCONN:
case -EPIPE:
@@ -269,6 +268,40 @@
}
/*
+ * Van Jacobson congestion avoidance. Check if the congestion window
+ * overflowed. Put the task to sleep if this is the case.
+ */
+static int
+__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
+{
+ struct rpc_rqst *req = task->tk_rqstp;
+
+ if (req->rq_cong)
+ return 1;
+ dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ld\n",
+ task->tk_pid, xprt->cong, xprt->cwnd);
+ if (RPCXPRT_CONGESTED(xprt))
+ return 0;
+ req->rq_cong = 1;
+ xprt->cong += RPC_CWNDSCALE;
+ return 1;
+}
+
+/*
+ * Adjust the congestion window, and wake up the next task
+ * that has been sleeping due to congestion
+ */
+static void
+__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
+{
+ if (!req->rq_cong)
+ return;
+ req->rq_cong = 0;
+ xprt->cong -= RPC_CWNDSCALE;
+ __xprt_lock_write_next(xprt);
+}
+
+/*
* Adjust RPC congestion window
* We use a time-smoothed congestion estimator to avoid heavy oscillation.
*/
@@ -277,40 +310,22 @@
{
unsigned long cwnd;
- if (xprt->nocong)
- return;
- /*
- * Note: we're in a BH context
- */
- spin_lock(&xprt->xprt_lock);
cwnd = xprt->cwnd;
- if (result >= 0) {
- if (xprt->cong < cwnd || time_before(jiffies, xprt->congtime))
- goto out;
+ if (result >= 0 && cwnd <= xprt->cong) {
/* The (cwnd >> 1) term makes sure
* the result gets rounded properly. */
cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
if (cwnd > RPC_MAXCWND)
cwnd = RPC_MAXCWND;
- else
- pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);
- xprt->congtime = jiffies + ((cwnd * HZ) << 2) / RPC_CWNDSCALE;
- dprintk("RPC: cong %08lx, cwnd was %08lx, now %08lx, "
- "time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,
- (xprt->congtime-jiffies)*1000/HZ);
+ __xprt_lock_write_next(xprt);
} else if (result == -ETIMEDOUT) {
- if ((cwnd >>= 1) < RPC_CWNDSCALE)
+ cwnd >>= 1;
+ if (cwnd < RPC_CWNDSCALE)
cwnd = RPC_CWNDSCALE;
- xprt->congtime = jiffies + ((cwnd * HZ) << 3) / RPC_CWNDSCALE;
- dprintk("RPC: cong %ld, cwnd was %ld, now %ld, "
- "time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,
- (xprt->congtime-jiffies)*1000/HZ);
- pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);
}
-
+ dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n",
+ xprt->cong, xprt->cwnd, cwnd);
xprt->cwnd = cwnd;
- out:
- spin_unlock(&xprt->xprt_lock);
}
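xprt_adjust_cwnd() above is a classic additive-increase/multiplicative-decrease update on a fixed-point window. The sketch below repeats the same arithmetic in user space; CWNDSCALE and MAXCWND are illustrative values, not necessarily the kernel's RPC_CWNDSCALE/RPC_MAXCWND.

/* Userspace sketch of the AIMD update: on success grow the window by roughly
 * one full request (CWNDSCALE*CWNDSCALE/cwnd, rounded), on timeout halve it. */
#include <stdio.h>

#define CWNDSCALE 256L
#define MAXCWND   (16 * CWNDSCALE)

static long adjust_cwnd(long cwnd, int timed_out)
{
	if (!timed_out) {
		/* additive increase; the (cwnd >> 1) term rounds the division */
		cwnd += (CWNDSCALE * CWNDSCALE + (cwnd >> 1)) / cwnd;
		if (cwnd > MAXCWND)
			cwnd = MAXCWND;
	} else {
		cwnd >>= 1;			/* multiplicative decrease */
		if (cwnd < CWNDSCALE)
			cwnd = CWNDSCALE;
	}
	return cwnd;
}

int main(void)
{
	long cwnd = CWNDSCALE;

	for (int i = 0; i < 5; i++)
		printf("after ack %d: cwnd = %ld\n", i + 1,
		       cwnd = adjust_cwnd(cwnd, 0));
	printf("after timeout: cwnd = %ld\n", adjust_cwnd(cwnd, 1));
	return 0;
}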
/*
@@ -370,7 +385,7 @@
sock_release(sock);
/*
- * TCP doesnt require the rpciod now - other things may
+ * TCP doesn't require the rpciod now - other things may
* but rpciod handles that not us.
*/
if(xprt->stream)
@@ -460,9 +475,9 @@
if (inet->state != TCP_ESTABLISHED) {
task->tk_timeout = xprt->timeout.to_maxval;
/* if the socket is already closing, delay 5 secs */
- if ((1<<inet->state) & ~(TCP_SYN_SENT|TCP_SYN_RECV))
+ if ((1<<inet->state) & ~(TCPF_SYN_SENT|TCPF_SYN_RECV))
task->tk_timeout = 5*HZ;
- rpc_sleep_on(&xprt->sending, task, xprt_reconn_status, NULL);
+ rpc_sleep_on(&xprt->pending, task, xprt_reconn_status, NULL);
release_sock(inet);
return;
}
@@ -498,30 +513,16 @@
static inline struct rpc_rqst *
xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
{
- struct rpc_task *head, *task;
- struct rpc_rqst *req;
- int safe = 0;
+ struct list_head *pos;
+ struct rpc_rqst *req = NULL;
- spin_lock_bh(&rpc_queue_lock);
- if ((head = xprt->pending.task) != NULL) {
- task = head;
- do {
- if ((req = task->tk_rqstp) && req->rq_xid == xid)
- goto out;
- task = task->tk_next;
- if (++safe > 100) {
- printk("xprt_lookup_rqst: loop in Q!\n");
- goto out_bad;
- }
- } while (task != head);
+ list_for_each(pos, &xprt->recv) {
+ struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list);
+ if (entry->rq_xid == xid) {
+ req = entry;
+ break;
+ }
}
- dprintk("RPC: unknown XID %08x in reply.\n", xid);
- out_bad:
- req = NULL;
- out:
- if (req && !__rpc_lock_task(req->rq_task))
- req = NULL;
- spin_unlock_bh(&rpc_queue_lock);
return req;
}
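The XID lookup now walks xprt->recv with list_for_each()/list_entry() instead of chasing the circular task queue under rpc_queue_lock. Below is a self-contained sketch of that intrusive-list idiom, reimplementing just enough of the <linux/list.h> interface to run in user space; struct rqst and the helpers are illustrative only.

/* Userspace sketch: a circular doubly-linked list embedded in each request,
 * walked to find a matching XID, in the style of xprt_lookup_rqst(). */
#include <stddef.h>
#include <stdio.h>

struct list_head { struct list_head *next, *prev; };

static void list_add_tail(struct list_head *n, struct list_head *head)
{
	n->prev = head->prev;
	n->next = head;
	head->prev->next = n;
	head->prev = n;
}

#define list_entry(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

struct rqst { unsigned int xid; struct list_head list; };

int main(void)
{
	struct list_head recv = { &recv, &recv };
	struct rqst a = { 0x10 }, b = { 0x20 }, *found = NULL;

	list_add_tail(&a.list, &recv);
	list_add_tail(&b.list, &recv);

	for (struct list_head *pos = recv.next; pos != &recv; pos = pos->next) {
		struct rqst *r = list_entry(pos, struct rqst, list);
		if (r->xid == 0x20) { found = r; break; }
	}
	printf("found xid 0x%x\n", found ? found->xid : 0);
	return 0;
}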
@@ -529,13 +530,23 @@
* Complete reply received.
* The TCP code relies on us to remove the request from xprt->pending.
*/
-static inline void
+static void
xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
{
struct rpc_task *task = req->rq_task;
+ struct rpc_clnt *clnt = task->tk_client;
/* Adjust congestion window */
- xprt_adjust_cwnd(xprt, copied);
+ if (!xprt->nocong) {
+ xprt_adjust_cwnd(xprt, copied);
+ __xprt_put_cong(xprt, req);
+ if (!req->rq_nresend) {
+ int timer = rpcproc_timer(clnt, task->tk_msg.rpc_proc);
+ if (timer)
+ rpc_update_rtt(&clnt->cl_rtt, timer, (long)jiffies - req->rq_xtime);
+ }
+ rpc_clear_timeo(&clnt->cl_rtt);
+ }
#ifdef RPC_PROFILE
/* Profile only reads for now */
@@ -557,66 +568,68 @@
#endif
dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
- task->tk_status = copied;
- req->rq_received = 1;
+ req->rq_received = copied;
+ list_del_init(&req->rq_list);
/* ... and wake up the process. */
rpc_wake_up_task(task);
return;
}
+static size_t
+skb_read_bits(skb_reader_t *desc, void *to, size_t len)
+{
+ if (len > desc->count)
+ len = desc->count;
+ skb_copy_bits(desc->skb, desc->offset, to, len);
+ desc->count -= len;
+ desc->offset += len;
+ return len;
+}
+
+static size_t
+skb_read_and_csum_bits(skb_reader_t *desc, void *to, size_t len)
+{
+ unsigned int csum2, pos;
+
+ if (len > desc->count)
+ len = desc->count;
+ pos = desc->offset;
+ csum2 = skb_copy_and_csum_bits(desc->skb, pos, to, len, 0);
+ desc->csum = csum_block_add(desc->csum, csum2, pos);
+ desc->count -= len;
+ desc->offset += len;
+ return len;
+}
+
/*
* We have set things up such that we perform the checksum of the UDP
* packet in parallel with the copies into the RPC client iovec. -DaveM
*/
-static int csum_partial_copy_to_page_cache(struct iovec *iov,
- struct sk_buff *skb,
- int copied)
-{
- int offset = sizeof(struct udphdr);
- __u8 *cur_ptr = iov->iov_base;
- __kernel_size_t cur_len = iov->iov_len;
- unsigned int csum = skb->csum;
- int need_csum = (skb->ip_summed != CHECKSUM_UNNECESSARY);
- int slack = skb->len - copied - sizeof(struct udphdr);
-
- if (need_csum)
- csum = csum_partial(skb->data, sizeof(struct udphdr), csum);
- while (copied > 0) {
- if (cur_len) {
- int to_move = cur_len;
- if (to_move > copied)
- to_move = copied;
- if (need_csum) {
- unsigned int csum2;
-
- csum2 = skb_copy_and_csum_bits(skb, offset,
- cur_ptr,
- to_move, 0);
- csum = csum_block_add(csum, csum2, offset);
- } else
- skb_copy_bits(skb, offset, cur_ptr, to_move);
- offset += to_move;
- copied -= to_move;
- cur_ptr += to_move;
- cur_len -= to_move;
- }
- if (cur_len <= 0) {
- iov++;
- cur_len = iov->iov_len;
- cur_ptr = iov->iov_base;
- }
- }
- if (need_csum) {
- if (slack > 0) {
- unsigned int csum2;
+static int
+csum_partial_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
+{
+ skb_reader_t desc;
- csum2 = skb_checksum(skb, offset, slack, 0);
- csum = csum_block_add(csum, csum2, offset);
- }
- if ((unsigned short)csum_fold(csum))
- return -1;
+ desc.skb = skb;
+ desc.offset = sizeof(struct udphdr);
+ desc.count = skb->len - desc.offset;
+
+ if (skb->ip_summed == CHECKSUM_UNNECESSARY)
+ goto no_checksum;
+
+ desc.csum = csum_partial(skb->data, desc.offset, skb->csum);
+ xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_and_csum_bits);
+ if (desc.offset != skb->len) {
+ unsigned int csum2;
+ csum2 = skb_checksum(skb, desc.offset, skb->len - desc.offset, 0);
+ desc.csum = csum_block_add(desc.csum, csum2, desc.offset);
}
+ if ((unsigned short)csum_fold(desc.csum))
+ return -1;
+ return 0;
+no_checksum:
+ xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_bits);
return 0;
}
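csum_partial_copy_to_xdr() folds the UDP checksum verification into the same pass that copies the reply into the receive buffer. The sketch below shows the underlying RFC 1071 ones-complement arithmetic in user space; it is not the kernel's optimized csum_partial()/csum_block_add(), just the idea of checksumming while copying.

/* Userspace sketch: accumulate a 32-bit ones-complement sum as the data is
 * copied, then fold it to 16 bits at the end (what csum_fold() does). */
#include <stdint.h>
#include <stdio.h>
#include <string.h>

static uint32_t csum_copy(void *dst, const void *src, size_t len, uint32_t sum)
{
	const uint8_t *p = src;

	memcpy(dst, src, len);				/* the copy...            */
	for (size_t i = 0; i + 1 < len; i += 2)		/* ...and the sum, one pass */
		sum += (uint32_t)p[i] << 8 | p[i + 1];
	if (len & 1)
		sum += (uint32_t)p[len - 1] << 8;
	return sum;
}

static uint16_t csum_fold(uint32_t sum)
{
	while (sum >> 16)
		sum = (sum & 0xffff) + (sum >> 16);	/* fold carries back in */
	return (uint16_t)~sum;
}

int main(void)
{
	char buf[16];
	uint32_t sum = csum_copy(buf, "example payload", 15, 0);

	printf("folded checksum: 0x%04x\n", csum_fold(sum));
	return 0;
}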
@@ -654,9 +667,10 @@
}
/* Look up and lock the request corresponding to the given XID */
+ spin_lock(&xprt->sock_lock);
rovr = xprt_lookup_rqst(xprt, *(u32 *) (skb->h.raw + sizeof(struct udphdr)));
if (!rovr)
- goto dropit;
+ goto out_unlock;
task = rovr->rq_task;
dprintk("RPC: %4d received reply\n", task->tk_pid);
@@ -667,7 +681,7 @@
copied = repsize;
/* Suck it into the iovec, verify checksum if not done by hw. */
- if (csum_partial_copy_to_page_cache(rovr->rq_rvec, skb, copied))
+ if (csum_partial_copy_to_xdr(&rovr->rq_rcv_buf, skb))
goto out_unlock;
/* Something worked... */
@@ -676,8 +690,7 @@
xprt_complete_rqst(xprt, rovr, copied);
out_unlock:
- rpc_unlock_task(task);
-
+ spin_unlock(&xprt->sock_lock);
dropit:
skb_free_datagram(sk, skb);
out:
@@ -685,12 +698,6 @@
wake_up_interruptible(sk->sleep);
}
-typedef struct {
- struct sk_buff *skb;
- unsigned offset;
- size_t count;
-} skb_reader_t;
-
/*
* Copy from an skb into memory and shrink the skb.
*/
@@ -781,50 +788,43 @@
tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
{
struct rpc_rqst *req;
- struct iovec *iov;
- char *p;
- unsigned long skip;
- size_t len, used;
- int n;
+ struct xdr_buf *rcvbuf;
+ size_t len;
/* Find and lock the request corresponding to this xid */
+ spin_lock(&xprt->sock_lock);
req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
if (!req) {
xprt->tcp_flags &= ~XPRT_COPY_DATA;
dprintk("RPC: XID %08x request not found!\n",
xprt->tcp_xid);
+ spin_unlock(&xprt->sock_lock);
return;
}
- skip = xprt->tcp_copied;
- iov = req->rq_rvec;
- for (n = req->rq_rnr; n != 0; n--, iov++) {
- if (skip >= iov->iov_len) {
- skip -= iov->iov_len;
- continue;
- }
- p = iov->iov_base;
- len = iov->iov_len;
- if (skip) {
- p += skip;
- len -= skip;
- skip = 0;
- }
- if (xprt->tcp_offset + len > xprt->tcp_reclen)
- len = xprt->tcp_reclen - xprt->tcp_offset;
- used = tcp_copy_data(desc, p, len);
- xprt->tcp_copied += used;
- xprt->tcp_offset += used;
- if (used != len)
- break;
- if (xprt->tcp_copied == req->rq_rlen) {
+
+ rcvbuf = &req->rq_rcv_buf;
+ len = desc->count;
+ if (len > xprt->tcp_reclen - xprt->tcp_offset) {
+ skb_reader_t my_desc;
+
+ len = xprt->tcp_reclen - xprt->tcp_offset;
+ memcpy(&my_desc, desc, sizeof(my_desc));
+ my_desc.count = len;
+ xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
+ &my_desc, tcp_copy_data);
+ desc->count -= len;
+ desc->offset += len;
+ } else
+ xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
+ desc, tcp_copy_data);
+ xprt->tcp_copied += len;
+ xprt->tcp_offset += len;
+
+ if (xprt->tcp_copied == req->rq_rlen)
+ xprt->tcp_flags &= ~XPRT_COPY_DATA;
+ else if (xprt->tcp_offset == xprt->tcp_reclen) {
+ if (xprt->tcp_flags & XPRT_LAST_FRAG)
xprt->tcp_flags &= ~XPRT_COPY_DATA;
- break;
- }
- if (xprt->tcp_offset == xprt->tcp_reclen) {
- if (xprt->tcp_flags & XPRT_LAST_FRAG)
- xprt->tcp_flags &= ~XPRT_COPY_DATA;
- break;
- }
}
if (!(xprt->tcp_flags & XPRT_COPY_DATA)) {
@@ -832,7 +832,7 @@
req->rq_task->tk_pid);
xprt_complete_rqst(xprt, req, xprt->tcp_copied);
}
- rpc_unlock_task(req->rq_task);
+ spin_unlock(&xprt->sock_lock);
tcp_check_recm(xprt);
}
@@ -932,7 +932,7 @@
xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID;
spin_lock(&xprt->sock_lock);
- if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
+ if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->pending)
rpc_wake_up_task(xprt->snd_task);
spin_unlock(&xprt->sock_lock);
break;
@@ -949,8 +949,10 @@
}
/*
- * The following 2 routines allow a task to sleep while socket memory is
- * low.
+ * Called when more output buffer space is available for this socket.
+ * We try not to wake our writers until they can make "significant"
+ * progress, otherwise we'll waste resources thrashing sock_sendmsg
+ * with a bunch of small requests.
*/
static void
xprt_write_space(struct sock *sk)
@@ -964,22 +966,40 @@
return;
/* Wait until we have enough socket memory */
- if (!sock_writeable(sk))
+ if (xprt->stream) {
+ /* from net/ipv4/tcp.c:tcp_write_space */
+ if (tcp_wspace(sk) < tcp_min_write_space(sk))
+ return;
+ } else {
+ /* from net/core/sock.c:sock_def_write_space */
+ if (!sock_writeable(sk))
+ return;
+ }
+
+ if (!test_and_clear_bit(SOCK_NOSPACE, &sock->flags))
return;
- if (!xprt_test_and_set_wspace(xprt)) {
- spin_lock(&xprt->sock_lock);
- if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
- rpc_wake_up_task(xprt->snd_task);
- spin_unlock(&xprt->sock_lock);
- }
+ spin_lock_bh(&xprt->sock_lock);
+ if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->pending)
+ rpc_wake_up_task(xprt->snd_task);
+ spin_unlock_bh(&xprt->sock_lock);
+ if (sk->sleep && waitqueue_active(sk->sleep))
+ wake_up_interruptible(sk->sleep);
+}
- if (test_bit(SOCK_NOSPACE, &sock->flags)) {
- if (sk->sleep && waitqueue_active(sk->sleep)) {
- clear_bit(SOCK_NOSPACE, &sock->flags);
- wake_up_interruptible(sk->sleep);
- }
- }
+/*
+ * Exponential backoff for UDP retries
+ */
+static inline int
+xprt_expbackoff(struct rpc_task *task, struct rpc_rqst *req)
+{
+ int backoff;
+
+ req->rq_ntimeo++;
+ backoff = min(rpc_ntimeo(&task->tk_client->cl_rtt), XPRT_MAX_BACKOFF);
+ if (req->rq_ntimeo < (1 << backoff))
+ return 1;
+ return 0;
}
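xprt_expbackoff() makes a timed-out UDP request sit through 2^backoff timer expirations before it is actually retransmitted, with the exponent derived from rpc_ntimeo() and capped at XPRT_MAX_BACKOFF. A user-space sketch of that gating follows, with the RTT estimator replaced by a constant; should_retransmit() and struct fake_req are invented names.

/* Userspace sketch of the exponential retry gating in xprt_expbackoff(). */
#include <stdio.h>

#define MAX_BACKOFF 8

struct fake_req { int ntimeo; };

static int should_retransmit(struct fake_req *req, int rtt_backoff)
{
	int backoff = rtt_backoff < MAX_BACKOFF ? rtt_backoff : MAX_BACKOFF;

	req->ntimeo++;
	return req->ntimeo >= (1 << backoff);	/* wait out 2^backoff timer ticks */
}

int main(void)
{
	struct fake_req req = { 0 };

	for (int tick = 1; tick <= 10; tick++)
		printf("tick %2d -> retransmit? %d\n", tick,
		       should_retransmit(&req, 3));	/* with backoff 3: fires at tick 8 */
	return 0;
}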
/*
@@ -989,16 +1009,31 @@
xprt_timer(struct rpc_task *task)
{
struct rpc_rqst *req = task->tk_rqstp;
+ struct rpc_xprt *xprt = req->rq_xprt;
- if (req)
- xprt_adjust_cwnd(task->tk_xprt, -ETIMEDOUT);
+ spin_lock(&xprt->sock_lock);
+ if (req->rq_received)
+ goto out;
+
+ if (!xprt->nocong) {
+ if (xprt_expbackoff(task, req)) {
+ rpc_add_timer(task, xprt_timer);
+ goto out_unlock;
+ }
+ rpc_inc_timeo(&task->tk_client->cl_rtt);
+ xprt_adjust_cwnd(req->rq_xprt, -ETIMEDOUT);
+ }
+ req->rq_nresend++;
dprintk("RPC: %4d xprt_timer (%s request)\n",
task->tk_pid, req ? "pending" : "backlogged");
task->tk_status = -ETIMEDOUT;
+out:
task->tk_timeout = 0;
rpc_wake_up_task(task);
+out_unlock:
+ spin_unlock(&xprt->sock_lock);
}
/*
@@ -1034,37 +1069,35 @@
*marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
}
- if (!xprt_lock_write(xprt, task))
+ spin_lock_bh(&xprt->sock_lock);
+ if (!__xprt_lock_write(xprt, task)) {
+ spin_unlock_bh(&xprt->sock_lock);
return;
+ }
+ if (list_empty(&req->rq_list)) {
+ list_add_tail(&req->rq_list, &xprt->recv);
+ req->rq_received = 0;
+ }
+ spin_unlock_bh(&xprt->sock_lock);
-#ifdef RPC_PROFILE
- req->rq_xtime = jiffies;
-#endif
do_xprt_transmit(task);
}
static void
do_xprt_transmit(struct rpc_task *task)
{
+ struct rpc_clnt *clnt = task->tk_client;
struct rpc_rqst *req = task->tk_rqstp;
struct rpc_xprt *xprt = req->rq_xprt;
int status, retry = 0;
- /* For fast networks/servers we have to put the request on
- * the pending list now:
- * Note that we don't want the task timing out during the
- * call to xprt_sendmsg(), so we initially disable the timeout,
- * and then reset it later...
- */
- xprt_receive(task);
-
/* Continue transmitting the packet/record. We must be careful
* to cope with writespace callbacks arriving _after_ we have
* called xprt_sendmsg().
*/
while (1) {
- xprt_clear_wspace(xprt);
+ req->rq_xtime = jiffies;
status = xprt_sendmsg(xprt, req);
if (status < 0)
@@ -1078,7 +1111,7 @@
} else {
if (status >= req->rq_slen)
goto out_receive;
- status = -ENOMEM;
+ status = -EAGAIN;
break;
}
@@ -1090,31 +1123,28 @@
if (retry++ > 50)
break;
}
- rpc_unlock_task(task);
/* Note: at this point, task->tk_sleeping has not yet been set,
* hence there is no danger of the waking up task being put on
* schedq, and being picked up by a parallel run of rpciod().
*/
- rpc_wake_up_task(task);
- if (!RPC_IS_RUNNING(task))
- goto out_release;
if (req->rq_received)
goto out_release;
task->tk_status = status;
switch (status) {
- case -ENOMEM:
- /* Protect against (udp|tcp)_write_space */
- spin_lock_bh(&xprt->sock_lock);
- if (!xprt_wspace(xprt)) {
- task->tk_timeout = req->rq_timeout.to_current;
- rpc_sleep_on(&xprt->sending, task, NULL, NULL);
- }
- spin_unlock_bh(&xprt->sock_lock);
- return;
case -EAGAIN:
+ if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) {
+ /* Protect against races with xprt_write_space */
+ spin_lock_bh(&xprt->sock_lock);
+ if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) {
+ task->tk_timeout = req->rq_timeout.to_current;
+ rpc_sleep_on(&xprt->pending, task, NULL, NULL);
+ }
+ spin_unlock_bh(&xprt->sock_lock);
+ return;
+ }
/* Keep holding the socket if it is blocked */
rpc_delay(task, HZ>>4);
return;
@@ -1126,35 +1156,26 @@
if (xprt->stream)
xprt_disconnect(xprt);
req->rq_bytes_sent = 0;
- goto out_release;
}
-
+ out_release:
+ xprt_release_write(xprt, task);
+ return;
out_receive:
dprintk("RPC: %4d xmit complete\n", task->tk_pid);
/* Set the task's receive timeout value */
- task->tk_timeout = req->rq_timeout.to_current;
- rpc_add_timer(task, xprt_timer);
- rpc_unlock_task(task);
- out_release:
- xprt_release_write(xprt, task);
-}
-
-/*
- * Queue the task for a reply to our call.
- * When the callback is invoked, the congestion window should have
- * been updated already.
- */
-void
-xprt_receive(struct rpc_task *task)
-{
- struct rpc_rqst *req = task->tk_rqstp;
- struct rpc_xprt *xprt = req->rq_xprt;
-
- dprintk("RPC: %4d xprt_receive\n", task->tk_pid);
-
- req->rq_received = 0;
- task->tk_timeout = 0;
- rpc_sleep_locked(&xprt->pending, task, NULL, NULL);
+ if (!xprt->nocong) {
+ task->tk_timeout = rpc_calc_rto(&clnt->cl_rtt,
+ rpcproc_timer(clnt, task->tk_msg.rpc_proc));
+ req->rq_ntimeo = 0;
+ if (task->tk_timeout > req->rq_timeout.to_maxval)
+ task->tk_timeout = req->rq_timeout.to_maxval;
+ } else
+ task->tk_timeout = req->rq_timeout.to_current;
+ spin_lock_bh(&xprt->sock_lock);
+ if (!req->rq_received)
+ rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer);
+ __xprt_release_write(xprt, task);
+ spin_unlock_bh(&xprt->sock_lock);
}
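Once transmission completes, the receive timeout is now taken from rpc_calc_rto() and clamped to the transport's to_maxval instead of using the raw to_current value. The sketch below shows an RTT-driven timeout in the same spirit, using the textbook Jacobson/Karels estimator (srtt + 4*mdev); the kernel's fixed-point rpc_calc_rto()/rpc_update_rtt() differ in detail.

/* Userspace sketch of an RTT-based retransmit timeout: keep a smoothed RTT
 * and a mean deviation, set RTO = srtt + 4*mdev, clamp to a maximum. */
#include <stdio.h>

struct rtt_est { double srtt, mdev; };

static double update_rto(struct rtt_est *e, double sample, double max_rto)
{
	double err = sample - e->srtt;
	double rto;

	e->srtt += err / 8.0;					/* smooth the RTT       */
	e->mdev += ((err < 0 ? -err : err) - e->mdev) / 4.0;	/* ...and its deviation */
	rto = e->srtt + 4.0 * e->mdev;
	return rto > max_rto ? max_rto : rto;			/* like the to_maxval clamp */
}

int main(void)
{
	struct rtt_est e = { 0.1, 0.05 };	/* seconds */
	double samples[] = { 0.12, 0.09, 0.30, 0.11 };

	for (int i = 0; i < 4; i++)
		printf("sample %.2fs -> rto %.3fs\n", samples[i],
		       update_rto(&e, samples[i], 60.0));
	return 0;
}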
/*
@@ -1169,9 +1190,7 @@
if (task->tk_rqstp)
return 0;
- dprintk("RPC: %4d xprt_reserve cong = %ld cwnd = %ld\n",
- task->tk_pid, xprt->cong, xprt->cwnd);
- spin_lock_bh(&xprt->xprt_lock);
+ spin_lock(&xprt->xprt_lock);
xprt_reserve_status(task);
if (task->tk_rqstp) {
task->tk_timeout = 0;
@@ -1182,7 +1201,7 @@
task->tk_status = -EAGAIN;
rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
}
- spin_unlock_bh(&xprt->xprt_lock);
+ spin_unlock(&xprt->xprt_lock);
dprintk("RPC: %4d xprt_reserve returns %d\n",
task->tk_pid, task->tk_status);
return task->tk_status;
@@ -1204,18 +1223,13 @@
} else if (task->tk_rqstp) {
/* We've already been given a request slot: NOP */
} else {
- if (RPCXPRT_CONGESTED(xprt) || !(req = xprt->free))
+ if (!(req = xprt->free))
goto out_nofree;
- /* OK: There's room for us. Grab a free slot and bump
- * congestion value */
+ /* OK: There's room for us. Grab a free slot */
xprt->free = req->rq_next;
req->rq_next = NULL;
- xprt->cong += RPC_CWNDSCALE;
task->tk_rqstp = req;
xprt_request_init(task, xprt);
-
- if (xprt->free)
- xprt_clear_backlog(xprt);
}
return;
@@ -1244,6 +1258,7 @@
req->rq_xid = xid++;
if (!xid)
xid++;
+ INIT_LIST_HEAD(&req->rq_list);
}
/*
@@ -1255,27 +1270,25 @@
struct rpc_xprt *xprt = task->tk_xprt;
struct rpc_rqst *req;
- if (xprt->snd_task == task) {
- if (xprt->stream)
- xprt_disconnect(xprt);
- xprt_release_write(xprt, task);
- }
if (!(req = task->tk_rqstp))
return;
+ spin_lock_bh(&xprt->sock_lock);
+ __xprt_release_write(xprt, task);
+ __xprt_put_cong(xprt, req);
+ if (!list_empty(&req->rq_list))
+ list_del(&req->rq_list);
+ spin_unlock_bh(&xprt->sock_lock);
task->tk_rqstp = NULL;
memset(req, 0, sizeof(*req)); /* mark unused */
dprintk("RPC: %4d release request %p\n", task->tk_pid, req);
- spin_lock_bh(&xprt->xprt_lock);
+ spin_lock(&xprt->xprt_lock);
req->rq_next = xprt->free;
xprt->free = req;
- /* Decrease congestion value. */
- xprt->cong -= RPC_CWNDSCALE;
-
xprt_clear_backlog(xprt);
- spin_unlock_bh(&xprt->xprt_lock);
+ spin_unlock(&xprt->xprt_lock);
}
/*
@@ -1331,11 +1344,12 @@
xprt->nocong = 1;
} else
xprt->cwnd = RPC_INITCWND;
- xprt->congtime = jiffies;
spin_lock_init(&xprt->sock_lock);
spin_lock_init(&xprt->xprt_lock);
init_waitqueue_head(&xprt->cong_wait);
+ INIT_LIST_HEAD(&xprt->recv);
+
/* Set timeout parameters */
if (to) {
xprt->timeout = *to;
@@ -1344,9 +1358,10 @@
} else
xprt_default_timeout(&xprt->timeout, xprt->prot);
- xprt->pending = RPC_INIT_WAITQ("xprt_pending");
- xprt->sending = RPC_INIT_WAITQ("xprt_sending");
- xprt->backlog = RPC_INIT_WAITQ("xprt_backlog");
+ INIT_RPC_WAITQ(&xprt->pending, "xprt_pending");
+ INIT_RPC_WAITQ(&xprt->sending, "xprt_sending");
+ INIT_RPC_WAITQ(&xprt->resend, "xprt_resend");
+ INIT_RPC_WAITQ(&xprt->backlog, "xprt_backlog");
/* initialize free list */
for (i = 0, req = xprt->slot; i < RPC_MAXREQS-1; i++, req++)
@@ -1420,6 +1435,27 @@
}
/*
+ * Set socket buffer length
+ */
+void
+xprt_sock_setbufsize(struct rpc_xprt *xprt)
+{
+ struct sock *sk = xprt->inet;
+
+ if (xprt->stream)
+ return;
+ if (xprt->rcvsize) {
+ sk->userlocks |= SOCK_RCVBUF_LOCK;
+ sk->rcvbuf = xprt->rcvsize * RPC_MAXCONG * 2;
+ }
+ if (xprt->sndsize) {
+ sk->userlocks |= SOCK_SNDBUF_LOCK;
+ sk->sndbuf = xprt->sndsize * RPC_MAXCONG * 2;
+ sk->write_space(sk);
+ }
+}
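xprt_sock_setbufsize() pins the UDP socket's send and receive buffers from inside the kernel by setting SOCK_SNDBUF_LOCK/SOCK_RCVBUF_LOCK and writing sk->sndbuf/sk->rcvbuf directly. The nearest user-space equivalent is SO_SNDBUF/SO_RCVBUF via setsockopt(), sketched below; the sizes are arbitrary and only loosely mirror the rcvsize/sndsize * RPC_MAXCONG * 2 scaling.

/* Userspace sketch: request fixed send/receive buffer sizes on a UDP socket. */
#include <stdio.h>
#include <sys/socket.h>

int main(void)
{
	int fd = socket(AF_INET, SOCK_DGRAM, 0);
	int sndsize = 4096 * 16, rcvsize = 4096 * 16;
	socklen_t len = sizeof(sndsize);

	if (fd < 0) { perror("socket"); return 1; }
	if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sndsize, sizeof(sndsize)) < 0)
		perror("SO_SNDBUF");
	if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvsize, sizeof(rcvsize)) < 0)
		perror("SO_RCVBUF");

	getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sndsize, &len);
	printf("effective SO_SNDBUF: %d\n", sndsize);	/* kernel may double the value */
	return 0;
}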
+
+/*
* Create a client socket given the protocol and peer address.
*/
static struct socket *
@@ -1477,6 +1513,7 @@
{
xprt->shutdown = 1;
rpc_wake_up(&xprt->sending);
+ rpc_wake_up(&xprt->resend);
rpc_wake_up(&xprt->pending);
rpc_wake_up(&xprt->backlog);
if (waitqueue_active(&xprt->cong_wait))
@@ -1488,8 +1525,6 @@
*/
int
xprt_clear_backlog(struct rpc_xprt *xprt) {
- if (RPCXPRT_CONGESTED(xprt))
- return 0;
rpc_wake_up_next(&xprt->backlog);
if (waitqueue_active(&xprt->cong_wait))
wake_up(&xprt->cong_wait);