fedora core 6 1.2949 + vserver 2.2.0
[linux-2.6.git] / net / sunrpc / clnt.c
index d784797..f9fdcb3 100644 (file)
 #include <linux/types.h>
 #include <linux/mm.h>
 #include <linux/slab.h>
+#include <linux/smp_lock.h>
 #include <linux/utsname.h>
+#include <linux/workqueue.h>
+#include <linux/vs_cvirt.h>
 
 #include <linux/sunrpc/clnt.h>
-#include <linux/workqueue.h>
 #include <linux/sunrpc/rpc_pipe_fs.h>
-
-#include <linux/nfs.h>
+#include <linux/sunrpc/metrics.h>
 
 
 #define RPC_SLACK_SPACE                (1024)  /* total overkill */
@@ -61,8 +62,8 @@ static void   call_refreshresult(struct rpc_task *task);
 static void    call_timeout(struct rpc_task *task);
 static void    call_connect(struct rpc_task *task);
 static void    call_connect_status(struct rpc_task *task);
-static u32 *   call_header(struct rpc_task *task);
-static u32 *   call_verify(struct rpc_task *task);
+static __be32 *        call_header(struct rpc_task *task);
+static __be32 *        call_verify(struct rpc_task *task);
 
 
 static int
@@ -71,8 +72,15 @@ rpc_setup_pipedir(struct rpc_clnt *clnt, char *dir_name)
        static uint32_t clntid;
        int error;
 
+       clnt->cl_vfsmnt = ERR_PTR(-ENOENT);
+       clnt->cl_dentry = ERR_PTR(-ENOENT);
        if (dir_name == NULL)
                return 0;
+
+       clnt->cl_vfsmnt = rpc_get_mount();
+       if (IS_ERR(clnt->cl_vfsmnt))
+               return PTR_ERR(clnt->cl_vfsmnt);
+
        for (;;) {
                snprintf(clnt->cl_pathname, sizeof(clnt->cl_pathname),
                                "%s/clnt%x", dir_name,
@@ -85,22 +93,13 @@ rpc_setup_pipedir(struct rpc_clnt *clnt, char *dir_name)
                if (error != -EEXIST) {
                        printk(KERN_INFO "RPC: Couldn't create pipefs entry %s, error %d\n",
                                        clnt->cl_pathname, error);
+                       rpc_put_mount();
                        return error;
                }
        }
 }
 
