Fedora kernel-2.6.17-1.2142_FC4 patched with stable patch-2.6.17.4-vs2.0.2-rc26.diff
[linux-2.6.git] / net / sunrpc / clnt.c
index 02bc029..aa8965e 100644 (file)
@@ -1,5 +1,5 @@
 /*
- *  linux/net/sunrpc/rpcclnt.c
+ *  linux/net/sunrpc/clnt.c
  *
  *  This file contains the high-level RPC interface.
  *  It is modeled as a finite state machine to support both synchronous
 #include <linux/types.h>
 #include <linux/mm.h>
 #include <linux/slab.h>
-#include <linux/in.h>
 #include <linux/utsname.h>
+#include <linux/workqueue.h>
 
 #include <linux/sunrpc/clnt.h>
-#include <linux/workqueue.h>
 #include <linux/sunrpc/rpc_pipe_fs.h>
-
-#include <linux/nfs.h>
+#include <linux/sunrpc/metrics.h>
 
 
 #define RPC_SLACK_SPACE                (1024)  /* total overkill */
@@ -53,8 +51,10 @@ static void  call_allocate(struct rpc_task *task);
 static void    call_encode(struct rpc_task *task);
 static void    call_decode(struct rpc_task *task);
 static void    call_bind(struct rpc_task *task);
+static void    call_bind_status(struct rpc_task *task);
 static void    call_transmit(struct rpc_task *task);
 static void    call_status(struct rpc_task *task);
+static void    call_transmit_status(struct rpc_task *task);
 static void    call_refresh(struct rpc_task *task);
 static void    call_refreshresult(struct rpc_task *task);
 static void    call_timeout(struct rpc_task *task);
@@ -70,8 +70,15 @@ rpc_setup_pipedir(struct rpc_clnt *clnt, char *dir_name)
        static uint32_t clntid;
        int error;
 
+       clnt->cl_vfsmnt = ERR_PTR(-ENOENT);
+       clnt->cl_dentry = ERR_PTR(-ENOENT);
        if (dir_name == NULL)
                return 0;
