vserver 2.0 rc7
[linux-2.6.git] / net / sunrpc / xprt.c
index f02f9e2..c74a6bb 100644 (file)
@@ -90,6 +90,8 @@ static struct socket *xprt_create_socket(struct rpc_xprt *, int, int);
 static void    xprt_bind_socket(struct rpc_xprt *, struct socket *);
 static int      __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
 
+static int     xprt_clear_backlog(struct rpc_xprt *xprt);
+
 #ifdef RPC_DEBUG_DATA
 /*
  * Print the buffer contents (first 128 bytes only--just enough for
@@ -351,36 +353,58 @@ xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
        xprt->cwnd = cwnd;
 }
 
+/*
+ * Reset the major timeout: recompute req->rq_majortimeo from rq_timeout and the transport's retry settings, stored as a jiffies deadline
+ */
+static void xprt_reset_majortimeo(struct rpc_rqst *req)
+{
+       struct rpc_timeout *to = &req->rq_xprt->timeout;
+
+       req->rq_majortimeo = req->rq_timeout;
+       if (to->to_exponential)
+               req->rq_majortimeo <<= to->to_retries; /* double once per configured retry */
+       else
+               req->rq_majortimeo += to->to_increment * to->to_retries;
+       if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0) /* NOTE(review): the == 0 test presumably catches a shift that lost every bit -- confirm */
+               req->rq_majortimeo = to->to_maxval;
+       req->rq_majortimeo += jiffies; /* turn the interval into an absolute time, compared against jiffies by xprt_adjust_timeout() */
+}
+
 /*
  * Adjust timeout values etc for next retransmit
  */