-/*
- * Create an RPC client
- * FIXME: This should also take a flags argument (as in task->tk_flags).
- * It's called (among others) from pmap_create_client, which may in
- * turn be called by an async task. In this case, rpciod should not be
- * made to sleep too long.
- */
-struct rpc_clnt *
-rpc_new_client(struct rpc_xprt *xprt, char *servname,
-                 struct rpc_program *program, u32 vers,
-                 rpc_authflavor_t flavor)
+static struct rpc_clnt * rpc_new_client(struct rpc_xprt *xprt, char *servname, struct rpc_program *program, u32 vers, rpc_authflavor_t flavor)
 {
        struct rpc_version      *version;
        struct rpc_clnt         *clnt = NULL;
@@ -118,10 +117,9 @@ rpc_new_client(struct rpc_xprt *xprt, char *servname,
                goto out_err;
 
        err = -ENOMEM;
-       clnt = kmalloc(sizeof(*clnt), GFP_KERNEL);
+       clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
        if (!clnt)
                goto out_err;
-       memset(clnt, 0, sizeof(*clnt));
        atomic_set(&clnt->cl_users, 0);
        atomic_set(&clnt->cl_count, 1);
        clnt->cl_parent = clnt;
@@ -141,15 +139,16 @@ rpc_new_client(struct rpc_xprt *xprt, char *servname,
        clnt->cl_procinfo = version->procs;
        clnt->cl_maxproc  = version->nrprocs;
        clnt->cl_protname = program->name;
-       clnt->cl_pmap     = &clnt->cl_pmap_default;
-       clnt->cl_port     = xprt->addr.sin_port;
        clnt->cl_prog     = program->number;
        clnt->cl_vers     = version->number;
-       clnt->cl_prot     = xprt->prot;
        clnt->cl_stats    = program->stats;
-       rpc_init_wait_queue(&clnt->cl_pmap_default.pm_bindwait, "bindwait");
+       clnt->cl_metrics  = rpc_alloc_iostats(clnt);
+       err = -ENOMEM;
+       if (clnt->cl_metrics == NULL)
+               goto out_no_stats;
+       clnt->cl_program  = program;
 
-       if (!clnt->cl_port)
+       if (!xprt_bound(clnt->cl_xprt))
                clnt->cl_autobind = 1;
 
        clnt->cl_rtt = &clnt->cl_rtt_default;
@@ -168,53 +167,91 @@ rpc_new_client(struct rpc_xprt *xprt, char *servname,
        }
 
        /* save the nodename */
-       clnt->cl_nodelen = strlen(system_utsname.nodename);
+       clnt->cl_nodelen = strlen(utsname()->nodename);
        if (clnt->cl_nodelen > UNX_MAXNODENAME)
                clnt->cl_nodelen = UNX_MAXNODENAME;
-       memcpy(clnt->cl_nodename, system_utsname.nodename, clnt->cl_nodelen);
+       memcpy(clnt->cl_nodename, utsname()->nodename, clnt->cl_nodelen);
        return clnt;
 
 out_no_auth:
-       rpc_rmdir(clnt->cl_pathname);
+       if (!IS_ERR(clnt->cl_dentry)) {
+               rpc_rmdir(clnt->cl_dentry);
+               rpc_put_mount();
+       }
 out_no_path:
+       rpc_free_iostats(clnt->cl_metrics);
+out_no_stats:
        if (clnt->cl_server != clnt->cl_inline_name)
                kfree(clnt->cl_server);
        kfree(clnt);
 out_err:
-       xprt_destroy(xprt);
+       xprt_put(xprt);
 out_no_xprt:
        return ERR_PTR(err);
 }
 
-/**
- * Create an RPC client
- * @xprt - pointer to xprt struct
- * @servname - name of server
- * @info - rpc_program
- * @version - rpc_program version
- * @authflavor - rpc_auth flavour to use
+/*
+ * rpc_create - create an RPC client and transport with one call
+ * @args: rpc_clnt create argument structure
  *
- * Creates an RPC client structure, then pings the server in order to
- * determine if it is up, and if it supports this program and version.
+ * Creates and initializes an RPC transport and an RPC client.
  *
- * This function should never be called by asynchronous tasks such as
- * the portmapper.
+ * It can ping the server in order to determine if it is up, and to see if
+ * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
+ * this behavior so asynchronous tasks can also use rpc_create.
  */
-struct rpc_clnt *rpc_create_client(struct rpc_xprt *xprt, char *servname,
-               struct rpc_program *info, u32 version, rpc_authflavor_t authflavor)
+struct rpc_clnt *rpc_create(struct rpc_create_args *args)
 {
+       struct rpc_xprt *xprt;
        struct rpc_clnt *clnt;
-       int err;
-       
-       clnt = rpc_new_client(xprt, servname, info, version, authflavor);
+
+       xprt = xprt_create_transport(args->protocol, args->address,
+                                       args->addrsize, args->timeout);
+       if (IS_ERR(xprt))
+               return (struct rpc_clnt *)xprt;
+
+       /*
+        * By default, kernel RPC client connects from a reserved port.
+        * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
+        * but it is always enabled for rpciod, which handles the connect
+        * operation.
+        */
+       xprt->resvport = 1;
+       if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
+               xprt->resvport = 0;
+
+       dprintk("RPC:       creating %s client for %s (xprt %p)\n",
+               args->program->name, args->servername, xprt);
+
+       clnt = rpc_new_client(xprt, args->servername, args->program,
+                               args->version, args->authflavor);
        if (IS_ERR(clnt))
                return clnt;
-       err = rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR);
-       if (err == 0)
-               return clnt;
-       rpc_shutdown_client(clnt);
-       return ERR_PTR(err);
+
+       if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
+               int err = rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR);
+               if (err != 0) {
+                       rpc_shutdown_client(clnt);
+                       return ERR_PTR(err);
+               }
+       }
+
+       clnt->cl_softrtry = 1;
+       if (args->flags & RPC_CLNT_CREATE_HARDRTRY)
+               clnt->cl_softrtry = 0;
+
+       if (args->flags & RPC_CLNT_CREATE_INTR)
+               clnt->cl_intr = 1;
+       if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
+               clnt->cl_autobind = 1;
+       if (args->flags & RPC_CLNT_CREATE_ONESHOT)
+               clnt->cl_oneshot = 1;
+       /* TODO: handle RPC_CLNT_CREATE_TAGGED
+       if (args->flags & RPC_CLNT_CREATE_TAGGED)
+               clnt->cl_tag = 1; */
+       return clnt;
 }