+
+       clnt->cl_vfsmnt = rpc_get_mount();
+       if (IS_ERR(clnt->cl_vfsmnt))
+               return PTR_ERR(clnt->cl_vfsmnt);
+
        for (;;) {
                snprintf(clnt->cl_pathname, sizeof(clnt->cl_pathname),
                                "%s/clnt%x", dir_name,
@@ -84,6 +91,7 @@ rpc_setup_pipedir(struct rpc_clnt *clnt, char *dir_name)
                if (error != -EEXIST) {
                        printk(KERN_INFO "RPC: Couldn't create pipefs entry %s, error %d\n",
                                        clnt->cl_pathname, error);
+                       rpc_put_mount();
                        return error;
                }
        }
@@ -97,12 +105,13 @@ rpc_setup_pipedir(struct rpc_clnt *clnt, char *dir_name)
  * made to sleep too long.
  */
 struct rpc_clnt *
-rpc_create_client(struct rpc_xprt *xprt, char *servname,
+rpc_new_client(struct rpc_xprt *xprt, char *servname,
                  struct rpc_program *program, u32 vers,
                  rpc_authflavor_t flavor)
 {
        struct rpc_version      *version;
        struct rpc_clnt         *clnt = NULL;
+       struct rpc_auth         *auth;
        int err;
        int len;
 
@@ -111,12 +120,12 @@ rpc_create_client(struct rpc_xprt *xprt, char *servname,
 
        err = -EINVAL;
        if (!xprt)
-               goto out_err;
+               goto out_no_xprt;
        if (vers >= program->nrvers || !(version = program->version[vers]))
                goto out_err;
 
        err = -ENOMEM;
-       clnt = (struct rpc_clnt *) kmalloc(sizeof(*clnt), GFP_KERNEL);
+       clnt = kmalloc(sizeof(*clnt), GFP_KERNEL);
        if (!clnt)
                goto out_err;
        memset(clnt, 0, sizeof(*clnt));
@@ -145,6 +154,7 @@ rpc_create_client(struct rpc_xprt *xprt, char *servname,
        clnt->cl_vers     = version->number;
        clnt->cl_prot     = xprt->prot;
        clnt->cl_stats    = program->stats;
+       clnt->cl_metrics  = rpc_alloc_iostats(clnt);
        rpc_init_wait_queue(&clnt->cl_pmap_default.pm_bindwait, "bindwait");
 
        if (!clnt->cl_port)
@@ -157,10 +167,11 @@ rpc_create_client(struct rpc_xprt *xprt, char *servname,
        if (err < 0)
                goto out_no_path;
 
-       err = -ENOMEM;
-       if (!rpcauth_create(flavor, clnt)) {
+       auth = rpcauth_create(flavor, clnt);
+       if (IS_ERR(auth)) {
                printk(KERN_INFO "RPC: Couldn't create auth handle (flavor %u)\n",
                                flavor);
+               err = PTR_ERR(auth);
                goto out_no_auth;
        }
 
@@ -172,12 +183,48 @@ rpc_create_client(struct rpc_xprt *xprt, char *servname,
        return clnt;
 
 out_no_auth:
-       rpc_rmdir(clnt->cl_pathname);
+       if (!IS_ERR(clnt->cl_dentry)) {
+               rpc_rmdir(clnt->cl_pathname);
+               dput(clnt->cl_dentry);
+               rpc_put_mount();
+       }
 out_no_path:
        if (clnt->cl_server != clnt->cl_inline_name)
                kfree(clnt->cl_server);
        kfree(clnt);
 out_err:
+       xprt_destroy(xprt);
+out_no_xprt:
+       return ERR_PTR(err);
+}
+
+/**
+ * Create an RPC client
+ * @xprt - pointer to xprt struct
+ * @servname - name of server
+ * @info - rpc_program
+ * @version - rpc_program version
+ * @authflavor - rpc_auth flavour to use
+ *
+ * Creates an RPC client structure, then pings the server in order to
+ * determine if it is up, and if it supports this program and version.
+ *
+ * This function should never be called by asynchronous tasks such as
+ * the portmapper.
+ */
+struct rpc_clnt *rpc_create_client(struct rpc_xprt *xprt, char *servname,
+               struct rpc_program *info, u32 version, rpc_authflavor_t authflavor)
+{
+       struct rpc_clnt *clnt;
+       int err;
+       
+       clnt = rpc_new_client(xprt, servname, info, version, authflavor);
+       if (IS_ERR(clnt))
+               return clnt;
+       err = rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR);
+       if (err == 0)
+               return clnt;
+       rpc_shutdown_client(clnt);
        return ERR_PTR(err);
 }
 
@@ -191,7 +238,7 @@ rpc_clone_client(struct rpc_clnt *clnt)
 {
        struct rpc_clnt *new;
 
-       new = (struct rpc_clnt *)kmalloc(sizeof(*new), GFP_KERNEL);
+       new = kmalloc(sizeof(*new), GFP_KERNEL);
        if (!new)
                goto out_no_clnt;
        memcpy(new, clnt, sizeof(*new));
@@ -205,9 +252,15 @@ rpc_clone_client(struct rpc_clnt *clnt)
        new->cl_autobind = 0;
        new->cl_oneshot = 0;
        new->cl_dead = 0;
+       if (!IS_ERR(new->cl_dentry)) {
+               dget(new->cl_dentry);
+               rpc_get_mount();
+       }
        rpc_init_rtt(&new->cl_rtt_default, clnt->cl_xprt->timeout.to_initval);
        if (new->cl_auth)
                atomic_inc(&new->cl_auth->au_count);
+       new->cl_pmap            = &new->cl_pmap_default;
+       new->cl_metrics         = rpc_alloc_iostats(clnt);
        return new;
 out_no_clnt:
        printk(KERN_INFO "RPC: out of memory in %s\n", __FUNCTION__);
@@ -232,7 +285,8 @@ rpc_shutdown_client(struct rpc_clnt *clnt)
                clnt->cl_oneshot = 0;
                clnt->cl_dead = 0;
                rpc_killall_tasks(clnt);
-               sleep_on_timeout(&destroy_wait, 1*HZ);
+               wait_event_timeout(destroy_wait,
+                       !atomic_read(&clnt->cl_users), 1*HZ);
        }
 
        if (atomic_read(&clnt->cl_users) < 0) {
@@ -276,6 +330,12 @@ rpc_destroy_client(struct rpc_clnt *clnt)
        if (clnt->cl_server != clnt->cl_inline_name)
                kfree(clnt->cl_server);
 out_free:
+       rpc_free_iostats(clnt->cl_metrics);
+       clnt->cl_metrics = NULL;
+       if (!IS_ERR(clnt->cl_dentry)) {
+               dput(clnt->cl_dentry);
+               rpc_put_mount();
+       }
        kfree(clnt);
        return 0;
 }
@@ -296,47 +356,92 @@ rpc_release_client(struct rpc_clnt *clnt)
                rpc_destroy_client(clnt);
 }
 
+/**
+ * rpc_bind_new_program - bind a new RPC program to an existing client
+ * @old - old rpc_client
+ * @program - rpc program to set
+ * @vers - rpc program version
+ *
+ * Clones the rpc client and sets up a new RPC program. This is mainly
+ * of use for enabling different RPC programs to share the same transport.
+ * The Sun NFSv2/v3 ACL protocol can do this.
+ */
+struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
+                                     struct rpc_program *program,
+                                     int vers)
+{
+       struct rpc_clnt *clnt;
+       struct rpc_version *version;
+       int err;
+
+       BUG_ON(vers >= program->nrvers || !program->version[vers]);
+       version = program->version[vers];
+       clnt = rpc_clone_client(old);
+       if (IS_ERR(clnt))
+               goto out;
+       clnt->cl_procinfo = version->procs;
+       clnt->cl_maxproc  = version->nrprocs;
+       clnt->cl_protname = program->name;
+       clnt->cl_prog     = program->number;
+       clnt->cl_vers     = version->number;
+       clnt->cl_stats    = program->stats;
+       err = rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR);
+       if (err != 0) {
+               rpc_shutdown_client(clnt);
+               clnt = ERR_PTR(err);
+       }
+out:   
+       return clnt;
+}
+
 /*
  * Default callback for async RPC calls
  */
 static void
-rpc_default_callback(struct rpc_task *task)
+rpc_default_callback(struct rpc_task *task, void *data)
 {
 }
 
+static const struct rpc_call_ops rpc_default_ops = {
+       .rpc_call_done = rpc_default_callback,
+};
+
 /*
- *     Export the signal mask handling for aysnchronous code that
+ *     Export the signal mask handling for synchronous code that
  *     sleeps on RPC calls
  */
+#define RPC_INTR_SIGNALS (sigmask(SIGHUP) | sigmask(SIGINT) | sigmask(SIGQUIT) | sigmask(SIGTERM))
  
-void rpc_clnt_sigmask(struct rpc_clnt *clnt, sigset_t *oldset)
+static void rpc_save_sigmask(sigset_t *oldset, int intr)
 {
        unsigned long   sigallow = sigmask(SIGKILL);
-       unsigned long   irqflags;
-       
-       /* Turn off various signals */
-       if (clnt->cl_intr) {
-               struct k_sigaction *action = current->sighand->action;
-               if (action[SIGINT-1].sa.sa_handler == SIG_DFL)
-                       sigallow |= sigmask(SIGINT);
-               if (action[SIGQUIT-1].sa.sa_handler == SIG_DFL)
-                       sigallow |= sigmask(SIGQUIT);
-       }
-       spin_lock_irqsave(&current->sighand->siglock, irqflags);
-       *oldset = current->blocked;
-       siginitsetinv(&current->blocked, sigallow & ~oldset->sig[0]);
-       recalc_sigpending();
-       spin_unlock_irqrestore(&current->sighand->siglock, irqflags);
+       sigset_t sigmask;
+
+       /* Block all signals except those listed in sigallow */
+       if (intr)
+               sigallow |= RPC_INTR_SIGNALS;
+       siginitsetinv(&sigmask, sigallow);
+       sigprocmask(SIG_BLOCK, &sigmask, oldset);
+}
+
+static inline void rpc_task_sigmask(struct rpc_task *task, sigset_t *oldset)
+{
+       rpc_save_sigmask(oldset, !RPC_TASK_UNINTERRUPTIBLE(task));
+}
+
+static inline void rpc_restore_sigmask(sigset_t *oldset)
+{
+       sigprocmask(SIG_SETMASK, oldset, NULL);
+}
+
+void rpc_clnt_sigmask(struct rpc_clnt *clnt, sigset_t *oldset)
+{
+       rpc_save_sigmask(oldset, clnt->cl_intr);
 }
 
 void rpc_clnt_sigunmask(struct rpc_clnt *clnt, sigset_t *oldset)
 {
-       unsigned long   irqflags;
-       
-       spin_lock_irqsave(&current->sighand->siglock, irqflags);
-       current->blocked = *oldset;
-       recalc_sigpending();
-       spin_unlock_irqrestore(&current->sighand->siglock, irqflags);
+       rpc_restore_sigmask(oldset);
 }
 
 /*
@@ -354,26 +459,27 @@ int rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg, int flags)
 
        BUG_ON(flags & RPC_TASK_ASYNC);
 
-       rpc_clnt_sigmask(clnt, &oldset);                
-
        status = -ENOMEM;
-       task = rpc_new_task(clnt, NULL, flags);
+       task = rpc_new_task(clnt, flags, &rpc_default_ops, NULL);
        if (task == NULL)
                goto out;
 
+       /* Mask signals on RPC calls _and_ GSS_AUTH upcalls */
+       rpc_task_sigmask(task, &oldset);
+
        rpc_call_setup(task, msg, 0);
 
        /* Set up the call info struct and execute the task */
-       if (task->tk_status == 0)
+       status = task->tk_status;
+       if (status == 0) {
+               atomic_inc(&task->tk_count);
                status = rpc_execute(task);
-       else {
-               status = task->tk_status;
-               rpc_release_task(task);
+               if (status == 0)
+                       status = task->tk_status;
        }
-
+       rpc_restore_sigmask(&oldset);
+       rpc_release_task(task);
 out:
-       rpc_clnt_sigunmask(clnt, &oldset);              
-
        return status;
 }
 
@@ -382,27 +488,26 @@ out:
  */
 int
 rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags,
-              rpc_action callback, void *data)
+              const struct rpc_call_ops *tk_ops, void *data)
 {
        struct rpc_task *task;
        sigset_t        oldset;
        int             status;
 
        /* If this client is slain all further I/O fails */
+       status = -EIO;
        if (clnt->cl_dead) 
-               return -EIO;
+               goto out_release;
 
        flags |= RPC_TASK_ASYNC;
 
-       rpc_clnt_sigmask(clnt, &oldset);                
-
        /* Create/initialize a new RPC task */
-       if (!callback)
-               callback = rpc_default_callback;
        status = -ENOMEM;
-       if (!(task = rpc_new_task(clnt, callback, flags)))
-               goto out;
-       task->tk_calldata = data;
+       if (!(task = rpc_new_task(clnt, flags, tk_ops, data)))
+               goto out_release;
+
+       /* Mask signals on GSS_AUTH upcalls */
+       rpc_task_sigmask(task, &oldset);                
 
        rpc_call_setup(task, msg, 0);
 
@@ -413,9 +518,11 @@ rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags,
        else
                rpc_release_task(task);
 
-out:
-       rpc_clnt_sigunmask(clnt, &oldset);              
-
+       rpc_restore_sigmask(&oldset);           
+       return status;
+out_release:
+       if (tk_ops->rpc_release != NULL)
+               tk_ops->rpc_release(data);
        return status;
 }
 
@@ -434,22 +541,15 @@ rpc_call_setup(struct rpc_task *task, struct rpc_message *msg, int flags)
        if (task->tk_status == 0)
                task->tk_action = call_start;
        else
-               task->tk_action = NULL;
+               task->tk_action = rpc_exit_task;
 }
 
 void
 rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
 {
        struct rpc_xprt *xprt = clnt->cl_xprt;
-
-       xprt->sndsize = 0;
-       if (sndsize)
-               xprt->sndsize = sndsize + RPC_SLACK_SPACE;
-       xprt->rcvsize = 0;
-       if (rcvsize)
-               xprt->rcvsize = rcvsize + RPC_SLACK_SPACE;
-       if (xprt_connected(xprt))
-               xprt_sock_setbufsize(xprt);
+       if (xprt->ops->set_buffer_size)
+               xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
 }
 
 /*
@@ -466,6 +566,18 @@ size_t rpc_max_payload(struct rpc_clnt *clnt)
 }
 EXPORT_SYMBOL(rpc_max_payload);
 
+/**
+ * rpc_force_rebind - force transport to check that remote port is unchanged
+ * @clnt: client to rebind
+ *
+ */
+void rpc_force_rebind(struct rpc_clnt *clnt)
+{
+       if (clnt->cl_autobind)
+               clnt->cl_port = 0;
+}
+EXPORT_SYMBOL(rpc_force_rebind);
+
 /*
  * Restart an (async) RPC call. Usually called from within the
  * exit handler.
@@ -572,28 +684,30 @@ call_reserveresult(struct rpc_task *task)
 
 /*
  * 2.  Allocate the buffer. For details, see sched.c:rpc_malloc.
- *     (Note: buffer memory is freed in rpc_task_release).
+ *     (Note: buffer memory is freed in xprt_release).
  */
 static void
 call_allocate(struct rpc_task *task)
 {
+       struct rpc_rqst *req = task->tk_rqstp;
+       struct rpc_xprt *xprt = task->tk_xprt;
        unsigned int    bufsiz;
 
        dprintk("RPC: %4d call_allocate (status %d)\n", 
                                task->tk_pid, task->tk_status);
        task->tk_action = call_bind;
-       if (task->tk_buffer)
+       if (req->rq_buffer)
                return;
 
        /* FIXME: compute buffer requirements more exactly using
         * auth->au_wslack */
        bufsiz = task->tk_msg.rpc_proc->p_bufsiz + RPC_SLACK_SPACE;
 
-       if (rpc_malloc(task, bufsiz << 1) != NULL)
+       if (xprt->ops->buf_alloc(task, bufsiz << 1) != NULL)
                return;
        printk(KERN_INFO "RPC: buffer allocation failed for task %p\n", task); 
 
-       if (RPC_IS_ASYNC(task) || !(task->tk_client->cl_intr && signalled())) {
+       if (RPC_IS_ASYNC(task) || !signalled()) {
                xprt_release(task);
                task->tk_action = call_reserve;
                rpc_delay(task, HZ>>4);
@@ -603,33 +717,43 @@ call_allocate(struct rpc_task *task)
        rpc_exit(task, -ERESTARTSYS);
 }
 
+static inline int
+rpc_task_need_encode(struct rpc_task *task)
+{
+       return task->tk_rqstp->rq_snd_buf.len == 0;
+}
+
+static inline void
+rpc_task_force_reencode(struct rpc_task *task)
+{
+       task->tk_rqstp->rq_snd_buf.len = 0;
+}
+
 /*
  * 3.  Encode arguments of an RPC call
  */
 static void
 call_encode(struct rpc_task *task)
 {
-       struct rpc_clnt *clnt = task->tk_client;
        struct rpc_rqst *req = task->tk_rqstp;
        struct xdr_buf *sndbuf = &req->rq_snd_buf;
        struct xdr_buf *rcvbuf = &req->rq_rcv_buf;
        unsigned int    bufsiz;
        kxdrproc_t      encode;
-       int             status;
        u32             *p;
 
        dprintk("RPC: %4d call_encode (status %d)\n", 
                                task->tk_pid, task->tk_status);
 
        /* Default buffer setup */
-       bufsiz = task->tk_bufsize >> 1;
-       sndbuf->head[0].iov_base = (void *)task->tk_buffer;
+       bufsiz = req->rq_bufsize >> 1;
+       sndbuf->head[0].iov_base = (void *)req->rq_buffer;
        sndbuf->head[0].iov_len  = bufsiz;
        sndbuf->tail[0].iov_len  = 0;
        sndbuf->page_len         = 0;
        sndbuf->len              = 0;
        sndbuf->buflen           = bufsiz;
-       rcvbuf->head[0].iov_base = (void *)((char *)task->tk_buffer + bufsiz);
+       rcvbuf->head[0].iov_base = (void *)((char *)req->rq_buffer + bufsiz);
        rcvbuf->head[0].iov_len  = bufsiz;
        rcvbuf->tail[0].iov_len  = 0;
        rcvbuf->page_len         = 0;
@@ -643,11 +767,15 @@ call_encode(struct rpc_task *task)
                rpc_exit(task, -EIO);
                return;
        }
-       if (encode && (status = rpcauth_wrap_req(task, encode, req, p,
-                                                task->tk_msg.rpc_argp)) < 0) {
-               printk(KERN_WARNING "%s: can't encode arguments: %d\n",
-                               clnt->cl_protname, -status);
-               rpc_exit(task, status);
+       if (encode == NULL)
+               return;
+
+       task->tk_status = rpcauth_wrap_req(task, encode, req, p,
+                       task->tk_msg.rpc_argp);
+       if (task->tk_status == -ENOMEM) {
+               /* XXX: Is this sane? */
+               rpc_delay(task, 3*HZ);
+               task->tk_status = -EAGAIN;
        }
 }
 
@@ -658,43 +786,95 @@ static void
 call_bind(struct rpc_task *task)
 {
        struct rpc_clnt *clnt = task->tk_client;
-       struct rpc_xprt *xprt = clnt->cl_xprt;
-
-       dprintk("RPC: %4d call_bind xprt %p %s connected\n", task->tk_pid,
-                       xprt, (xprt_connected(xprt) ? "is" : "is not"));
 
-       task->tk_action = (xprt_connected(xprt)) ? call_transmit : call_connect;
+       dprintk("RPC: %4d call_bind (status %d)\n",
+                               task->tk_pid, task->tk_status);
 
+       task->tk_action = call_connect;
        if (!clnt->cl_port) {
-               task->tk_action = call_connect;
-               task->tk_timeout = RPC_CONNECT_TIMEOUT;
+               task->tk_action = call_bind_status;
+               task->tk_timeout = task->tk_xprt->bind_timeout;
                rpc_getport(task, clnt);
        }
 }
 
 /*
- * 4a. Connect to the RPC server (TCP case)
+ * 4a. Sort out bind result
+ */
+static void
+call_bind_status(struct rpc_task *task)
+{
+       int status = -EACCES;
+
+       if (task->tk_status >= 0) {
+               dprintk("RPC: %4d call_bind_status (status %d)\n",
+                                       task->tk_pid, task->tk_status);
+               task->tk_status = 0;
+               task->tk_action = call_connect;
+               return;
+       }
+
+       switch (task->tk_status) {
+       case -EACCES:
+               dprintk("RPC: %4d remote rpcbind: RPC program/version unavailable\n",
+                               task->tk_pid);
+               rpc_delay(task, 3*HZ);
+               goto retry_bind;
+       case -ETIMEDOUT:
+               dprintk("RPC: %4d rpcbind request timed out\n",
+                               task->tk_pid);
+               if (RPC_IS_SOFT(task)) {
+                       status = -EIO;
+                       break;
+               }
+               goto retry_bind;
+       case -EPFNOSUPPORT:
+               dprintk("RPC: %4d remote rpcbind service unavailable\n",
+                               task->tk_pid);
+               break;
+       case -EPROTONOSUPPORT:
+               dprintk("RPC: %4d remote rpcbind version 2 unavailable\n",
+                               task->tk_pid);
+               break;
+       default:
+               dprintk("RPC: %4d unrecognized rpcbind error (%d)\n",
+                               task->tk_pid, -task->tk_status);
+               status = -EIO;
+               break;
+       }
+
+       rpc_exit(task, status);
+       return;
+
+retry_bind:
+       task->tk_status = 0;
+       task->tk_action = call_bind;
+       return;
+}
+
+/*
+ * 4b. Connect to the RPC server
  */
 static void
 call_connect(struct rpc_task *task)
 {
-       struct rpc_clnt *clnt = task->tk_client;
+       struct rpc_xprt *xprt = task->tk_xprt;
 
-       dprintk("RPC: %4d call_connect status %d\n",
-                               task->tk_pid, task->tk_status);
+       dprintk("RPC: %4d call_connect xprt %p %s connected\n",
+                       task->tk_pid, xprt,
+                       (xprt_connected(xprt) ? "is" : "is not"));
 
-       if (xprt_connected(clnt->cl_xprt)) {
-               task->tk_action = call_transmit;
-               return;
+       task->tk_action = call_transmit;
+       if (!xprt_connected(xprt)) {
+               task->tk_action = call_connect_status;
+               if (task->tk_status < 0)
+                       return;
+               xprt_connect(task);
        }
-       task->tk_action = call_connect_status;
-       if (task->tk_status < 0)
-               return;
-       xprt_connect(task);
 }
 
 /*
- * 4b. Sort out connect result
+ * 4c. Sort out connect result
  */
 static void
 call_connect_status(struct rpc_task *task)
@@ -702,6 +882,9 @@ call_connect_status(struct rpc_task *task)
        struct rpc_clnt *clnt = task->tk_client;
        int status = task->tk_status;
 
+       dprintk("RPC: %5u call_connect_status (status %d)\n", 
+                               task->tk_pid, task->tk_status);
+
        task->tk_status = 0;
        if (status >= 0) {
                clnt->cl_stats->netreconn++;
@@ -709,17 +892,18 @@ call_connect_status(struct rpc_task *task)
                return;
        }
 
-       /* Something failed: we may have to rebind */
-       if (clnt->cl_autobind)
-               clnt->cl_port = 0;
+       /* Something failed: remote service port may have changed */
+       rpc_force_rebind(clnt);
+
        switch (status) {
        case -ENOTCONN:
        case -ETIMEDOUT:
        case -EAGAIN:
-               task->tk_action = (clnt->cl_port == 0) ? call_bind : call_connect;
+               task->tk_action = call_bind;
                break;
        default:
                rpc_exit(task, -EIO);
+               break;
        }
 }
 
@@ -739,17 +923,26 @@ call_transmit(struct rpc_task *task)
        if (task->tk_status != 0)
                return;
        /* Encode here so that rpcsec_gss can use correct sequence number. */
-       if (!task->tk_rqstp->rq_bytes_sent)
+       if (rpc_task_need_encode(task)) {
+               task->tk_rqstp->rq_bytes_sent = 0;
                call_encode(task);
-       if (task->tk_status < 0)
-               return;
+               /* Did the encode result in an error condition? */
+               if (task->tk_status != 0)
+                       goto out_nosend;
+       }
+       task->tk_action = call_transmit_status;
        xprt_transmit(task);
        if (task->tk_status < 0)
                return;
        if (!task->tk_msg.rpc_proc->p_decode) {
-               task->tk_action = NULL;
+               task->tk_action = rpc_exit_task;
                rpc_wake_up_task(task);
        }
+       return;
+out_nosend:
+       /* release socket write lock before attempting to handle error */
+       xprt_abort_transmit(task);
+       rpc_task_force_reencode(task);
 }
 
 /*
@@ -781,9 +974,7 @@ call_status(struct rpc_task *task)
                break;
        case -ECONNREFUSED:
        case -ENOTCONN:
-               req->rq_bytes_sent = 0;
-               if (clnt->cl_autobind)
-                       clnt->cl_port = 0;
+               rpc_force_rebind(clnt);
                task->tk_action = call_bind;
                break;
        case -EAGAIN:
@@ -794,8 +985,7 @@ call_status(struct rpc_task *task)
                rpc_exit(task, status);
                break;
        default:
-               if (clnt->cl_chatty)
-                       printk("%s: RPC call returned error %d\n",
+               printk("%s: RPC call returned error %d\n",
                               clnt->cl_protname, -status);
                rpc_exit(task, status);
                break;
@@ -803,7 +993,18 @@ call_status(struct rpc_task *task)
 }
 
 /*
- * 6a. Handle RPC timeout
+ * 6a. Handle transmission errors.
+ */
+static void
+call_transmit_status(struct rpc_task *task)
+{
+       if (task->tk_status != -EAGAIN)
+               rpc_task_force_reencode(task);
+       call_status(task);
+}
+
+/*
+ * 6b. Handle RPC timeout
  *     We do not release the request slot, so we keep using the
  *     same XID for all retransmits.
  */
@@ -818,21 +1019,21 @@ call_timeout(struct rpc_task *task)
        }
 
        dprintk("RPC: %4d call_timeout (major)\n", task->tk_pid);
