/*
 *  linux/net/sunrpc/rpcclnt.c
 *
 *  This file contains the high-level RPC interface.
 *  It is modeled as a finite state machine to support both synchronous
 *  and asynchronous requests.
 *
 *  -	RPC header generation and argument serialization.
 *  -	Credential refresh.
 *  -	TCP connect handling.
 *  -	Retry of operation when it is suspected the operation failed because
 *	of uid squashing on the server, or when the credentials were stale
 *	and need to be refreshed, or when a packet was damaged in transit.
 *	This may have to be moved to the VFS layer.
 *
 *  NB: BSD uses a more intelligent approach to guessing when a request
 *  or reply has been lost by keeping the RTO estimate for each procedure.
 *  We currently make do with a constant timeout value.
 *
 *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
 *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
 */

#include <asm/system.h>

#include <linux/types.h>
#include <linux/mm.h>
#include <linux/slab.h>
#include <linux/in.h>
#include <linux/utsname.h>

#include <linux/sunrpc/clnt.h>
#include <linux/workqueue.h>
#include <linux/sunrpc/rpc_pipe_fs.h>

#include <linux/nfs.h>


#define RPC_SLACK_SPACE		(1024)	/* total overkill */

#ifdef RPC_DEBUG
# define RPCDBG_FACILITY	RPCDBG_CALL
#endif

static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);


static void	call_start(struct rpc_task *task);
static void	call_reserve(struct rpc_task *task);
static void	call_reserveresult(struct rpc_task *task);
static void	call_allocate(struct rpc_task *task);
static void	call_encode(struct rpc_task *task);
static void	call_decode(struct rpc_task *task);
static void	call_bind(struct rpc_task *task);
static void	call_transmit(struct rpc_task *task);
static void	call_status(struct rpc_task *task);
static void	call_refresh(struct rpc_task *task);
static void	call_refreshresult(struct rpc_task *task);
static void	call_timeout(struct rpc_task *task);
static void	call_connect(struct rpc_task *task);
static void	call_connect_status(struct rpc_task *task);
static u32 *	call_header(struct rpc_task *task);
static u32 *	call_verify(struct rpc_task *task);
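/*
 * Rough sketch of the normal state progression (error and refresh side
 * paths omitted; see the individual handlers below for the full story):
 *
 *	call_start
 *	  -> call_reserve -> call_reserveresult
 *	  -> call_allocate
 *	  -> call_bind [ -> call_connect -> call_connect_status ]
 *	  -> call_transmit (encodes via call_encode/call_header)
 *	  -> call_status
 *	  -> call_decode (verifies via call_verify)
 *
 * call_refresh/call_refreshresult re-enter the sequence when credentials
 * need refreshing, and call_timeout feeds retransmits back to call_bind.
 */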

static int
rpc_setup_pipedir(struct rpc_clnt *clnt, char *dir_name)
{
	static uint32_t clntid;
	int error;

	if (dir_name == NULL)
		return 0;
	for (;;) {
		snprintf(clnt->cl_pathname, sizeof(clnt->cl_pathname),
				"%s/clnt%x", dir_name,
				(unsigned int)clntid++);
		clnt->cl_pathname[sizeof(clnt->cl_pathname) - 1] = '\0';
		clnt->cl_dentry = rpc_mkdir(clnt->cl_pathname, clnt);
		if (!IS_ERR(clnt->cl_dentry))
			return 0;
		error = PTR_ERR(clnt->cl_dentry);
		if (error != -EEXIST) {
			printk(KERN_INFO "RPC: Couldn't create pipefs entry %s, error %d\n",
					clnt->cl_pathname, error);
			return error;
		}
	}
}

/*
 * Create an RPC client
 * FIXME: This should also take a flags argument (as in task->tk_flags).
 * It's called (among others) from pmap_create_client, which may in
 * turn be called by an async task. In this case, rpciod should not be
 * made to sleep too long.
 */
struct rpc_clnt *
rpc_create_client(struct rpc_xprt *xprt, char *servname,
		  struct rpc_program *program, u32 vers,
		  rpc_authflavor_t flavor)
{
	struct rpc_version	*version;
	struct rpc_clnt		*clnt = NULL;
	int err;
	int len;

	dprintk("RPC: creating %s client for %s (xprt %p)\n",
		program->name, servname, xprt);

	err = -EINVAL;
	if (!xprt)
		goto out_err;
	if (vers >= program->nrvers || !(version = program->version[vers]))
		goto out_err;

	err = -ENOMEM;
	clnt = kmalloc(sizeof(*clnt), GFP_KERNEL);
	if (!clnt)
		goto out_err;
	memset(clnt, 0, sizeof(*clnt));
	atomic_set(&clnt->cl_users, 0);
	atomic_set(&clnt->cl_count, 1);
	clnt->cl_parent = clnt;

	clnt->cl_server = clnt->cl_inline_name;
	len = strlen(servname) + 1;
	if (len > sizeof(clnt->cl_inline_name)) {
		char *buf = kmalloc(len, GFP_KERNEL);
		if (buf != NULL)
			clnt->cl_server = buf;
		else
			len = sizeof(clnt->cl_inline_name);
	}
	strlcpy(clnt->cl_server, servname, len);

	clnt->cl_xprt     = xprt;
	clnt->cl_procinfo = version->procs;
	clnt->cl_maxproc  = version->nrprocs;
	clnt->cl_protname = program->name;
	clnt->cl_pmap     = &clnt->cl_pmap_default;
	clnt->cl_port     = xprt->addr.sin_port;
	clnt->cl_prog     = program->number;
	clnt->cl_vers     = version->number;
	clnt->cl_prot     = xprt->prot;
	clnt->cl_stats    = program->stats;
	rpc_init_wait_queue(&clnt->cl_pmap_default.pm_bindwait, "bindwait");

	if (!clnt->cl_port)
		clnt->cl_autobind = 1;

	clnt->cl_rtt = &clnt->cl_rtt_default;
	rpc_init_rtt(&clnt->cl_rtt_default, xprt->timeout.to_initval);

	err = rpc_setup_pipedir(clnt, program->pipe_dir_name);
	if (err < 0)
		goto out_no_path;

	err = -ENOMEM;
	if (!rpcauth_create(flavor, clnt)) {
		printk(KERN_INFO "RPC: Couldn't create auth handle (flavor %u)\n",
				flavor);
		goto out_no_auth;
	}

	/* save the nodename */
	clnt->cl_nodelen = strlen(system_utsname.nodename);
	if (clnt->cl_nodelen > UNX_MAXNODENAME)
		clnt->cl_nodelen = UNX_MAXNODENAME;
	memcpy(clnt->cl_nodename, system_utsname.nodename, clnt->cl_nodelen);
	return clnt;

out_no_auth:
	rpc_rmdir(clnt->cl_pathname);
out_no_path:
	if (clnt->cl_server != clnt->cl_inline_name)
		kfree(clnt->cl_server);
	kfree(clnt);
out_err:
	return ERR_PTR(err);
}
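
/*
 * Usage sketch (not part of this file; names and address setup are
 * hypothetical): a caller typically pairs this with a transport from
 * xprt_create_proto() and releases the client with rpc_shutdown_client():
 *
 *	struct rpc_xprt	*xprt;
 *	struct rpc_clnt	*clnt;
 *
 *	xprt = xprt_create_proto(IPPROTO_UDP, &server_addr, NULL);
 *	if (xprt == NULL)
 *		return -EIO;
 *	clnt = rpc_create_client(xprt, "my.server", &my_program,
 *				 MY_VERSION, RPC_AUTH_UNIX);
 *	if (IS_ERR(clnt)) {
 *		xprt_destroy(xprt);
 *		return PTR_ERR(clnt);
 *	}
 *	...
 *	rpc_shutdown_client(clnt);
 */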

/*
 * This function clones the RPC client structure. It allows us to share the
 * same transport while varying parameters such as the authentication
 * flavour.
 */
struct rpc_clnt *
rpc_clone_client(struct rpc_clnt *clnt)
{
	struct rpc_clnt *new;

	new = kmalloc(sizeof(*new), GFP_KERNEL);
	if (!new)
		goto out_no_clnt;
	memcpy(new, clnt, sizeof(*new));
	atomic_set(&new->cl_count, 1);
	atomic_set(&new->cl_users, 0);
	new->cl_parent = clnt;
	atomic_inc(&clnt->cl_count);
	/* Duplicate portmapper */
	rpc_init_wait_queue(&new->cl_pmap_default.pm_bindwait, "bindwait");
	/* Turn off autobind on clones */
	new->cl_autobind = 0;
	new->cl_oneshot = 0;
	new->cl_dead = 0;
	rpc_init_rtt(&new->cl_rtt_default, clnt->cl_xprt->timeout.to_initval);
	if (new->cl_auth)
		atomic_inc(&new->cl_auth->au_count);
	return new;
out_no_clnt:
	printk(KERN_INFO "RPC: out of memory in %s\n", __FUNCTION__);
	return ERR_PTR(-ENOMEM);
}
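
/*
 * Usage sketch (hypothetical caller): the clone shares clnt's transport
 * but carries its own reference counts, so it is shut down independently:
 *
 *	struct rpc_clnt *new = rpc_clone_client(clnt);
 *
 *	if (IS_ERR(new))
 *		return PTR_ERR(new);
 *	...issue calls on new, e.g. with a different cred in rpc_message...
 *	rpc_shutdown_client(new);
 */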

/*
 * Properly shut down an RPC client, terminating all outstanding
 * requests. Note that we must be certain that cl_oneshot and
 * cl_dead are cleared, or else the client would be destroyed
 * when the last task releases it.
 */
int
rpc_shutdown_client(struct rpc_clnt *clnt)
{
	wait_queue_t __wait;
	init_waitqueue_entry(&__wait, current);
	dprintk("RPC: shutting down %s client for %s, tasks=%d\n",
			clnt->cl_protname, clnt->cl_server,
			atomic_read(&clnt->cl_users));

	add_wait_queue(&destroy_wait, &__wait);
	set_current_state(TASK_UNINTERRUPTIBLE);
	while (atomic_read(&clnt->cl_users) > 0) {
		/* Don't let rpc_release_client destroy us */
		clnt->cl_oneshot = 0;
		clnt->cl_dead = 0;
		rpc_killall_tasks(clnt);
		schedule_timeout(1*HZ);
		set_current_state(TASK_UNINTERRUPTIBLE);
	}
	current->state = TASK_RUNNING;
	remove_wait_queue(&destroy_wait, &__wait);

	if (atomic_read(&clnt->cl_users) < 0) {
		printk(KERN_ERR "RPC: rpc_shutdown_client clnt %p tasks=%d\n",
				clnt, atomic_read(&clnt->cl_users));
#ifdef RPC_DEBUG
		rpc_show_tasks();
#endif
		BUG();
	}

	return rpc_destroy_client(clnt);
}

/*
 * Delete an RPC client
 */
int
rpc_destroy_client(struct rpc_clnt *clnt)
{
	if (!atomic_dec_and_test(&clnt->cl_count))
		return 1;
	BUG_ON(atomic_read(&clnt->cl_users) != 0);

	dprintk("RPC: destroying %s client for %s\n",
			clnt->cl_protname, clnt->cl_server);
	if (clnt->cl_auth) {
		rpcauth_destroy(clnt->cl_auth);
		clnt->cl_auth = NULL;
	}
	if (clnt->cl_parent != clnt) {
		rpc_destroy_client(clnt->cl_parent);
		goto out_free;
	}
	if (clnt->cl_pathname[0])
		rpc_rmdir(clnt->cl_pathname);
	if (clnt->cl_xprt) {
		xprt_destroy(clnt->cl_xprt);
		clnt->cl_xprt = NULL;
	}
	if (clnt->cl_server != clnt->cl_inline_name)
		kfree(clnt->cl_server);
out_free:
	kfree(clnt);
	return 0;
}

/*
 * Release an RPC client
 */
void
rpc_release_client(struct rpc_clnt *clnt)
{
	dprintk("RPC:      rpc_release_client(%p, %d)\n",
				clnt, atomic_read(&clnt->cl_users));

	if (!atomic_dec_and_test(&clnt->cl_users))
		return;
	wake_up(&destroy_wait);
	if (clnt->cl_oneshot || clnt->cl_dead)
		rpc_destroy_client(clnt);
}

/*
 * Default callback for async RPC calls
 */
static void
rpc_default_callback(struct rpc_task *task)
{
}

/*
 *	Export the signal mask handling for asynchronous code that
 *	sleeps on RPC calls
 */

void rpc_clnt_sigmask(struct rpc_clnt *clnt, sigset_t *oldset)
{
	unsigned long	sigallow = sigmask(SIGKILL);
	unsigned long	irqflags;

	/* Turn off various signals */
	if (clnt->cl_intr) {
		struct k_sigaction *action = current->sighand->action;
		if (action[SIGINT-1].sa.sa_handler == SIG_DFL)
			sigallow |= sigmask(SIGINT);
		if (action[SIGQUIT-1].sa.sa_handler == SIG_DFL)
			sigallow |= sigmask(SIGQUIT);
	}
	spin_lock_irqsave(&current->sighand->siglock, irqflags);
	*oldset = current->blocked;
	siginitsetinv(&current->blocked, sigallow & ~oldset->sig[0]);
	recalc_sigpending();
	spin_unlock_irqrestore(&current->sighand->siglock, irqflags);
}

void rpc_clnt_sigunmask(struct rpc_clnt *clnt, sigset_t *oldset)
{
	unsigned long	irqflags;

	spin_lock_irqsave(&current->sighand->siglock, irqflags);
	current->blocked = *oldset;
	recalc_sigpending();
	spin_unlock_irqrestore(&current->sighand->siglock, irqflags);
}
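
/*
 * Sketch of the intended pairing (hypothetical caller): code that sleeps
 * on an RPC call outside of rpc_call_sync()/rpc_call_async() should
 * bracket the wait the same way those helpers do:
 *
 *	sigset_t oldset;
 *
 *	rpc_clnt_sigmask(clnt, &oldset);
 *	status = ...sleep waiting for the RPC result...
 *	rpc_clnt_sigunmask(clnt, &oldset);
 */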

/*
 * Perform a synchronous RPC call and wait for the result.
 */
int rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg, int flags)
{
	struct rpc_task	*task;
	sigset_t	oldset;
	int		status;

	/* If this client is slain all further I/O fails */
	if (clnt->cl_dead)
		return -EIO;

	BUG_ON(flags & RPC_TASK_ASYNC);

	rpc_clnt_sigmask(clnt, &oldset);

	status = -ENOMEM;
	task = rpc_new_task(clnt, NULL, flags);
	if (task == NULL)
		goto out;

	rpc_call_setup(task, msg, 0);

	/* Set up the call info struct and execute the task */
	if (task->tk_status == 0)
		status = rpc_execute(task);
	else {
		status = task->tk_status;
		rpc_release_task(task);
	}

out:
	rpc_clnt_sigunmask(clnt, &oldset);

	return status;
}
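
/*
 * Usage sketch (hypothetical procedure table and argument types): the
 * rpc_message fields below are the ones rpc_call_setup() consumes;
 * rpc_cred may be left NULL to bind a default credential:
 *
 *	struct my_args		args = { ... };
 *	struct my_result	res;
 *	struct rpc_message	msg = {
 *		.rpc_proc	= &my_procedures[MY_PROC],
 *		.rpc_argp	= &args,
 *		.rpc_resp	= &res,
 *	};
 *	int status;
 *
 *	status = rpc_call_sync(clnt, &msg, 0);
 *	if (status < 0)
 *		...map the error...
 */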

/*
 * Perform an asynchronous RPC call; completion is reported through
 * the supplied callback.
 */
int
rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags,
	       rpc_action callback, void *data)
{
	struct rpc_task	*task;
	sigset_t	oldset;
	int		status;

	/* If this client is slain all further I/O fails */
	if (clnt->cl_dead)
		return -EIO;

	flags |= RPC_TASK_ASYNC;

	rpc_clnt_sigmask(clnt, &oldset);

	/* Create/initialize a new RPC task */
	if (!callback)
		callback = rpc_default_callback;
	status = -ENOMEM;
	if (!(task = rpc_new_task(clnt, callback, flags)))
		goto out;
	task->tk_calldata = data;

	rpc_call_setup(task, msg, 0);

	/* Set up the call info struct and execute the task */
	if (task->tk_status == 0)
		status = rpc_execute(task);
	else {
		status = task->tk_status;
		rpc_release_task(task);
	}

out:
	rpc_clnt_sigunmask(clnt, &oldset);

	return status;
}
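
/*
 * Async usage sketch (hypothetical callback and data): the callback runs
 * on completion, picking up the result from tk_status and its private
 * pointer from tk_calldata:
 *
 *	static void my_call_done(struct rpc_task *task)
 *	{
 *		struct my_data *data = (struct my_data *) task->tk_calldata;
 *
 *		if (task->tk_status < 0) {
 *			...handle the error...
 *		}
 *		...consume data...
 *	}
 *
 *	status = rpc_call_async(clnt, &msg, 0, my_call_done, data);
 */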


void
rpc_call_setup(struct rpc_task *task, struct rpc_message *msg, int flags)
{
	task->tk_msg   = *msg;
	task->tk_flags |= flags;
	/* Bind the user cred */
	if (task->tk_msg.rpc_cred != NULL) {
		rpcauth_holdcred(task);
	} else
		rpcauth_bindcred(task);

	if (task->tk_status == 0)
		task->tk_action = call_start;
	else
		task->tk_action = NULL;
}

void
rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
{
	struct rpc_xprt *xprt = clnt->cl_xprt;

	xprt->sndsize = 0;
	if (sndsize)
		xprt->sndsize = sndsize + RPC_SLACK_SPACE;
	xprt->rcvsize = 0;
	if (rcvsize)
		xprt->rcvsize = rcvsize + RPC_SLACK_SPACE;
	if (xprt_connected(xprt))
		xprt_sock_setbufsize(xprt);
}
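
/*
 * For example, NFS scales its socket buffers to the negotiated transfer
 * sizes with a call along these lines (sizes hypothetical):
 *
 *	rpc_setbufsize(clnt, wsize + 100, rsize + 100);
 */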

/*
 * Restart an (async) RPC call. Usually called from within the
 * exit handler.
 */
void
rpc_restart_call(struct rpc_task *task)
{
	if (RPC_ASSASSINATED(task))
		return;

	task->tk_action = call_start;
}

/*
 * 0.  Initial state
 *
 *     Other FSM states can be visited zero or more times, but
 *     this state is visited exactly once for each RPC.
 */
static void
call_start(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;

	dprintk("RPC: %4d call_start %s%d proc %d (%s)\n", task->tk_pid,
		clnt->cl_protname, clnt->cl_vers, task->tk_msg.rpc_proc->p_proc,
		(RPC_IS_ASYNC(task) ? "async" : "sync"));

	/* Increment call count */
	task->tk_msg.rpc_proc->p_count++;
	clnt->cl_stats->rpccnt++;
	task->tk_action = call_reserve;
}

/*
 * 1.	Reserve an RPC call slot
 */
static void
call_reserve(struct rpc_task *task)
{
	dprintk("RPC: %4d call_reserve\n", task->tk_pid);

	if (!rpcauth_uptodatecred(task)) {
		task->tk_action = call_refresh;
		return;
	}

	task->tk_status  = 0;
	task->tk_action  = call_reserveresult;
	xprt_reserve(task);
}

/*
 * 1b.	Grok the result of xprt_reserve()
 */
static void
call_reserveresult(struct rpc_task *task)
{
	int status = task->tk_status;

	dprintk("RPC: %4d call_reserveresult (status %d)\n",
				task->tk_pid, task->tk_status);

	/*
	 * After a call to xprt_reserve(), we must have either
	 * a request slot or else an error status.
	 */
	task->tk_status = 0;
	if (status >= 0) {
		if (task->tk_rqstp) {
			task->tk_action = call_allocate;
			return;
		}

		printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
				__FUNCTION__, status);
		rpc_exit(task, -EIO);
		return;
	}

	/*
	 * Even though there was an error, we may have acquired
	 * a request slot somehow.  Make sure not to leak it.
	 */
	if (task->tk_rqstp) {
		printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
				__FUNCTION__, status);
		xprt_release(task);
	}

	switch (status) {
	case -EAGAIN:	/* woken up; retry */
		task->tk_action = call_reserve;
		return;
	case -EIO:	/* probably a shutdown */
		break;
	default:
		printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
				__FUNCTION__, status);
		break;
	}
	rpc_exit(task, status);
}

/*
 * 2.	Allocate the buffer. For details, see sched.c:rpc_malloc.
 *	(Note: buffer memory is freed in rpc_release_task).
 */
static void
call_allocate(struct rpc_task *task)
{
	unsigned int	bufsiz;

	dprintk("RPC: %4d call_allocate (status %d)\n",
				task->tk_pid, task->tk_status);
	task->tk_action = call_bind;
	if (task->tk_buffer)
		return;

	/* FIXME: compute buffer requirements more exactly using
	 * auth->au_wslack */
	bufsiz = task->tk_msg.rpc_proc->p_bufsiz + RPC_SLACK_SPACE;

	if (rpc_malloc(task, bufsiz << 1) != NULL)
		return;
	printk(KERN_INFO "RPC: buffer allocation failed for task %p\n", task);

	if (RPC_IS_ASYNC(task) || !(task->tk_client->cl_intr && signalled())) {
		xprt_release(task);
		task->tk_action = call_reserve;
		rpc_delay(task, HZ>>4);
		return;
	}

	rpc_exit(task, -ERESTARTSYS);
}

/*
 * 3.	Encode arguments of an RPC call
 */
static void
call_encode(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;
	struct rpc_rqst	*req = task->tk_rqstp;
	struct xdr_buf *sndbuf = &req->rq_snd_buf;
	struct xdr_buf *rcvbuf = &req->rq_rcv_buf;
	unsigned int	bufsiz;
	kxdrproc_t	encode;
	int		status;
	u32		*p;

	dprintk("RPC: %4d call_encode (status %d)\n",
				task->tk_pid, task->tk_status);

	/* Default buffer setup */
	bufsiz = task->tk_bufsize >> 1;
	sndbuf->head[0].iov_base = (void *)task->tk_buffer;
	sndbuf->head[0].iov_len  = bufsiz;
	sndbuf->tail[0].iov_len  = 0;
	sndbuf->page_len	 = 0;
	sndbuf->len		 = 0;
	sndbuf->buflen		 = bufsiz;
	rcvbuf->head[0].iov_base = (void *)((char *)task->tk_buffer + bufsiz);
	rcvbuf->head[0].iov_len  = bufsiz;
	rcvbuf->tail[0].iov_len  = 0;
	rcvbuf->page_len	 = 0;
	rcvbuf->len		 = 0;
	rcvbuf->buflen		 = bufsiz;

	/* Encode header and provided arguments */
	encode = task->tk_msg.rpc_proc->p_encode;
	if (!(p = call_header(task))) {
		printk(KERN_INFO "RPC: call_header failed, exit EIO\n");
		rpc_exit(task, -EIO);
		return;
	}
	if (encode && (status = rpcauth_wrap_req(task, encode, req, p,
						 task->tk_msg.rpc_argp)) < 0) {
		printk(KERN_WARNING "%s: can't encode arguments: %d\n",
				clnt->cl_protname, -status);
		rpc_exit(task, status);
	}
}

/*
 * 4.	Get the server port number if not yet set
 */
static void
call_bind(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;
	struct rpc_xprt	*xprt = clnt->cl_xprt;

	dprintk("RPC: %4d call_bind xprt %p %s connected\n", task->tk_pid,
			xprt, (xprt_connected(xprt) ? "is" : "is not"));

	task->tk_action = (xprt_connected(xprt)) ? call_transmit : call_connect;

	if (!clnt->cl_port) {
		task->tk_action = call_connect;
		task->tk_timeout = RPC_CONNECT_TIMEOUT;
		rpc_getport(task, clnt);
	}
}

/*
 * 4a.	Connect to the RPC server (TCP case)
 */
static void
call_connect(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;

	dprintk("RPC: %4d call_connect status %d\n",
				task->tk_pid, task->tk_status);

	if (xprt_connected(clnt->cl_xprt)) {
		task->tk_action = call_transmit;
		return;
	}
	task->tk_action = call_connect_status;
	if (task->tk_status < 0)
		return;
	xprt_connect(task);
}

/*
 * 4b.	Sort out connect result
 */
static void
call_connect_status(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	int status = task->tk_status;

	task->tk_status = 0;
	if (status >= 0) {
		clnt->cl_stats->netreconn++;
		task->tk_action = call_transmit;
		return;
	}

	/* Something failed: we may have to rebind */
	if (clnt->cl_autobind)
		clnt->cl_port = 0;
	switch (status) {
	case -ENOTCONN:
	case -ETIMEDOUT:
	case -EAGAIN:
		task->tk_action = (clnt->cl_port == 0) ? call_bind : call_connect;
		break;
	default:
		rpc_exit(task, -EIO);
	}
}

/*
 * 5.	Transmit the RPC request, and wait for reply
 */
static void
call_transmit(struct rpc_task *task)
{
	dprintk("RPC: %4d call_transmit (status %d)\n",
				task->tk_pid, task->tk_status);

	task->tk_action = call_status;
	if (task->tk_status < 0)
		return;
	task->tk_status = xprt_prepare_transmit(task);
	if (task->tk_status != 0)
		return;
	/* Encode here so that rpcsec_gss can use correct sequence number. */
	if (!task->tk_rqstp->rq_bytes_sent)
		call_encode(task);
	if (task->tk_status < 0)
		return;
	xprt_transmit(task);
	if (task->tk_status < 0)
		return;
	if (!task->tk_msg.rpc_proc->p_decode) {
		task->tk_action = NULL;
		rpc_wake_up_task(task);
	}
}

/*
 * 6.	Sort out the RPC call status
 */
static void
call_status(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;
	struct rpc_rqst	*req = task->tk_rqstp;
	int		status;

	if (req->rq_received > 0 && !req->rq_bytes_sent)
		task->tk_status = req->rq_received;

	dprintk("RPC: %4d call_status (status %d)\n",
				task->tk_pid, task->tk_status);

	status = task->tk_status;
	if (status >= 0) {
		task->tk_action = call_decode;
		return;
	}

	task->tk_status = 0;
	switch(status) {
	case -ETIMEDOUT:
		task->tk_action = call_timeout;
		break;
	case -ECONNREFUSED:
	case -ENOTCONN:
		req->rq_bytes_sent = 0;
		if (clnt->cl_autobind)
			clnt->cl_port = 0;
		task->tk_action = call_bind;
		break;
	case -EAGAIN:
		task->tk_action = call_transmit;
		break;
	case -EIO:
		/* shutdown or soft timeout */
		rpc_exit(task, status);
		break;
	default:
		if (clnt->cl_chatty)
			printk("%s: RPC call returned error %d\n",
			       clnt->cl_protname, -status);
		rpc_exit(task, status);
		break;
	}
}

/*
 * 6a.	Handle RPC timeout
 *	We do not release the request slot, so we keep using the
 *	same XID for all retransmits.
 */
static void
call_timeout(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;

	if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
		dprintk("RPC: %4d call_timeout (minor)\n", task->tk_pid);
		goto retry;
	}

	dprintk("RPC: %4d call_timeout (major)\n", task->tk_pid);
	if (RPC_IS_SOFT(task)) {
		if (clnt->cl_chatty)
			printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
				clnt->cl_protname, clnt->cl_server);
		rpc_exit(task, -EIO);
		return;
	}

	if (clnt->cl_chatty && !(task->tk_flags & RPC_CALL_MAJORSEEN)) {
		task->tk_flags |= RPC_CALL_MAJORSEEN;
		printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
			clnt->cl_protname, clnt->cl_server);
	}
	if (clnt->cl_autobind)
		clnt->cl_port = 0;

retry:
	clnt->cl_stats->rpcretrans++;
	task->tk_action = call_bind;
	task->tk_status = 0;
}

/*
 * 7.	Decode the RPC reply
 */
static void
call_decode(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;
	struct rpc_rqst	*req = task->tk_rqstp;
	kxdrproc_t	decode = task->tk_msg.rpc_proc->p_decode;
	u32		*p;

	dprintk("RPC: %4d call_decode (status %d)\n",
				task->tk_pid, task->tk_status);

	if (clnt->cl_chatty && (task->tk_flags & RPC_CALL_MAJORSEEN)) {
		printk(KERN_NOTICE "%s: server %s OK\n",
			clnt->cl_protname, clnt->cl_server);
		task->tk_flags &= ~RPC_CALL_MAJORSEEN;
	}

	/* A sane reply is at least 12 bytes: XID, direction and reply status */
	if (task->tk_status < 12) {
		if (!RPC_IS_SOFT(task)) {
			task->tk_action = call_bind;
			clnt->cl_stats->rpcretrans++;
			goto out_retry;
		}
		printk(KERN_WARNING "%s: RPC reply too small (%d bytes)\n",
			clnt->cl_protname, task->tk_status);
		rpc_exit(task, -EIO);
		return;
	}

	req->rq_rcv_buf.len = req->rq_private_buf.len;

	/* Check that the softirq receive buffer is valid */
	WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
				sizeof(req->rq_rcv_buf)) != 0);

	/* Verify the RPC header */
	if (!(p = call_verify(task))) {
		if (task->tk_action == NULL)
			return;
		goto out_retry;
	}

	/*
	 * The following is an NFS-specific hack to cater for setuid
	 * processes whose uid is mapped to nobody on the server.
	 */
	if (task->tk_client->cl_droppriv &&
	    (ntohl(*p) == NFSERR_ACCES || ntohl(*p) == NFSERR_PERM)) {
		if (RPC_IS_SETUID(task) && task->tk_suid_retry) {
			dprintk("RPC: %4d retry squashed uid\n", task->tk_pid);
			task->tk_flags ^= RPC_CALL_REALUID;
			task->tk_action = call_bind;
			task->tk_suid_retry--;
			goto out_retry;
		}
	}

	task->tk_action = NULL;

	if (decode)
		task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
						      task->tk_msg.rpc_resp);
	dprintk("RPC: %4d call_decode result %d\n", task->tk_pid,
					task->tk_status);
	return;
out_retry:
	req->rq_received = req->rq_private_buf.len = 0;
	task->tk_status = 0;
}

/*
 * 8.	Refresh the credentials if rejected by the server
 */
static void
call_refresh(struct rpc_task *task)
{
	dprintk("RPC: %4d call_refresh\n", task->tk_pid);

	xprt_release(task);	/* Must do to obtain new XID */
	task->tk_action = call_refreshresult;
	task->tk_status = 0;
	task->tk_client->cl_stats->rpcauthrefresh++;
	rpcauth_refreshcred(task);
}

/*
 * 8a.	Process the results of a credential refresh
 */
static void
call_refreshresult(struct rpc_task *task)
{
	int status = task->tk_status;
	dprintk("RPC: %4d call_refreshresult (status %d)\n",
				task->tk_pid, task->tk_status);

	task->tk_status = 0;
	task->tk_action = call_reserve;
	if (status >= 0 && rpcauth_uptodatecred(task))
		return;
	if (rpcauth_deadcred(task)) {
		rpc_exit(task, -EACCES);
		return;
	}
	task->tk_action = call_refresh;
	if (status != -ETIMEDOUT)
		rpc_delay(task, 3*HZ);
	return;
}

/*
 * Call header serialization
 */
static u32 *
call_header(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_xprt *xprt = clnt->cl_xprt;
	struct rpc_rqst	*req = task->tk_rqstp;
	u32		*p = req->rq_svec[0].iov_base;

	/* FIXME: check buffer size? */
	if (xprt->stream)
		*p++ = 0;		/* fill in later */
	*p++ = req->rq_xid;		/* XID */
	*p++ = htonl(RPC_CALL);		/* CALL */
	*p++ = htonl(RPC_VERSION);	/* RPC version */
	*p++ = htonl(clnt->cl_prog);	/* program number */
	*p++ = htonl(clnt->cl_vers);	/* program version */
	*p++ = htonl(task->tk_msg.rpc_proc->p_proc);	/* procedure */
	return rpcauth_marshcred(task, p);
}
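
/*
 * Resulting on-the-wire layout, one 32-bit word per field (the record
 * marker is prepended only on stream transports and filled in later):
 *
 *	[marker] XID CALL RPC_VERSION prog vers proc <cred> <verf>
 *
 * where <cred> and <verf> are marshalled by rpcauth_marshcred().
 */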

/*
 * Reply header verification
 */
static u32 *
call_verify(struct rpc_task *task)
{
	struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
	int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
	u32	*p = iov->iov_base, n;

	if ((len -= 3) < 0)
		goto garbage;
	p += 1;	/* skip XID */

	if ((n = ntohl(*p++)) != RPC_REPLY) {
		printk(KERN_WARNING "call_verify: not an RPC reply: %x\n", n);
		goto garbage;
	}
	if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
		int	error = -EACCES;

		if (--len < 0)
			goto garbage;
		if ((n = ntohl(*p++)) != RPC_AUTH_ERROR)
			printk(KERN_WARNING "call_verify: RPC call rejected: %x\n", n);
		else if (--len < 0)
			goto garbage;
		else switch ((n = ntohl(*p++))) {
		case RPC_AUTH_REJECTEDCRED:
		case RPC_AUTH_REJECTEDVERF:
		case RPCSEC_GSS_CREDPROBLEM:
		case RPCSEC_GSS_CTXPROBLEM:
			if (!task->tk_cred_retry)
				break;
			task->tk_cred_retry--;
			dprintk("RPC: %4d call_verify: retry stale creds\n",
							task->tk_pid);
			rpcauth_invalcred(task);
			task->tk_action = call_refresh;
			return NULL;
		case RPC_AUTH_BADCRED:
		case RPC_AUTH_BADVERF:
			/* possibly garbled cred/verf? */
			if (!task->tk_garb_retry)
				break;
			task->tk_garb_retry--;
			dprintk("RPC: %4d call_verify: retry garbled creds\n",
							task->tk_pid);
			task->tk_action = call_bind;
			return NULL;
		case RPC_AUTH_TOOWEAK:
			printk(KERN_NOTICE "call_verify: server requires stronger "
			       "authentication.\n");
			break;
		default:
			printk(KERN_WARNING "call_verify: unknown auth error: %x\n", n);
			error = -EIO;
		}
		dprintk("RPC: %4d call_verify: call rejected %d\n",
						task->tk_pid, n);
		rpc_exit(task, error);
		return NULL;
	}
	if (!(p = rpcauth_checkverf(task, p))) {
		printk(KERN_WARNING "call_verify: auth check failed\n");
		goto garbage;		/* bad verifier, retry */
	}
	len = p - (u32 *)iov->iov_base - 1;
	if (len < 0)
		goto garbage;
	switch ((n = ntohl(*p++))) {
	case RPC_SUCCESS:
		return p;
	case RPC_PROG_UNAVAIL:
		printk(KERN_WARNING "RPC: call_verify: program %u is unsupported by server %s\n",
				(unsigned int)task->tk_client->cl_prog,
				task->tk_client->cl_server);
		goto out_eio;
	case RPC_PROG_MISMATCH:
		printk(KERN_WARNING "RPC: call_verify: program %u, version %u unsupported by server %s\n",
				(unsigned int)task->tk_client->cl_prog,
				(unsigned int)task->tk_client->cl_vers,
				task->tk_client->cl_server);
		goto out_eio;
	case RPC_PROC_UNAVAIL:
		printk(KERN_WARNING "RPC: call_verify: proc %p unsupported by program %u, version %u on server %s\n",
				task->tk_msg.rpc_proc,
				task->tk_client->cl_prog,
				task->tk_client->cl_vers,
				task->tk_client->cl_server);
		goto out_eio;
	case RPC_GARBAGE_ARGS:
		break;			/* retry */
	default:
		printk(KERN_WARNING "call_verify: server accept status: %x\n", n);
		/* Also retry */
	}

garbage:
	dprintk("RPC: %4d call_verify: server saw garbage\n", task->tk_pid);
	task->tk_client->cl_stats->rpcgarbage++;
	if (task->tk_garb_retry) {
		task->tk_garb_retry--;
		dprintk("RPC: garbage, retrying %4d\n", task->tk_pid);
		task->tk_action = call_bind;
		return NULL;
	}
	printk(KERN_WARNING "RPC: garbage, exit EIO\n");
out_eio:
	rpc_exit(task, -EIO);
	return NULL;
}