+EXPORT_SYMBOL_GPL(rpc_create);
 
 /*
  * This function clones the RPC client structure. It allows us to share the
@@ -225,17 +262,22 @@ struct rpc_clnt *
 rpc_clone_client(struct rpc_clnt *clnt)
 {
        struct rpc_clnt *new;
+       int err = -ENOMEM;
 
-       new = kmalloc(sizeof(*new), GFP_KERNEL);
+       new = kmemdup(clnt, sizeof(*new), GFP_KERNEL);
        if (!new)
                goto out_no_clnt;
-       memcpy(new, clnt, sizeof(*new));
        atomic_set(&new->cl_count, 1);
        atomic_set(&new->cl_users, 0);
+       new->cl_metrics = rpc_alloc_iostats(clnt);
+       if (new->cl_metrics == NULL)
+               goto out_no_stats;
+       err = rpc_setup_pipedir(new, clnt->cl_program->pipe_dir_name);
+       if (err != 0)
+               goto out_no_path;
        new->cl_parent = clnt;
        atomic_inc(&clnt->cl_count);
-       /* Duplicate portmapper */
-       rpc_init_wait_queue(&new->cl_pmap_default.pm_bindwait, "bindwait");
+       new->cl_xprt = xprt_get(clnt->cl_xprt);
        /* Turn off autobind on clones */
        new->cl_autobind = 0;
        new->cl_oneshot = 0;
@@ -243,12 +285,14 @@ rpc_clone_client(struct rpc_clnt *clnt)
        rpc_init_rtt(&new->cl_rtt_default, clnt->cl_xprt->timeout.to_initval);
        if (new->cl_auth)
                atomic_inc(&new->cl_auth->au_count);
-       new->cl_pmap            = &new->cl_pmap_default;
-       rpc_init_wait_queue(&new->cl_pmap_default.pm_bindwait, "bindwait");
        return new;
+out_no_path:
+       rpc_free_iostats(new->cl_metrics);
+out_no_stats:
+       kfree(new);
 out_no_clnt:
-       printk(KERN_INFO "RPC: out of memory in %s\n", __FUNCTION__);
-       return ERR_PTR(-ENOMEM);
+       dprintk("RPC: %s returned error %d\n", __FUNCTION__, err);
+       return ERR_PTR(err);
 }
 
 /*
@@ -301,19 +345,20 @@ rpc_destroy_client(struct rpc_clnt *clnt)
                rpcauth_destroy(clnt->cl_auth);
                clnt->cl_auth = NULL;
        }
+       if (!IS_ERR(clnt->cl_dentry)) {
+               rpc_rmdir(clnt->cl_dentry);
+               rpc_put_mount();
+       }
        if (clnt->cl_parent != clnt) {
                rpc_destroy_client(clnt->cl_parent);
                goto out_free;
        }
-       if (clnt->cl_pathname[0])
-               rpc_rmdir(clnt->cl_pathname);
-       if (clnt->cl_xprt) {
-               xprt_destroy(clnt->cl_xprt);
-               clnt->cl_xprt = NULL;
-       }
        if (clnt->cl_server != clnt->cl_inline_name)
                kfree(clnt->cl_server);
 out_free:
+       rpc_free_iostats(clnt->cl_metrics);
+       clnt->cl_metrics = NULL;
+       xprt_put(clnt->cl_xprt);
        kfree(clnt);
        return 0;
 }
@@ -437,10 +482,9 @@ int rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg, int flags)
 
        BUG_ON(flags & RPC_TASK_ASYNC);
 
-       status = -ENOMEM;
        task = rpc_new_task(clnt, flags, &rpc_default_ops, NULL);
        if (task == NULL)
-               goto out;
+               return -ENOMEM;
 
        /* Mask signals on RPC calls _and_ GSS_AUTH upcalls */
        rpc_task_sigmask(task, &oldset);