+       task->tk_timeouts++;
+
        if (RPC_IS_SOFT(task)) {
-               if (clnt->cl_chatty)
-                       printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
+               printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
                                clnt->cl_protname, clnt->cl_server);
                rpc_exit(task, -EIO);
                return;
        }
 
-       if (clnt->cl_chatty && !(task->tk_flags & RPC_CALL_MAJORSEEN)) {
+       if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
                task->tk_flags |= RPC_CALL_MAJORSEEN;
                printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
                        clnt->cl_protname, clnt->cl_server);
        }
-       if (clnt->cl_autobind)
-               clnt->cl_port = 0;
+       rpc_force_rebind(clnt);
 
 retry:
        clnt->cl_stats->rpcretrans++;
@@ -854,7 +1055,7 @@ call_decode(struct rpc_task *task)
        dprintk("RPC: %4d call_decode (status %d)\n", 
                                task->tk_pid, task->tk_status);
 
-       if (clnt->cl_chatty && (task->tk_flags & RPC_CALL_MAJORSEEN)) {
+       if (task->tk_flags & RPC_CALL_MAJORSEEN) {
                printk(KERN_NOTICE "%s: server %s OK\n",
                        clnt->cl_protname, clnt->cl_server);
                task->tk_flags &= ~RPC_CALL_MAJORSEEN;
@@ -872,6 +1073,11 @@ call_decode(struct rpc_task *task)
                return;
        }
 
+       /*
+        * Ensure that we see all writes made by xprt_complete_rqst()
+        * before it changed req->rq_received.
+        */
+       smp_rmb();
        req->rq_rcv_buf.len = req->rq_private_buf.len;
 
        /* Check that the softirq receive buffer is valid */
@@ -879,13 +1085,14 @@ call_decode(struct rpc_task *task)
                                sizeof(req->rq_rcv_buf)) != 0);
 
        /* Verify the RPC header */