-int
-xprt_adjust_timeout(struct rpc_timeout *to)
+int xprt_adjust_timeout(struct rpc_rqst *req)
 {
-       if (to->to_retries > 0) {
+       struct rpc_xprt *xprt = req->rq_xprt;
+       struct rpc_timeout *to = &xprt->timeout;
+       int status = 0; /* stays 0 on the retransmit path; -ETIMEDOUT once the major timeout has passed */
+
+       if (time_before(jiffies, req->rq_majortimeo)) { /* major timeout not reached yet */
                 if (to->to_exponential)
-                       to->to_current <<= 1;
+                       req->rq_timeout <<= 1; /* double the per-request timeout */
                 else
-                       to->to_current += to->to_increment;
-               if (to->to_maxval && to->to_current >= to->to_maxval)
-                       to->to_current = to->to_maxval;
+                       req->rq_timeout += to->to_increment;
+               if (to->to_maxval && req->rq_timeout >= to->to_maxval) /* a to_maxval of 0 disables the cap */
+                       req->rq_timeout = to->to_maxval;
+               req->rq_retries++;
+               pprintk("RPC: %lu retrans\n", jiffies);
         } else {
-               if (to->to_exponential)
-                       to->to_initval <<= 1;
-               else
-                       to->to_initval += to->to_increment;
-               if (to->to_maxval && to->to_initval >= to->to_maxval)
-                       to->to_initval = to->to_maxval;
-               to->to_current = to->to_initval;
+               req->rq_timeout = to->to_initval; /* start over from the transport's initial timeout */
+               req->rq_retries = 0;
+               xprt_reset_majortimeo(req); /* compute a fresh major timeout for the next round */
+               /* Reset the RTT counters == "slow start" */
+               spin_lock_bh(&xprt->sock_lock);
+               rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
+               spin_unlock_bh(&xprt->sock_lock);
+               pprintk("RPC: %lu timeout\n", jiffies);
+               status = -ETIMEDOUT; /* reported to the caller through the return value */
         }
 
-       if (!to->to_current) {
-               printk(KERN_WARNING "xprt_adjust_timeout: to_current = 0!\n");
-               to->to_current = 5 * HZ;
+       if (req->rq_timeout == 0) { /* never leave a zero timeout behind: warn and fall back to 5 * HZ */
+               printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
+               req->rq_timeout = 5 * HZ;
         }
-       pprintk("RPC: %lu %s\n", jiffies,
-                       to->to_retries? "retrans" : "timeout");
-       return to->to_retries-- > 0;
+       return status;
 }
 
 /*
@@ -537,8 +561,17 @@ void xprt_connect(struct rpc_task *task)
 
        task->tk_timeout = RPC_CONNECT_TIMEOUT;
        rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL);
-       if (!test_and_set_bit(XPRT_CONNECTING, &xprt->sockstate))
-               schedule_work(&xprt->sock_connect);
+       if (!test_and_set_bit(XPRT_CONNECTING, &xprt->sockstate)) {
+               /* Note: if we are here due to a dropped connection
+                *       we delay reconnecting by RPC_REESTABLISH_TIMEOUT/HZ
+                *       seconds
+                */
+               if (xprt->sock != NULL)
+                       schedule_delayed_work(&xprt->sock_connect,
+                                       RPC_REESTABLISH_TIMEOUT);
+               else
+                       schedule_work(&xprt->sock_connect);
+       }
        return;
  out_write:
        xprt_release_write(xprt, task);
@@ -566,7 +599,6 @@ xprt_connect_status(struct rpc_task *task)
        case -ECONNREFUSED:
        case -ECONNRESET:
        case -ENOTCONN:
-               rpc_delay(task, RPC_REESTABLISH_TIMEOUT);
                return;
        case -ETIMEDOUT:
                dprintk("RPC: %4d xprt_connect_status: timed out\n",
@@ -723,7 +755,7 @@ udp_data_ready(struct sock *sk, int len)
        struct rpc_rqst *rovr;
        struct sk_buff  *skb;
        int err, repsize, copied;
-       u32 xid;
+       u32 _xid, *xp;
 
        read_lock(&sk->sk_callback_lock);
        dprintk("RPC:      udp_data_ready...\n");
@@ -747,12 +779,14 @@ udp_data_ready(struct sock *sk, int len)
        }
 
        /* Copy the XID from the skb... */
-       if (skb_copy_bits(skb, sizeof(struct udphdr), &xid, sizeof(xid)) < 0)
+       xp = skb_header_pointer(skb, sizeof(struct udphdr),
+                               sizeof(_xid), &_xid);
+       if (xp == NULL)
                goto dropit;
 
        /* Look up and lock the request corresponding to the given XID */
        spin_lock(&xprt->sock_lock);
-       rovr = xprt_lookup_rqst(xprt, xid);
+       rovr = xprt_lookup_rqst(xprt, *xp);
        if (!rovr)
                goto out_unlock;
        task = rovr->rq_task;
@@ -859,7 +893,8 @@ tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
        xprt->tcp_flags &= ~XPRT_COPY_XID;
        xprt->tcp_flags |= XPRT_COPY_DATA;
        xprt->tcp_copied = 4;
-       dprintk("RPC:      reading reply for XID %08x\n", xprt->tcp_xid);
+       dprintk("RPC:      reading reply for XID %08x\n",
+                                               ntohl(xprt->tcp_xid));
        tcp_check_recm(xprt);
 }
 
@@ -879,7 +914,7 @@ tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
        if (!req) {
                xprt->tcp_flags &= ~XPRT_COPY_DATA;
                dprintk("RPC:      XID %08x request not found!\n",
-                               xprt->tcp_xid);
+                               ntohl(xprt->tcp_xid));
                spin_unlock(&xprt->sock_lock);
                return;
        }