@@ -449,15 +493,15 @@ int rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg, int flags)
 
        /* Set up the call info struct and execute the task */
        status = task->tk_status;
-       if (status == 0) {
-               atomic_inc(&task->tk_count);
-               status = rpc_execute(task);
-               if (status == 0)
-                       status = task->tk_status;
-       }
-       rpc_restore_sigmask(&oldset);
-       rpc_release_task(task);
+       if (status != 0)
+               goto out;
+       atomic_inc(&task->tk_count);
+       status = rpc_execute(task);
+       if (status == 0)
+               status = task->tk_status;
 out:
+       rpc_put_task(task);
+       rpc_restore_sigmask(&oldset);
        return status;
 }
 
@@ -473,15 +517,16 @@ rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags,
        int             status;
 
        /* If this client is slain all further I/O fails */
+       status = -EIO;
        if (clnt->cl_dead) 
-               return -EIO;
+               goto out_release;
 
        flags |= RPC_TASK_ASYNC;
 
        /* Create/initialize a new RPC task */
        status = -ENOMEM;
        if (!(task = rpc_new_task(clnt, flags, tk_ops, data)))
-               goto out;
+               goto out_release;
 
        /* Mask signals on GSS_AUTH upcalls */
        rpc_task_sigmask(task, &oldset);                
@@ -493,10 +538,12 @@ rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags,
        if (status == 0)
                rpc_execute(task);
        else
-               rpc_release_task(task);
+               rpc_put_task(task);
 
        rpc_restore_sigmask(&oldset);           
-out:
+       return status;
+out_release:
+       rpc_release_calldata(tk_ops, data);
        return status;
 }
 
@@ -518,6 +565,44 @@ rpc_call_setup(struct rpc_task *task, struct rpc_message *msg, int flags)
                task->tk_action = rpc_exit_task;
 }
 
