static void xprt_bind_socket(struct rpc_xprt *, struct socket *);
static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
+static int xprt_clear_backlog(struct rpc_xprt *xprt);
+
#ifdef RPC_DEBUG_DATA
/*
* Print the buffer contents (first 128 bytes only--just enough for
xprt->cwnd = cwnd;
}
+/*
+ * Reset the major timeout value
+ */
+static void xprt_reset_majortimeo(struct rpc_rqst *req)
+{
+	struct rpc_timeout *to = &req->rq_xprt->timeout;
+
+	req->rq_majortimeo = req->rq_timeout;
+	if (to->to_exponential)
+		req->rq_majortimeo <<= to->to_retries;
+	else
+		req->rq_majortimeo += to->to_increment * to->to_retries;
+	if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0)
+		req->rq_majortimeo = to->to_maxval;
+	req->rq_majortimeo += jiffies;
+}
+
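To make the effect concrete, here is a small worked trace of xprt_reset_majortimeo() under assumed timeout parameters (the values below are illustrative, not taken from this patch): a 5 second initial timeout, 5 second linear increments, 5 retries and a 25 second ceiling.

struct rpc_timeout to = {
	.to_initval	= 5 * HZ,	/* assumed */
	.to_increment	= 5 * HZ,	/* assumed */
	.to_retries	= 5,		/* assumed */
	.to_maxval	= 25 * HZ,	/* assumed */
	.to_exponential	= 0,		/* linear backoff */
};

/* With rq_timeout still at to_initval, xprt_reset_majortimeo() computes
 * 5*HZ + 5*HZ * 5 = 30*HZ, which exceeds to_maxval, so rq_majortimeo is
 * clamped to 25*HZ and then offset by jiffies: the request gets a hard
 * deadline 25 seconds from now.
 */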
/*
* Adjust timeout values etc for next retransmit
*/
-int
-xprt_adjust_timeout(struct rpc_timeout *to)
+int xprt_adjust_timeout(struct rpc_rqst *req)
{
-	if (to->to_retries > 0) {
+	struct rpc_xprt *xprt = req->rq_xprt;
+	struct rpc_timeout *to = &xprt->timeout;
+	int status = 0;
+
+	if (time_before(jiffies, req->rq_majortimeo)) {
 		if (to->to_exponential)
-			to->to_current <<= 1;
+			req->rq_timeout <<= 1;
 		else
-			to->to_current += to->to_increment;
-		if (to->to_maxval && to->to_current >= to->to_maxval)
-			to->to_current = to->to_maxval;
+			req->rq_timeout += to->to_increment;
+		if (to->to_maxval && req->rq_timeout >= to->to_maxval)
+			req->rq_timeout = to->to_maxval;
+		req->rq_retries++;
+		pprintk("RPC: %lu retrans\n", jiffies);
 	} else {
-		if (to->to_exponential)
-			to->to_initval <<= 1;
-		else
-			to->to_initval += to->to_increment;
-		if (to->to_maxval && to->to_initval >= to->to_maxval)
-			to->to_initval = to->to_maxval;
-		to->to_current = to->to_initval;
+		req->rq_timeout = to->to_initval;
+		req->rq_retries = 0;
+		xprt_reset_majortimeo(req);
+		/* Reset the RTT counters == "slow start" */
+		spin_lock_bh(&xprt->sock_lock);
+		rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
+		spin_unlock_bh(&xprt->sock_lock);
+		pprintk("RPC: %lu timeout\n", jiffies);
+		status = -ETIMEDOUT;
 	}
-	if (!to->to_current) {
-		printk(KERN_WARNING "xprt_adjust_timeout: to_current = 0!\n");
-		to->to_current = 5 * HZ;
+	if (req->rq_timeout == 0) {
+		printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
+		req->rq_timeout = 5 * HZ;
 	}
-	pprintk("RPC: %lu %s\n", jiffies,
-			to->to_retries? "retrans" : "timeout");
-	return to->to_retries-- > 0;
+	return status;
}
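Callers can now distinguish an ordinary retransmission from a major timeout by the return value alone. A minimal sketch of how a retry path might consume it (the function below is hypothetical, not part of this patch):

static void example_retry(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (xprt_adjust_timeout(req) == -ETIMEDOUT) {
		/* rq_majortimeo has expired: rq_timeout and rq_retries
		 * were reset, the RTT estimator was reinitialized, and a
		 * fresh major timeout was armed.  Report it upwards. */
		task->tk_status = -ETIMEDOUT;
		return;
	}
	/* Minor timeout: rq_timeout was backed off and rq_retries bumped;
	 * just retransmit with the longer per-request timeout. */
}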
/*
task->tk_timeout = RPC_CONNECT_TIMEOUT;
rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL);
-	if (!test_and_set_bit(XPRT_CONNECTING, &xprt->sockstate))
-		schedule_work(&xprt->sock_connect);
+	if (!test_and_set_bit(XPRT_CONNECTING, &xprt->sockstate)) {
+		/* Note: if we are here due to a dropped connection
+		 * we delay reconnecting by RPC_REESTABLISH_TIMEOUT/HZ
+		 * seconds
+		 */
+		if (xprt->sock != NULL)
+			schedule_delayed_work(&xprt->sock_connect,
+					RPC_REESTABLISH_TIMEOUT);
+		else
+			schedule_work(&xprt->sock_connect);
+	}
return;
out_write:
xprt_release_write(xprt, task);
case -ECONNREFUSED:
case -ECONNRESET:
case -ENOTCONN:
- rpc_delay(task, RPC_REESTABLISH_TIMEOUT);
return;
case -ETIMEDOUT:
dprintk("RPC: %4d xprt_connect_status: timed out\n",
struct rpc_rqst *rovr;
struct sk_buff *skb;
int err, repsize, copied;
- u32 xid;
+ u32 _xid, *xp;
read_lock(&sk->sk_callback_lock);
dprintk("RPC: udp_data_ready...\n");
}
/* Copy the XID from the skb... */
- if (skb_copy_bits(skb, sizeof(struct udphdr), &xid, sizeof(xid)) < 0)
+ xp = skb_header_pointer(skb, sizeof(struct udphdr),
+ sizeof(_xid), &_xid);
+ if (xp == NULL)
goto dropit;
/* Look up and lock the request corresponding to the given XID */
spin_lock(&xprt->sock_lock);
- rovr = xprt_lookup_rqst(xprt, xid);
+ rovr = xprt_lookup_rqst(xprt, *xp);
if (!rovr)
goto out_unlock;
task = rovr->rq_task;
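skb_header_pointer() either returns a pointer straight into the skb (when the requested bytes are in the linear area) or copies them into the caller-supplied buffer, _xid here, so *xp is always valid once the NULL check passes. A minimal sketch of the same pattern, using a hypothetical helper:

static u32 *example_peek_xid(struct sk_buff *skb, u32 *buf)
{
	/* Returns a pointer into skb->data or into *buf; NULL only if
	 * the datagram is too short to contain an XID at all. */
	return skb_header_pointer(skb, sizeof(struct udphdr),
				  sizeof(*buf), buf);
}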
xprt->tcp_flags &= ~XPRT_COPY_XID;
xprt->tcp_flags |= XPRT_COPY_DATA;
xprt->tcp_copied = 4;
- dprintk("RPC: reading reply for XID %08x\n", xprt->tcp_xid);
+ dprintk("RPC: reading reply for XID %08x\n",
+ ntohl(xprt->tcp_xid));
tcp_check_recm(xprt);
}
if (!req) {
xprt->tcp_flags &= ~XPRT_COPY_DATA;
dprintk("RPC: XID %08x request not found!\n",
- xprt->tcp_xid);
+ ntohl(xprt->tcp_xid));
spin_unlock(&xprt->sock_lock);
return;
}
tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb,
unsigned int offset, size_t len)
{
- struct rpc_xprt *xprt = (struct rpc_xprt *)rd_desc->buf;
+ struct rpc_xprt *xprt = rd_desc->arg.data;
skb_reader_t desc = {
.skb = skb,
.offset = offset,
goto out;
/* We use rd_desc to pass struct xprt to tcp_data_recv */
- rd_desc.buf = (char *)xprt;
+ rd_desc.arg.data = xprt;
rd_desc.count = 65536;
tcp_read_sock(sk, &rd_desc, tcp_data_recv);
out:
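read_descriptor_t no longer carries a raw 'buf' pointer; private data now travels in the arg union and is handed back unchanged to the tcp_read_sock() callback. A minimal sketch of the updated calling convention (the function names are illustrative only):

static int example_recv(read_descriptor_t *rd_desc, struct sk_buff *skb,
			unsigned int offset, size_t len)
{
	struct rpc_xprt *xprt = rd_desc->arg.data;	/* round-trips intact */

	/* ... consume up to len bytes of skb starting at offset ... */
	return len;
}

static void example_read(struct sock *sk, struct rpc_xprt *xprt)
{
	read_descriptor_t rd_desc = {
		.arg.data = xprt,
		.count = 65536,
	};
	tcp_read_sock(sk, &rd_desc, example_recv);
}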
dprintk("RPC: tcp_state_change client %p...\n", xprt);
dprintk("RPC: state %x conn %d dead %d zapped %d\n",
sk->sk_state, xprt_connected(xprt),
- sock_flag(sk, SOCK_DEAD), sk->sk_zapped);
+ sock_flag(sk, SOCK_DEAD),
+ sock_flag(sk, SOCK_ZAPPED));
switch (sk->sk_state) {
case TCP_ESTABLISHED:
	/* Wait until we have enough socket memory */
	if (xprt->stream) {
-		/* from net/ipv4/tcp.c:tcp_write_space */
-		if (tcp_wspace(sk) < tcp_min_write_space(sk))
+		/* from net/core/stream.c:sk_stream_write_space */
+		if (sk_stream_wspace(sk) < sk_stream_min_wspace(sk))
			goto out;
	} else {
		/* from net/core/sock.c:sock_def_write_space */
goto out;
spin_lock_bh(&xprt->sock_lock);
- if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->pending)
+ if (xprt->snd_task)
rpc_wake_up_task(xprt->snd_task);
spin_unlock_bh(&xprt->sock_lock);
out:
/* Add request to the receive list */
list_add_tail(&req->rq_list, &xprt->recv);
spin_unlock_bh(&xprt->sock_lock);
+ xprt_reset_majortimeo(req);
}
} else if (!req->rq_bytes_sent)
return;
if (!xprt_connected(xprt))
task->tk_status = -ENOTCONN;
else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) {
- task->tk_timeout = req->rq_timeout.to_current;
+ task->tk_timeout = req->rq_timeout;
rpc_sleep_on(&xprt->pending, task, NULL, NULL);
}
spin_unlock_bh(&xprt->sock_lock);
	if (!xprt->nocong) {
		int timer = task->tk_msg.rpc_proc->p_timer;
		task->tk_timeout = rpc_calc_rto(clnt->cl_rtt, timer);
-		task->tk_timeout <<= rpc_ntimeo(clnt->cl_rtt, timer);
-		task->tk_timeout <<= clnt->cl_timeout.to_retries
-			- req->rq_timeout.to_retries;
-		if (task->tk_timeout > req->rq_timeout.to_maxval)
-			task->tk_timeout = req->rq_timeout.to_maxval;
+		task->tk_timeout <<= rpc_ntimeo(clnt->cl_rtt, timer) + req->rq_retries;
+		if (task->tk_timeout > xprt->timeout.to_maxval || task->tk_timeout == 0)
+			task->tk_timeout = xprt->timeout.to_maxval;
	} else
-		task->tk_timeout = req->rq_timeout.to_current;
+		task->tk_timeout = req->rq_timeout;
/* Don't race with disconnect */
if (!xprt_connected(xprt))
task->tk_status = -ENOTCONN;
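For congestion-controlled transports, the receive timeout chosen above is the live RTT estimate, doubled once per timeout already recorded by the estimator and once per retry of this particular request, then clamped to the transport maximum. As an illustrative trace (all inputs assumed): if rpc_calc_rto() currently yields HZ/4, rpc_ntimeo() reports one earlier timeout for this procedure class, and the request has been retried twice, then:

/* Illustrative arithmetic only:
 *   tk_timeout = (HZ/4) << (1 + 2) = 2*HZ
 * and if the shift had overflowed to zero or exceeded
 * xprt->timeout.to_maxval, tk_timeout would be pinned to to_maxval.
 */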
/*
* Reserve an RPC call slot.
*/
-void
-xprt_reserve(struct rpc_task *task)
-{
-	struct rpc_xprt *xprt = task->tk_xprt;
-
-	task->tk_status = -EIO;
-	if (!xprt->shutdown) {
-		spin_lock(&xprt->xprt_lock);
-		do_xprt_reserve(task);
-		spin_unlock(&xprt->xprt_lock);
-		if (task->tk_rqstp)
-			del_timer_sync(&xprt->timer);
-	}
-}
-
static inline void
do_xprt_reserve(struct rpc_task *task)
{
rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
}
+void
+xprt_reserve(struct rpc_task *task)
+{
+	struct rpc_xprt *xprt = task->tk_xprt;
+
+	task->tk_status = -EIO;
+	if (!xprt->shutdown) {
+		spin_lock(&xprt->xprt_lock);
+		do_xprt_reserve(task);
+		spin_unlock(&xprt->xprt_lock);
+		if (task->tk_rqstp)
+			del_timer_sync(&xprt->timer);
+	}
+}
+
/*
* Allocate a 'unique' XID
*/
{
struct rpc_rqst *req = task->tk_rqstp;
- req->rq_timeout = xprt->timeout;
+ req->rq_timeout = xprt->timeout.to_initval;
req->rq_task = task;
req->rq_xprt = xprt;
req->rq_xid = xprt_alloc_xid(xprt);
dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid,
- req, req->rq_xid);
+ req, ntohl(req->rq_xid));
}
/*
/*
* Set default timeout parameters
*/
-void
+static void
xprt_default_timeout(struct rpc_timeout *to, int proto)
{
if (proto == IPPROTO_UDP)
void
xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
{
- to->to_current =
to->to_initval =
to->to_increment = incr;
to->to_maxval = incr * retr;
	if (xprt->stream) {
		xprt->cwnd = RPC_MAXCWND(xprt);
		xprt->nocong = 1;
-	} else
+		xprt->max_payload = (1U << 31) - 1;
+	} else {
		xprt->cwnd = RPC_INITCWND;
+		xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
+	}
	spin_lock_init(&xprt->sock_lock);
	spin_lock_init(&xprt->xprt_lock);
	init_waitqueue_head(&xprt->cong_wait);
	/* Set timeout parameters */
	if (to) {
		xprt->timeout = *to;
-		xprt->timeout.to_current = to->to_initval;
	} else
		xprt_default_timeout(&xprt->timeout, xprt->prot);
sk->sk_no_check = UDP_CSUM_NORCV;
xprt_set_connected(xprt);
} else {
- struct tcp_opt *tp = tcp_sk(sk);
- tp->nonagle = 1; /* disable Nagle's algorithm */
+ tcp_sk(sk)->nonagle = 1; /* disable Nagle's algorithm */
sk->sk_data_ready = tcp_data_ready;
sk->sk_state_change = tcp_state_change;
xprt_clear_connected(xprt);
/*
* Prepare for transport shutdown.
*/
-void
+static void
xprt_shutdown(struct rpc_xprt *xprt)
{
xprt->shutdown = 1;
/*
* Clear the xprt backlog queue
*/
-int
+static int
xprt_clear_backlog(struct rpc_xprt *xprt) {
rpc_wake_up_next(&xprt->backlog);
wake_up(&xprt->cong_wait);