@@ -943,7 +978,7 @@ static int
 tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb,
                unsigned int offset, size_t len)
 {
-       struct rpc_xprt *xprt = (struct rpc_xprt *)rd_desc->buf;
+       struct rpc_xprt *xprt = rd_desc->arg.data;
        skb_reader_t desc = {
                .skb    = skb,
                .offset = offset,
@@ -991,7 +1026,7 @@ static void tcp_data_ready(struct sock *sk, int bytes)
                goto out;
 
        /* We use rd_desc to pass struct xprt to tcp_data_recv */
-       rd_desc.buf = (char *)xprt;
+       rd_desc.arg.data = xprt;
        rd_desc.count = 65536;
        tcp_read_sock(sk, &rd_desc, tcp_data_recv);
 out:
@@ -1009,7 +1044,8 @@ tcp_state_change(struct sock *sk)
        dprintk("RPC:      tcp_state_change client %p...\n", xprt);
        dprintk("RPC:      state %x conn %d dead %d zapped %d\n",
                                sk->sk_state, xprt_connected(xprt),
-                               sock_flag(sk, SOCK_DEAD), sk->sk_zapped);
+                               sock_flag(sk, SOCK_DEAD),
+                               sock_flag(sk, SOCK_ZAPPED));
 
        switch (sk->sk_state) {
        case TCP_ESTABLISHED:
@@ -1056,8 +1092,8 @@ xprt_write_space(struct sock *sk)
 
        /* Wait until we have enough socket memory */
        if (xprt->stream) {
-               /* from net/ipv4/tcp.c:tcp_write_space */
-               if (tcp_wspace(sk) < tcp_min_write_space(sk))
+               /* from net/core/stream.c:sk_stream_write_space */
+               if (sk_stream_wspace(sk) < sk_stream_min_wspace(sk))
                        goto out;
        } else {
                /* from net/core/sock.c:sock_def_write_space */
@@ -1069,7 +1105,7 @@ xprt_write_space(struct sock *sk)
                goto out;
 
        spin_lock_bh(&xprt->sock_lock);
-       if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->pending)
+       if (xprt->snd_task)
                rpc_wake_up_task(xprt->snd_task);
        spin_unlock_bh(&xprt->sock_lock);
 out:
@@ -1166,6 +1202,7 @@ xprt_transmit(struct rpc_task *task)
                        /* Add request to the receive list */
                        list_add_tail(&req->rq_list, &xprt->recv);
                        spin_unlock_bh(&xprt->sock_lock);
+                       xprt_reset_majortimeo(req);
                }
        } else if (!req->rq_bytes_sent)
                return;
@@ -1221,7 +1258,7 @@ xprt_transmit(struct rpc_task *task)
                        if (!xprt_connected(xprt))
                                task->tk_status = -ENOTCONN;
                        else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) {
-                               task->tk_timeout = req->rq_timeout.to_current;
+                               task->tk_timeout = req->rq_timeout;
                                rpc_sleep_on(&xprt->pending, task, NULL, NULL);
                        }
                        spin_unlock_bh(&xprt->sock_lock);
@@ -1248,13 +1285,11 @@ xprt_transmit(struct rpc_task *task)
        if (!xprt->nocong) {
                int timer = task->tk_msg.rpc_proc->p_timer;
                task->tk_timeout = rpc_calc_rto(clnt->cl_rtt, timer);
-               task->tk_timeout <<= rpc_ntimeo(clnt->cl_rtt, timer);
-               task->tk_timeout <<= clnt->cl_timeout.to_retries
-                       - req->rq_timeout.to_retries;
-               if (task->tk_timeout > req->rq_timeout.to_maxval)
-                       task->tk_timeout = req->rq_timeout.to_maxval;
+               task->tk_timeout <<= rpc_ntimeo(clnt->cl_rtt, timer) + req->rq_retries;
+               if (task->tk_timeout > xprt->timeout.to_maxval || task->tk_timeout == 0)
+                       task->tk_timeout = xprt->timeout.to_maxval;
        } else
-               task->tk_timeout = req->rq_timeout.to_current;
+               task->tk_timeout = req->rq_timeout;
        /* Don't race with disconnect */
        if (!xprt_connected(xprt))
                task->tk_status = -ENOTCONN;
@@ -1267,21 +1302,6 @@ xprt_transmit(struct rpc_task *task)
 /*
  * Reserve an RPC call slot.
  */