+/**
+ * rpc_peeraddr - extract remote peer address from clnt's xprt
+ * @clnt: RPC client structure
+ * @buf: target buffer
+ * @size: length of target buffer
+ *
+ * Returns the number of bytes that are actually in the stored address.
+ */
+size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
+{
+       size_t bytes;
+       struct rpc_xprt *xprt = clnt->cl_xprt;
+
+       bytes = sizeof(xprt->addr);
+       if (bytes > bufsize)
+               bytes = bufsize;
+       memcpy(buf, &clnt->cl_xprt->addr, bytes);
+       return xprt->addrlen;
+}
+EXPORT_SYMBOL_GPL(rpc_peeraddr);
+
+/**
+ * rpc_peeraddr2str - return remote peer address in printable format
+ * @clnt: RPC client structure
+ * @format: address format
+ *
+ */
+char *rpc_peeraddr2str(struct rpc_clnt *clnt, enum rpc_display_format_t format)
+{
+       struct rpc_xprt *xprt = clnt->cl_xprt;
+
+       if (xprt->address_strings[format] != NULL)
+               return xprt->address_strings[format];
+       else
+               return "unprintable";
+}
+EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
+
 void
 rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
 {
@@ -538,7 +623,7 @@ size_t rpc_max_payload(struct rpc_clnt *clnt)
 {
        return clnt->cl_xprt->max_payload;
 }
-EXPORT_SYMBOL(rpc_max_payload);
+EXPORT_SYMBOL_GPL(rpc_max_payload);
 
 /**
  * rpc_force_rebind - force transport to check that remote port is unchanged
@@ -548,9 +633,9 @@ EXPORT_SYMBOL(rpc_max_payload);
 void rpc_force_rebind(struct rpc_clnt *clnt)
 {
        if (clnt->cl_autobind)
-               clnt->cl_port = 0;
+               xprt_clear_bound(clnt->cl_xprt);
 }
-EXPORT_SYMBOL(rpc_force_rebind);
+EXPORT_SYMBOL_GPL(rpc_force_rebind);
 
 /*
  * Restart an (async) RPC call. Usually called from within the
@@ -714,7 +799,7 @@ call_encode(struct rpc_task *task)
        struct xdr_buf *rcvbuf = &req->rq_rcv_buf;
        unsigned int    bufsiz;
        kxdrproc_t      encode;
-       u32             *p;
+       __be32          *p;
 
        dprintk("RPC: %4d call_encode (status %d)\n", 
                                task->tk_pid, task->tk_status);
@@ -744,8 +829,10 @@ call_encode(struct rpc_task *task)
        if (encode == NULL)
                return;
 
+       lock_kernel();
        task->tk_status = rpcauth_wrap_req(task, encode, req, p,
                        task->tk_msg.rpc_argp);
+       unlock_kernel();
        if (task->tk_status == -ENOMEM) {
                /* XXX: Is this sane? */
                rpc_delay(task, 3*HZ);
@@ -759,16 +846,16 @@ call_encode(struct rpc_task *task)
 static void
 call_bind(struct rpc_task *task)
 {
-       struct rpc_clnt *clnt = task->tk_client;
+       struct rpc_xprt *xprt = task->tk_xprt;
 
        dprintk("RPC: %4d call_bind (status %d)\n",
                                task->tk_pid, task->tk_status);
 
        task->tk_action = call_connect;
-       if (!clnt->cl_port) {
+       if (!xprt_bound(xprt)) {
                task->tk_action = call_bind_status;
-               task->tk_timeout = task->tk_xprt->bind_timeout;
-               rpc_getport(task, clnt);
+               task->tk_timeout = xprt->bind_timeout;
+               xprt->ops->rpcbind(task);
        }
 }
 
@@ -793,15 +880,11 @@ call_bind_status(struct rpc_task *task)
                dprintk("RPC: %4d remote rpcbind: RPC program/version unavailable\n",
                                task->tk_pid);
                rpc_delay(task, 3*HZ);
-               goto retry_bind;
+               goto retry_timeout;
        case -ETIMEDOUT:
                dprintk("RPC: %4d rpcbind request timed out\n",
                                task->tk_pid);
-               if (RPC_IS_SOFT(task)) {
-                       status = -EIO;
-                       break;
-               }
-               goto retry_bind;
+               goto retry_timeout;
        case -EPFNOSUPPORT:
                dprintk("RPC: %4d remote rpcbind service unavailable\n",
                                task->tk_pid);
@@ -814,16 +897,13 @@ call_bind_status(struct rpc_task *task)
                dprintk("RPC: %4d unrecognized rpcbind error (%d)\n",
                                task->tk_pid, -task->tk_status);
                status = -EIO;
-               break;
        }
 
        rpc_exit(task, status);
        return;
 
-retry_bind:
-       task->tk_status = 0;
-       task->tk_action = call_bind;
-       return;
+retry_timeout:
+       task->tk_action = call_timeout;
 }
 
 /*
@@ -871,14 +951,16 @@ call_connect_status(struct rpc_task *task)
 
        switch (status) {
        case -ENOTCONN:
-       case -ETIMEDOUT:
        case -EAGAIN:
                task->tk_action = call_bind;
-               break;
-       default:
-               rpc_exit(task, -EIO);
-               break;
+               if (!RPC_IS_SOFT(task))
+                       return;
+               /* if soft mounted, test if we've timed out */
+       case -ETIMEDOUT:
+               task->tk_action = call_timeout;
+               return;
        }