-       if (!(p = call_verify(task))) {
-               if (task->tk_action == NULL)
-                       return;
-               goto out_retry;
+       p = call_verify(task);
+       if (IS_ERR(p)) {
+               if (p == ERR_PTR(-EAGAIN))
+                       goto out_retry;
+               return;
        }
 
-       task->tk_action = NULL;
+       task->tk_action = rpc_exit_task;
 
        if (decode)
                task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
@@ -944,20 +1151,21 @@ static u32 *
 call_header(struct rpc_task *task)
 {
        struct rpc_clnt *clnt = task->tk_client;
-       struct rpc_xprt *xprt = clnt->cl_xprt;
        struct rpc_rqst *req = task->tk_rqstp;
        u32             *p = req->rq_svec[0].iov_base;
 
        /* FIXME: check buffer size? */
-       if (xprt->stream)
-               *p++ = 0;               /* fill in later */
+
+       p = xprt_skip_transport_header(task->tk_xprt, p);
        *p++ = req->rq_xid;             /* XID */
        *p++ = htonl(RPC_CALL);         /* CALL */
        *p++ = htonl(RPC_VERSION);      /* RPC version */
        *p++ = htonl(clnt->cl_prog);    /* program number */
        *p++ = htonl(clnt->cl_vers);    /* program version */
        *p++ = htonl(task->tk_msg.rpc_proc->p_proc);    /* procedure */
-       return rpcauth_marshcred(task, p);
+       p = rpcauth_marshcred(task, p);
+       req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
+       return p;
 }
 
 /*
@@ -977,7 +1185,7 @@ call_verify(struct rpc_task *task)
 
        if ((n = ntohl(*p++)) != RPC_REPLY) {
                printk(KERN_WARNING "call_verify: not an RPC reply: %x\n", n);
-               goto out_retry;
+               goto out_garbage;
        }
        if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
                if (--len < 0)
@@ -986,10 +1194,11 @@ call_verify(struct rpc_task *task)
                        case RPC_AUTH_ERROR:
                                break;
                        case RPC_MISMATCH:
-                               printk(KERN_WARNING "%s: RPC call version mismatch!\n", __FUNCTION__);
-                               goto out_eio;
+                               dprintk("%s: RPC call version mismatch!\n", __FUNCTION__);
+                               error = -EPROTONOSUPPORT;
+                               goto out_err;
                        default:
-                               printk(KERN_WARNING "%s: RPC call rejected, unknown error: %x\n", __FUNCTION__, n);
+                               dprintk("%s: RPC call rejected, unknown error: %x\n", __FUNCTION__, n);
                                goto out_eio;
                }
                if (--len < 0)
@@ -1006,7 +1215,7 @@ call_verify(struct rpc_task *task)
                                                        task->tk_pid);
                        rpcauth_invalcred(task);
                        task->tk_action = call_refresh;
-                       return NULL;
+                       goto out_retry;
                case RPC_AUTH_BADCRED:
                case RPC_AUTH_BADVERF:
                        /* possibly garbled cred/verf? */
@@ -1016,10 +1225,10 @@ call_verify(struct rpc_task *task)
                        dprintk("RPC: %4d call_verify: retry garbled creds\n",
                                                        task->tk_pid);
                        task->tk_action = call_bind;
-                       return NULL;
+                       goto out_retry;
                case RPC_AUTH_TOOWEAK:
-                       printk(KERN_NOTICE "call_verify: server requires stronger "
-                              "authentication.\n");
+                       printk(KERN_NOTICE "call_verify: server %s requires stronger "
+                              "authentication.\n", task->tk_client->cl_server);
                        break;
                default:
                        printk(KERN_WARNING "call_verify: unknown auth error: %x\n", n);
@@ -1031,7 +1240,7 @@ call_verify(struct rpc_task *task)
        }
        if (!(p = rpcauth_checkverf(task, p))) {
                printk(KERN_WARNING "call_verify: auth check failed\n");
-               goto out_retry;         /* bad verifier, retry */
+               goto out_garbage;               /* bad verifier, retry */
        }
        len = p - (u32 *)iov->iov_base - 1;
        if (len < 0)