-void
-xprt_reserve(struct rpc_task *task)
-{
-       struct rpc_xprt *xprt = task->tk_xprt;
-
-       task->tk_status = -EIO;
-       if (!xprt->shutdown) {
-               spin_lock(&xprt->xprt_lock);
-               do_xprt_reserve(task);
-               spin_unlock(&xprt->xprt_lock);
-               if (task->tk_rqstp)
-                       del_timer_sync(&xprt->timer);
-       }
-}
-
 static inline void
 do_xprt_reserve(struct rpc_task *task)
 {
@@ -1303,6 +1323,21 @@ do_xprt_reserve(struct rpc_task *task)
        rpc_sleep_on(&xprt->backlog, task, NULL, NULL);
 }
 
+void
+xprt_reserve(struct rpc_task *task)
+{
+       struct rpc_xprt *xprt = task->tk_xprt;
+
+       task->tk_status = -EIO; /* this function leaves it untouched when xprt->shutdown is set */
+       if (!xprt->shutdown) {
+               spin_lock(&xprt->xprt_lock);
+               do_xprt_reserve(task); /* runs with xprt_lock held */
+               spin_unlock(&xprt->xprt_lock);
+               if (task->tk_rqstp) /* task holds a request (tk_rqstp set): cancel xprt->timer */
+                       del_timer_sync(&xprt->timer);
+       }
+}
+
 /*
  * Allocate a 'unique' XID
  */
@@ -1324,12 +1359,12 @@ xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
 {
        struct rpc_rqst *req = task->tk_rqstp;
 
-       req->rq_timeout = xprt->timeout;
+       req->rq_timeout = xprt->timeout.to_initval;
        req->rq_task    = task;
        req->rq_xprt    = xprt;
        req->rq_xid     = xprt_alloc_xid(xprt);
        dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid,
-                       req, req->rq_xid);
+                       req, ntohl(req->rq_xid));
 }
 
 /*
@@ -1366,7 +1401,7 @@ xprt_release(struct rpc_task *task)
 /*
  * Set default timeout parameters
  */
-void
+static void
 xprt_default_timeout(struct rpc_timeout *to, int proto)
 {
        if (proto == IPPROTO_UDP)
@@ -1381,7 +1416,6 @@ xprt_default_timeout(struct rpc_timeout *to, int proto)
 void
 xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
 {
-       to->to_current   = 
        to->to_initval   = 
        to->to_increment = incr;
        to->to_maxval    = incr * retr;
@@ -1427,8 +1461,11 @@ xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to)
        if (xprt->stream) {
                xprt->cwnd = RPC_MAXCWND(xprt);
                xprt->nocong = 1;
-       } else
+               xprt->max_payload = (1U << 31) - 1;
+       } else {
                xprt->cwnd = RPC_INITCWND;
+               xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
+       }
        spin_lock_init(&xprt->sock_lock);
        spin_lock_init(&xprt->xprt_lock);
        init_waitqueue_head(&xprt->cong_wait);
@@ -1446,7 +1483,6 @@ xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to)
        /* Set timeout parameters */
        if (to) {
                xprt->timeout = *to;
-               xprt->timeout.to_current = to->to_initval;
        } else
                xprt_default_timeout(&xprt->timeout, xprt->prot);
 
@@ -1516,8 +1552,7 @@ xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock)
                sk->sk_no_check = UDP_CSUM_NORCV;
                xprt_set_connected(xprt);
        } else {
-               struct tcp_opt *tp = tcp_sk(sk);
-               tp->nonagle = 1;        /* disable Nagle's algorithm */
+               tcp_sk(sk)->nonagle = 1;        /* disable Nagle's algorithm */
                sk->sk_data_ready = tcp_data_ready;
                sk->sk_state_change = tcp_state_change;
                xprt_clear_connected(xprt);
@@ -1604,7 +1639,7 @@ xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to)
 /*
  * Prepare for transport shutdown.
  */
-void
+static void
 xprt_shutdown(struct rpc_xprt *xprt)
 {
        xprt->shutdown = 1;
@@ -1619,7 +1654,7 @@ xprt_shutdown(struct rpc_xprt *xprt)
 /*
  * Clear the xprt backlog queue
  */
-int
+static int
 xprt_clear_backlog(struct rpc_xprt *xprt) {
        rpc_wake_up_next(&xprt->backlog);
        wake_up(&xprt->cong_wait);