+       rpc_exit(task, -EIO);
 }
 
 /*
@@ -896,26 +978,43 @@ call_transmit(struct rpc_task *task)
        task->tk_status = xprt_prepare_transmit(task);
        if (task->tk_status != 0)
                return;
+       task->tk_action = call_transmit_status;
        /* Encode here so that rpcsec_gss can use correct sequence number. */
        if (rpc_task_need_encode(task)) {
-               task->tk_rqstp->rq_bytes_sent = 0;
+               BUG_ON(task->tk_rqstp->rq_bytes_sent != 0);
                call_encode(task);
                /* Did the encode result in an error condition? */
                if (task->tk_status != 0)
-                       goto out_nosend;
+                       return;
        }
-       task->tk_action = call_transmit_status;
        xprt_transmit(task);
        if (task->tk_status < 0)
                return;
-       if (!task->tk_msg.rpc_proc->p_decode) {
-               task->tk_action = rpc_exit_task;
-               rpc_wake_up_task(task);
-       }
-       return;
-out_nosend:
-       /* release socket write lock before attempting to handle error */
-       xprt_abort_transmit(task);
+       /*
+        * On success, ensure that we call xprt_end_transmit() before sleeping
+        * in order to allow access to the socket to other RPC requests.
+        */
+       call_transmit_status(task);
+       if (task->tk_msg.rpc_proc->p_decode != NULL)
+               return;
+       task->tk_action = rpc_exit_task;
+       rpc_wake_up_task(task);
+}
+
+/*
+ * 5a. Handle cleanup after a transmission
+ */
+static void
+call_transmit_status(struct rpc_task *task)
+{
+       task->tk_action = call_status;
+       /*
+        * Special case: if we've been waiting on the socket's write_space()
+        * callback, then don't call xprt_end_transmit().
+        */
+       if (task->tk_status == -EAGAIN)
+               return;
+       xprt_end_transmit(task);
        rpc_task_force_reencode(task);
 }
 
@@ -943,6 +1042,14 @@ call_status(struct rpc_task *task)
 
        task->tk_status = 0;
        switch(status) {
+       case -EHOSTDOWN:
+       case -EHOSTUNREACH:
+       case -ENETUNREACH:
+               /*
+                * Delay any retries for 3 seconds, then handle as if it
+                * were a timeout.
+                */
+               rpc_delay(task, 3*HZ);
        case -ETIMEDOUT:
                task->tk_action = call_timeout;
                break;
@@ -962,23 +1069,11 @@ call_status(struct rpc_task *task)
                printk("%s: RPC call returned error %d\n",
                               clnt->cl_protname, -status);
                rpc_exit(task, status);
-               break;
        }
 }
 
 /*
- * 6a. Handle transmission errors.
- */
-static void
-call_transmit_status(struct rpc_task *task)
-{
-       if (task->tk_status != -EAGAIN)
-               rpc_task_force_reencode(task);
-       call_status(task);
-}
-
-/*
- * 6b. Handle RPC timeout
+ * 6a. Handle RPC timeout
  *     We do not release the request slot, so we keep using the
  *     same XID for all retransmits.
  */
@@ -993,6 +1088,8 @@ call_timeout(struct rpc_task *task)
        }
 
        dprintk("RPC: %4d call_timeout (major)\n", task->tk_pid);
+       task->tk_timeouts++;
+
        if (RPC_IS_SOFT(task)) {
                printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
                                clnt->cl_protname, clnt->cl_server);
@@ -1022,7 +1119,7 @@ call_decode(struct rpc_task *task)
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_rqst *req = task->tk_rqstp;
        kxdrproc_t      decode = task->tk_msg.rpc_proc->p_decode;
-       u32             *p;
+       __be32          *p;
 
        dprintk("RPC: %4d call_decode (status %d)\n", 
                                task->tk_pid, task->tk_status);
@@ -1039,12 +1136,17 @@ call_decode(struct rpc_task *task)
                        clnt->cl_stats->rpcretrans++;
                        goto out_retry;
                }