@@ -1040,23 +1249,26 @@ call_verify(struct rpc_task *task)
        case RPC_SUCCESS:
                return p;
        case RPC_PROG_UNAVAIL:
-               printk(KERN_WARNING "RPC: call_verify: program %u is unsupported by server %s\n",
+               dprintk("RPC: call_verify: program %u is unsupported by server %s\n",
                                (unsigned int)task->tk_client->cl_prog,
                                task->tk_client->cl_server);
-               goto out_eio;
+               error = -EPFNOSUPPORT;
+               goto out_err;
        case RPC_PROG_MISMATCH:
-               printk(KERN_WARNING "RPC: call_verify: program %u, version %u unsupported by server %s\n",
+               dprintk("RPC: call_verify: program %u, version %u unsupported by server %s\n",
                                (unsigned int)task->tk_client->cl_prog,
                                (unsigned int)task->tk_client->cl_vers,
                                task->tk_client->cl_server);
-               goto out_eio;
+               error = -EPROTONOSUPPORT;
+               goto out_err;
        case RPC_PROC_UNAVAIL:
-               printk(KERN_WARNING "RPC: call_verify: proc %p unsupported by program %u, version %u on server %s\n",
+               dprintk("RPC: call_verify: proc %p unsupported by program %u, version %u on server %s\n",
                                task->tk_msg.rpc_proc,
                                task->tk_client->cl_prog,
                                task->tk_client->cl_vers,
                                task->tk_client->cl_server);
-               goto out_eio;
+               error = -EOPNOTSUPP;
+               goto out_err;
        case RPC_GARBAGE_ARGS:
                dprintk("RPC: %4d %s: server saw garbage\n", task->tk_pid, __FUNCTION__);
                break;                  /* retry */
@@ -1065,21 +1277,49 @@ call_verify(struct rpc_task *task)
                /* Also retry */
        }
 
-out_retry:
+out_garbage:
        task->tk_client->cl_stats->rpcgarbage++;
        if (task->tk_garb_retry) {
                task->tk_garb_retry--;
-               dprintk(KERN_WARNING "RPC %s: retrying %4d\n", __FUNCTION__, task->tk_pid);
+               dprintk("RPC %s: retrying %4d\n", __FUNCTION__, task->tk_pid);
                task->tk_action = call_bind;
-               return NULL;
+out_retry:
+               return ERR_PTR(-EAGAIN);
        }
        printk(KERN_WARNING "RPC %s: retry failed, exit EIO\n", __FUNCTION__);
 out_eio:
        error = -EIO;
 out_err:
        rpc_exit(task, error);
-       return NULL;
+       return ERR_PTR(error);
 out_overflow:
        printk(KERN_WARNING "RPC %s: server reply was truncated.\n", __FUNCTION__);
-       goto out_retry;
+       goto out_garbage;
+}
+
+static int rpcproc_encode_null(void *rqstp, u32 *data, void *obj)
+{
+       return 0;
+}
+
+static int rpcproc_decode_null(void *rqstp, u32 *data, void *obj)
+{
+       return 0;
+}
+
+static struct rpc_procinfo rpcproc_null = {
+       .p_encode = rpcproc_encode_null,
+       .p_decode = rpcproc_decode_null,
+};
+
+int rpc_ping(struct rpc_clnt *clnt, int flags)
+{
+       struct rpc_message msg = {
+               .rpc_proc = &rpcproc_null,
+       };
+       int err;
+       msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0);
+       err = rpc_call_sync(clnt, &msg, flags);
+       put_rpccred(msg.rpc_cred);
+       return err;
 }