-               printk(KERN_WARNING "%s: too small RPC reply size (%d bytes)\n",
+               dprintk("%s: too small RPC reply size (%d bytes)\n",
                        clnt->cl_protname, task->tk_status);
-               rpc_exit(task, -EIO);
-               return;
+               task->tk_action = call_timeout;
+               goto out_retry;
        }
 
+       /*
+        * Ensure that we see all writes made by xprt_complete_rqst()
+        * before it changed req->rq_received.
+        */
+       smp_rmb();
        req->rq_rcv_buf.len = req->rq_private_buf.len;
 
        /* Check that the softirq receive buffer is valid */
@@ -1061,9 +1163,12 @@ call_decode(struct rpc_task *task)
 
        task->tk_action = rpc_exit_task;
 
-       if (decode)
+       if (decode) {
+               lock_kernel();
                task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
                                                      task->tk_msg.rpc_resp);
+               unlock_kernel();
+       }
        dprintk("RPC: %4d call_decode result %d\n", task->tk_pid,
                                        task->tk_status);
        return;
@@ -1114,12 +1219,12 @@ call_refreshresult(struct rpc_task *task)
 /*
  * Call header serialization
  */
-static u32 *
+static __be32 *
 call_header(struct rpc_task *task)
 {
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_rqst *req = task->tk_rqstp;
-       u32             *p = req->rq_svec[0].iov_base;
+       __be32          *p = req->rq_svec[0].iov_base;
 
        /* FIXME: check buffer size? */
 
@@ -1138,14 +1243,26 @@ call_header(struct rpc_task *task)
 /*
  * Reply header verification
  */
-static u32 *
+static __be32 *
 call_verify(struct rpc_task *task)
 {
        struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
        int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
-       u32     *p = iov->iov_base, n;
+       __be32  *p = iov->iov_base;
+       u32 n;
        int error = -EACCES;
 
+       if ((task->tk_rqstp->rq_rcv_buf.len & 3) != 0) {
+               /* RFC-1014 says that the representation of XDR data must be a
+                * multiple of four bytes
+                * - if it isn't pointer subtraction in the NFS client may give
+                *   undefined results
+                */
+               printk(KERN_WARNING
+                      "call_verify: XDR representation not a multiple of"
+                      " 4 bytes: 0x%x\n", task->tk_rqstp->rq_rcv_buf.len);
+               goto out_eio;
+       }
        if ((len -= 3) < 0)
                goto out_overflow;
        p += 1; /* skip XID */
@@ -1194,8 +1311,8 @@ call_verify(struct rpc_task *task)
                        task->tk_action = call_bind;
                        goto out_retry;
                case RPC_AUTH_TOOWEAK:
-                       printk(KERN_NOTICE "call_verify: server requires stronger "
-                              "authentication.\n");
+                       printk(KERN_NOTICE "call_verify: server %s requires stronger "
+                              "authentication.\n", task->tk_client->cl_server);
                        break;
                default:
                        printk(KERN_WARNING "call_verify: unknown auth error: %x\n", n);
@@ -1209,7 +1326,7 @@ call_verify(struct rpc_task *task)
                printk(KERN_WARNING "call_verify: auth check failed\n");
                goto out_garbage;               /* bad verifier, retry */
        }
-       len = p - (u32 *)iov->iov_base - 1;
+       len = p - (__be32 *)iov->iov_base - 1;
        if (len < 0)
                goto out_overflow;
        switch ((n = ntohl(*p++))) {
@@ -1264,12 +1381,12 @@ out_overflow:
        goto out_garbage;
 }
 
-static int rpcproc_encode_null(void *rqstp, u32 *data, void *obj)
+static int rpcproc_encode_null(void *rqstp, __be32 *data, void *obj)
 {
        return 0;
 }
 
-static int rpcproc_decode_null(void *rqstp, u32 *data, void *obj)
+static int rpcproc_decode_null(void *rqstp, __be32 *data, void *obj)
 {
        return 0;
 }