/*
 *  linux/net/sunrpc/clnt.c
 *
 *  This file contains the high-level RPC interface.
 *  It is modeled as a finite state machine to support both synchronous
 *  and asynchronous requests.
 *
 *  -   RPC header generation and argument serialization.
 *  -   Credential refresh.
 *  -   TCP connect handling.
 *  -   Retry of operation when it is suspected the operation failed because
 *      of uid squashing on the server, or when the credentials were stale
 *      and need to be refreshed, or when a packet was damaged in transit.
 *      This may have to be moved to the VFS layer.
 *
 *  NB: BSD uses a more intelligent approach to guessing when a request
 *  or reply has been lost by keeping the RTO estimate for each procedure.
 *  We currently make do with a constant timeout value.
 *
 *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
 *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
 */
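
/*
 * Overview of the call state machine implemented below (the numbers
 * match the comments on the individual handlers):
 *
 *      call_start -> call_reserve -> call_reserveresult -> call_allocate
 *        -> call_bind [-> call_connect -> call_connect_status]
 *        -> call_transmit -> call_status -> call_decode
 *
 * call_encode is invoked from call_transmit just before sending.
 * call_refresh/call_refreshresult handle credential refresh, and
 * call_timeout handles retransmit timeouts.
 */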

#include <asm/system.h>

#include <linux/types.h>
#include <linux/mm.h>
#include <linux/slab.h>
#include <linux/in.h>
#include <linux/utsname.h>

#include <linux/sunrpc/clnt.h>
#include <linux/workqueue.h>
#include <linux/sunrpc/rpc_pipe_fs.h>

#include <linux/nfs.h>


#define RPC_SLACK_SPACE         (1024)  /* total overkill */

#ifdef RPC_DEBUG
# define RPCDBG_FACILITY        RPCDBG_CALL
#endif

static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);


static void     call_start(struct rpc_task *task);
static void     call_reserve(struct rpc_task *task);
static void     call_reserveresult(struct rpc_task *task);
static void     call_allocate(struct rpc_task *task);
static void     call_encode(struct rpc_task *task);
static void     call_decode(struct rpc_task *task);
static void     call_bind(struct rpc_task *task);
static void     call_transmit(struct rpc_task *task);
static void     call_status(struct rpc_task *task);
static void     call_refresh(struct rpc_task *task);
static void     call_refreshresult(struct rpc_task *task);
static void     call_timeout(struct rpc_task *task);
static void     call_connect(struct rpc_task *task);
static void     call_connect_status(struct rpc_task *task);
static u32 *    call_header(struct rpc_task *task);
static u32 *    call_verify(struct rpc_task *task);


static int
rpc_setup_pipedir(struct rpc_clnt *clnt, char *dir_name)
{
        static uint32_t clntid;
        int error;

        if (dir_name == NULL)
                return 0;
        for (;;) {
                snprintf(clnt->cl_pathname, sizeof(clnt->cl_pathname),
                                "%s/clnt%x", dir_name,
                                (unsigned int)clntid++);
                clnt->cl_pathname[sizeof(clnt->cl_pathname) - 1] = '\0';
                clnt->cl_dentry = rpc_mkdir(clnt->cl_pathname, clnt);
                if (!IS_ERR(clnt->cl_dentry))
                        return 0;
                error = PTR_ERR(clnt->cl_dentry);
                if (error != -EEXIST) {
                        printk(KERN_INFO "RPC: Couldn't create pipefs entry %s, error %d\n",
                                        clnt->cl_pathname, error);
                        return error;
                }
        }
}

/*
 * Create an RPC client
 * FIXME: This should also take a flags argument (as in task->tk_flags).
 * It's called (among others) from pmap_create_client, which may in
 * turn be called by an async task. In this case, rpciod should not be
 * made to sleep too long.
 */
struct rpc_clnt *
rpc_create_client(struct rpc_xprt *xprt, char *servname,
                  struct rpc_program *program, u32 vers,
                  rpc_authflavor_t flavor)
{
        struct rpc_version      *version;
        struct rpc_clnt         *clnt = NULL;
        int err;
        int len;

        dprintk("RPC: creating %s client for %s (xprt %p)\n",
                program->name, servname, xprt);

        err = -EINVAL;
        if (!xprt)
                goto out_err;
        if (vers >= program->nrvers || !(version = program->version[vers]))
                goto out_err;

        err = -ENOMEM;
        clnt = kmalloc(sizeof(*clnt), GFP_KERNEL);
        if (!clnt)
                goto out_err;
        memset(clnt, 0, sizeof(*clnt));
        atomic_set(&clnt->cl_users, 0);
        atomic_set(&clnt->cl_count, 1);
        clnt->cl_parent = clnt;

        clnt->cl_server = clnt->cl_inline_name;
        len = strlen(servname) + 1;
        if (len > sizeof(clnt->cl_inline_name)) {
                char *buf = kmalloc(len, GFP_KERNEL);
                if (buf != NULL)
                        clnt->cl_server = buf;
                else
                        len = sizeof(clnt->cl_inline_name);
        }
        strlcpy(clnt->cl_server, servname, len);

        clnt->cl_xprt     = xprt;
        clnt->cl_procinfo = version->procs;
        clnt->cl_maxproc  = version->nrprocs;
        clnt->cl_protname = program->name;
        clnt->cl_pmap     = &clnt->cl_pmap_default;
        clnt->cl_port     = xprt->addr.sin_port;
        clnt->cl_prog     = program->number;
        clnt->cl_vers     = version->number;
        clnt->cl_prot     = xprt->prot;
        clnt->cl_stats    = program->stats;
        rpc_init_wait_queue(&clnt->cl_pmap_default.pm_bindwait, "bindwait");

        if (!clnt->cl_port)
                clnt->cl_autobind = 1;

        clnt->cl_rtt = &clnt->cl_rtt_default;
        rpc_init_rtt(&clnt->cl_rtt_default, xprt->timeout.to_initval);

        err = rpc_setup_pipedir(clnt, program->pipe_dir_name);
        if (err < 0)
                goto out_no_path;

        err = -ENOMEM;
        if (!rpcauth_create(flavor, clnt)) {
                printk(KERN_INFO "RPC: Couldn't create auth handle (flavor %u)\n",
                                flavor);
                goto out_no_auth;
        }

        /* save the nodename */
        clnt->cl_nodelen = strlen(system_utsname.nodename);
        if (clnt->cl_nodelen > UNX_MAXNODENAME)
                clnt->cl_nodelen = UNX_MAXNODENAME;
        memcpy(clnt->cl_nodename, system_utsname.nodename, clnt->cl_nodelen);
        return clnt;

out_no_auth:
        rpc_rmdir(clnt->cl_pathname);
out_no_path:
        if (clnt->cl_server != clnt->cl_inline_name)
                kfree(clnt->cl_server);
        kfree(clnt);
out_err:
        return ERR_PTR(err);
}
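
/*
 * Typical usage (a sketch only; it assumes the caller has already built
 * a transport, e.g. via xprt_create_proto(), and error handling is
 * elided; my_rpc_program, MY_VERSION and server_addr are caller-supplied):
 *
 *      struct rpc_xprt *xprt;
 *      struct rpc_clnt *clnt;
 *
 *      xprt = xprt_create_proto(IPPROTO_UDP, &server_addr, NULL);
 *      clnt = rpc_create_client(xprt, "servername", &my_rpc_program,
 *                               MY_VERSION, RPC_AUTH_UNIX);
 */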

/*
 * This function clones the RPC client structure. It allows us to share the
 * same transport while varying parameters such as the authentication
 * flavour.
 */
struct rpc_clnt *
rpc_clone_client(struct rpc_clnt *clnt)
{
        struct rpc_clnt *new;

        new = kmalloc(sizeof(*new), GFP_KERNEL);
        if (!new)
                goto out_no_clnt;
        memcpy(new, clnt, sizeof(*new));
        atomic_set(&new->cl_count, 1);
        atomic_set(&new->cl_users, 0);
        atomic_inc(&new->cl_parent->cl_count);
        if (new->cl_auth)
                atomic_inc(&new->cl_auth->au_count);
        return new;
out_no_clnt:
        printk(KERN_INFO "RPC: out of memory in %s\n", __FUNCTION__);
        return ERR_PTR(-ENOMEM);
}

/*
 * Properly shut down an RPC client, terminating all outstanding
 * requests. Note that we must be certain that cl_oneshot and
 * cl_dead are cleared, or else the client would be destroyed
 * when the last task releases it.
 */
int
rpc_shutdown_client(struct rpc_clnt *clnt)
{
        dprintk("RPC: shutting down %s client for %s, tasks=%d\n",
                        clnt->cl_protname, clnt->cl_server,
                        atomic_read(&clnt->cl_users));

        while (atomic_read(&clnt->cl_users) > 0) {
                /* Don't let rpc_release_client destroy us */
                clnt->cl_oneshot = 0;
                clnt->cl_dead = 0;
                rpc_killall_tasks(clnt);
                sleep_on_timeout(&destroy_wait, 1*HZ);
        }

        if (atomic_read(&clnt->cl_users) < 0) {
                printk(KERN_ERR "RPC: rpc_shutdown_client clnt %p tasks=%d\n",
                                clnt, atomic_read(&clnt->cl_users));
#ifdef RPC_DEBUG
                rpc_show_tasks();
#endif
                BUG();
        }

        return rpc_destroy_client(clnt);
}

/*
 * Delete an RPC client
 */
int
rpc_destroy_client(struct rpc_clnt *clnt)
{
        if (!atomic_dec_and_test(&clnt->cl_count))
                return 1;
        BUG_ON(atomic_read(&clnt->cl_users) != 0);

        dprintk("RPC: destroying %s client for %s\n",
                        clnt->cl_protname, clnt->cl_server);
        if (clnt->cl_auth) {
                rpcauth_destroy(clnt->cl_auth);
                clnt->cl_auth = NULL;
        }
        if (clnt->cl_parent != clnt) {
                rpc_destroy_client(clnt->cl_parent);
                goto out_free;
        }
        if (clnt->cl_pathname[0])
                rpc_rmdir(clnt->cl_pathname);
        if (clnt->cl_xprt) {
                xprt_destroy(clnt->cl_xprt);
                clnt->cl_xprt = NULL;
        }
        if (clnt->cl_server != clnt->cl_inline_name)
                kfree(clnt->cl_server);
out_free:
        kfree(clnt);
        return 0;
}
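
/*
 * Note on reference counting: cl_count is the reference count on the
 * rpc_clnt structure itself (dropped by rpc_destroy_client() above),
 * while cl_users counts active users of the client. rpc_release_client()
 * below drops a cl_users reference and only destroys the client when
 * it has been marked one-shot or dead.
 */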

/*
 * Release an RPC client
 */
void
rpc_release_client(struct rpc_clnt *clnt)
{
        dprintk("RPC:      rpc_release_client(%p, %d)\n",
                                clnt, atomic_read(&clnt->cl_users));

        if (!atomic_dec_and_test(&clnt->cl_users))
                return;
        wake_up(&destroy_wait);
        if (clnt->cl_oneshot || clnt->cl_dead)
                rpc_destroy_client(clnt);
}

/*
 * Default callback for async RPC calls
 */
static void
rpc_default_callback(struct rpc_task *task)
{
}

/*
 *      Export the signal mask handling for asynchronous code that
 *      sleeps on RPC calls
 */

void rpc_clnt_sigmask(struct rpc_clnt *clnt, sigset_t *oldset)
{
        unsigned long   sigallow = sigmask(SIGKILL);
        unsigned long   irqflags;

        /* Turn off various signals */
        if (clnt->cl_intr) {
                struct k_sigaction *action = current->sighand->action;
                if (action[SIGINT-1].sa.sa_handler == SIG_DFL)
                        sigallow |= sigmask(SIGINT);
                if (action[SIGQUIT-1].sa.sa_handler == SIG_DFL)
                        sigallow |= sigmask(SIGQUIT);
        }
        spin_lock_irqsave(&current->sighand->siglock, irqflags);
        *oldset = current->blocked;
        siginitsetinv(&current->blocked, sigallow & ~oldset->sig[0]);
        recalc_sigpending();
        spin_unlock_irqrestore(&current->sighand->siglock, irqflags);
}

void rpc_clnt_sigunmask(struct rpc_clnt *clnt, sigset_t *oldset)
{
        unsigned long   irqflags;

        spin_lock_irqsave(&current->sighand->siglock, irqflags);
        current->blocked = *oldset;
        recalc_sigpending();
        spin_unlock_irqrestore(&current->sighand->siglock, irqflags);
}

/*
 * Perform a synchronous RPC call and wait for the result.
 */
int rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg, int flags)
{
        struct rpc_task my_task, *task = &my_task;
        sigset_t        oldset;
        int             status;

        /* If this client is slain all further I/O fails */
        if (clnt->cl_dead)
                return -EIO;

        if (flags & RPC_TASK_ASYNC) {
                printk(KERN_ERR "rpc_call_sync: Illegal flag combination for synchronous task\n");
                flags &= ~RPC_TASK_ASYNC;
        }

        rpc_clnt_sigmask(clnt, &oldset);

        /* Create/initialize a new RPC task */
        rpc_init_task(task, clnt, NULL, flags);
        rpc_call_setup(task, msg, 0);

        /* Set up the call info struct and execute the task */
        if (task->tk_status == 0)
                status = rpc_execute(task);
        else {
                status = task->tk_status;
                rpc_release_task(task);
        }

        rpc_clnt_sigunmask(clnt, &oldset);

        return status;
}
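
/*
 * Example (a sketch only; proc, args and res are caller-supplied and
 * depend on the RPC program being spoken):
 *
 *      struct rpc_message msg = {
 *              .rpc_proc = &my_procinfo,
 *              .rpc_argp = &args,
 *              .rpc_resp = &res,
 *      };
 *      int status = rpc_call_sync(clnt, &msg, 0);
 *
 * A negative status is an errno-style error; zero means the reply was
 * received and decoded successfully.
 */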

/*
 * Perform an asynchronous RPC call. The callback is invoked when the
 * call completes.
 */
int
rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags,
               rpc_action callback, void *data)
{
        struct rpc_task *task;
        sigset_t        oldset;
        int             status;

        /* If this client is slain all further I/O fails */
        if (clnt->cl_dead)
                return -EIO;

        flags |= RPC_TASK_ASYNC;

        rpc_clnt_sigmask(clnt, &oldset);

        /* Create/initialize a new RPC task */
        if (!callback)
                callback = rpc_default_callback;
        status = -ENOMEM;
        if (!(task = rpc_new_task(clnt, callback, flags)))
                goto out;
        task->tk_calldata = data;

        rpc_call_setup(task, msg, 0);

        /* Set up the call info struct and execute the task */
        if (task->tk_status == 0)
                status = rpc_execute(task);
        else {
                status = task->tk_status;
                rpc_release_task(task);
        }

out:
        rpc_clnt_sigunmask(clnt, &oldset);

        return status;
}
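
/*
 * Example of an async completion callback (a sketch only; the names are
 * illustrative). The callback runs when the task finishes and can
 * inspect the final status and the caller-supplied data:
 *
 *      static void my_callback(struct rpc_task *task)
 *      {
 *              struct my_request *req = task->tk_calldata;
 *
 *              if (task->tk_status < 0)
 *                      ... handle the error ...
 *      }
 *
 *      rpc_call_async(clnt, &msg, 0, my_callback, req);
 */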

void
rpc_call_setup(struct rpc_task *task, struct rpc_message *msg, int flags)
{
        task->tk_msg   = *msg;
        task->tk_flags |= flags;
        /* Bind the user cred */
        if (task->tk_msg.rpc_cred != NULL) {
                rpcauth_holdcred(task);
        } else
                rpcauth_bindcred(task);

        if (task->tk_status == 0)
                task->tk_action = call_start;
        else
                task->tk_action = NULL;
}

void
rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
{
        struct rpc_xprt *xprt = clnt->cl_xprt;

        xprt->sndsize = 0;
        if (sndsize)
                xprt->sndsize = sndsize + RPC_SLACK_SPACE;
        xprt->rcvsize = 0;
        if (rcvsize)
                xprt->rcvsize = rcvsize + RPC_SLACK_SPACE;
        if (xprt_connected(xprt))
                xprt_sock_setbufsize(xprt);
}

/*
 * Restart an (async) RPC call. Usually called from within the
 * exit handler.
 */
void
rpc_restart_call(struct rpc_task *task)
{
        if (RPC_ASSASSINATED(task))
                return;

        task->tk_action = call_start;
}

/*
 * 0.   Initial state
 *
 *      Other FSM states can be visited zero or more times, but
 *      this state is visited exactly once for each RPC.
 */
static void
call_start(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;

        dprintk("RPC: %4d call_start %s%d proc %d (%s)\n", task->tk_pid,
                clnt->cl_protname, clnt->cl_vers, task->tk_msg.rpc_proc->p_proc,
                (RPC_IS_ASYNC(task) ? "async" : "sync"));

        /* Increment call count */
        task->tk_msg.rpc_proc->p_count++;
        clnt->cl_stats->rpccnt++;
        task->tk_action = call_reserve;
}

/*
 * 1.   Reserve an RPC call slot
 */
static void
call_reserve(struct rpc_task *task)
{
        dprintk("RPC: %4d call_reserve\n", task->tk_pid);

        if (!rpcauth_uptodatecred(task)) {
                task->tk_action = call_refresh;
                return;
        }

        task->tk_status  = 0;
        task->tk_action  = call_reserveresult;
        xprt_reserve(task);
}

/*
 * 1b.  Grok the result of xprt_reserve()
 */
static void
call_reserveresult(struct rpc_task *task)
{
        int status = task->tk_status;

        dprintk("RPC: %4d call_reserveresult (status %d)\n",
                                task->tk_pid, task->tk_status);

        /*
         * After a call to xprt_reserve(), we must have either
         * a request slot or else an error status.
         */
        task->tk_status = 0;
        if (status >= 0) {
                if (task->tk_rqstp) {
                        task->tk_action = call_allocate;
                        return;
                }

                printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
                                __FUNCTION__, status);
                rpc_exit(task, -EIO);
                return;
        }

        /*
         * Even though there was an error, we may have acquired
         * a request slot somehow.  Make sure not to leak it.
         */
        if (task->tk_rqstp) {
                printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
                                __FUNCTION__, status);
                xprt_release(task);
        }

        switch (status) {
        case -EAGAIN:   /* woken up; retry */
                task->tk_action = call_reserve;
                return;
        case -EIO:      /* probably a shutdown */
                break;
        default:
                printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
                                __FUNCTION__, status);
                break;
        }
        rpc_exit(task, status);
}

/*
 * 2.   Allocate the buffer. For details, see sched.c:rpc_malloc.
 *      (Note: buffer memory is freed in rpc_task_release).
 */
static void
call_allocate(struct rpc_task *task)
{
        unsigned int    bufsiz;

        dprintk("RPC: %4d call_allocate (status %d)\n",
                                task->tk_pid, task->tk_status);
        task->tk_action = call_bind;
        if (task->tk_buffer)
                return;

        /* FIXME: compute buffer requirements more exactly using
         * auth->au_wslack */
        bufsiz = task->tk_msg.rpc_proc->p_bufsiz + RPC_SLACK_SPACE;
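
        /*
         * The buffer is allocated at twice bufsiz: call_encode() below
         * splits it into equal send and receive halves.
         */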
        if (rpc_malloc(task, bufsiz << 1) != NULL)
                return;
        printk(KERN_INFO "RPC: buffer allocation failed for task %p\n", task);

        if (RPC_IS_ASYNC(task) || !(task->tk_client->cl_intr && signalled())) {
                xprt_release(task);
                task->tk_action = call_reserve;
                rpc_delay(task, HZ>>4);
                return;
        }

        rpc_exit(task, -ERESTARTSYS);
}

/*
 * 3.   Encode arguments of an RPC call
 */
static void
call_encode(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_rqst *req = task->tk_rqstp;
        struct xdr_buf *sndbuf = &req->rq_snd_buf;
        struct xdr_buf *rcvbuf = &req->rq_rcv_buf;
        unsigned int    bufsiz;
        kxdrproc_t      encode;
        int             status;
        u32             *p;

        dprintk("RPC: %4d call_encode (status %d)\n",
                                task->tk_pid, task->tk_status);

        /* Default buffer setup */
        bufsiz = task->tk_bufsize >> 1;
        sndbuf->head[0].iov_base = (void *)task->tk_buffer;
        sndbuf->head[0].iov_len  = bufsiz;
        sndbuf->tail[0].iov_len  = 0;
        sndbuf->page_len         = 0;
        sndbuf->len              = 0;
        sndbuf->buflen           = bufsiz;
        rcvbuf->head[0].iov_base = (void *)((char *)task->tk_buffer + bufsiz);
        rcvbuf->head[0].iov_len  = bufsiz;
        rcvbuf->tail[0].iov_len  = 0;
        rcvbuf->page_len         = 0;
        rcvbuf->len              = 0;
        rcvbuf->buflen           = bufsiz;

        /* Encode header and provided arguments */
        encode = task->tk_msg.rpc_proc->p_encode;
        if (!(p = call_header(task))) {
                printk(KERN_INFO "RPC: call_header failed, exit EIO\n");
                rpc_exit(task, -EIO);
                return;
        }
        if (encode && (status = rpcauth_wrap_req(task, encode, req, p,
                                                 task->tk_msg.rpc_argp)) < 0) {
                printk(KERN_WARNING "%s: can't encode arguments: %d\n",
                                clnt->cl_protname, -status);
                rpc_exit(task, status);
        }
}

/*
 * 4.   Get the server port number if not yet set
 */
static void
call_bind(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_xprt *xprt = clnt->cl_xprt;

        dprintk("RPC: %4d call_bind xprt %p %s connected\n", task->tk_pid,
                        xprt, (xprt_connected(xprt) ? "is" : "is not"));

        task->tk_action = (xprt_connected(xprt)) ? call_transmit : call_connect;

        if (!clnt->cl_port) {
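                /*
                 * No port known yet: query the portmapper. rpc_getport()
                 * runs the query asynchronously; this task resumes in
                 * call_connect once a port has been obtained.
                 */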
                task->tk_action = call_connect;
                task->tk_timeout = RPC_CONNECT_TIMEOUT;
                rpc_getport(task, clnt);
        }
}

/*
 * 4a.  Connect to the RPC server (TCP case)
 */
static void
call_connect(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;

        dprintk("RPC: %4d call_connect status %d\n",
                                task->tk_pid, task->tk_status);

        if (xprt_connected(clnt->cl_xprt)) {
                task->tk_action = call_transmit;
                return;
        }
        task->tk_action = call_connect_status;
        if (task->tk_status < 0)
                return;
        xprt_connect(task);
}

/*
 * 4b.  Sort out connect result
 */
static void
call_connect_status(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        int status = task->tk_status;

        task->tk_status = 0;
        if (status >= 0) {
                clnt->cl_stats->netreconn++;
                task->tk_action = call_transmit;
                return;
        }

        /* Something failed: we may have to rebind */
        if (clnt->cl_autobind)
                clnt->cl_port = 0;
        switch (status) {
        case -ENOTCONN:
        case -ETIMEDOUT:
        case -EAGAIN:
                task->tk_action = (clnt->cl_port == 0) ? call_bind : call_connect;
                break;
        default:
                rpc_exit(task, -EIO);
        }
}

/*
 * 5.   Transmit the RPC request, and wait for reply
 */
static void
call_transmit(struct rpc_task *task)
{
        dprintk("RPC: %4d call_transmit (status %d)\n",
                                task->tk_pid, task->tk_status);

        task->tk_action = call_status;
        if (task->tk_status < 0)
                return;
        task->tk_status = xprt_prepare_transmit(task);
        if (task->tk_status != 0)
                return;
        /* Encode here so that rpcsec_gss can use correct sequence number. */
        if (!task->tk_rqstp->rq_bytes_sent)
                call_encode(task);
        if (task->tk_status < 0)
                return;
        xprt_transmit(task);
        if (task->tk_status < 0)
                return;
        if (!task->tk_msg.rpc_proc->p_decode) {
                task->tk_action = NULL;
                rpc_wake_up_task(task);
        }
}

/*
 * 6.   Sort out the RPC call status
 */
static void
call_status(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_rqst *req = task->tk_rqstp;
        int             status;

        if (req->rq_received > 0 && !req->rq_bytes_sent)
                task->tk_status = req->rq_received;

        dprintk("RPC: %4d call_status (status %d)\n",
                                task->tk_pid, task->tk_status);

        status = task->tk_status;
        if (status >= 0) {
                task->tk_action = call_decode;
                return;
        }

        task->tk_status = 0;
        switch (status) {
        case -ETIMEDOUT:
                task->tk_action = call_timeout;
                break;
        case -ECONNREFUSED:
        case -ENOTCONN:
                req->rq_bytes_sent = 0;
                if (clnt->cl_autobind)
                        clnt->cl_port = 0;
                task->tk_action = call_bind;
                break;
        case -EAGAIN:
                task->tk_action = call_transmit;
                break;
        case -EIO:
                /* shutdown or soft timeout */
                rpc_exit(task, status);
                break;
        default:
                if (clnt->cl_chatty)
                        printk(KERN_NOTICE "%s: RPC call returned error %d\n",
                               clnt->cl_protname, -status);
                rpc_exit(task, status);
                break;
        }
}

/*
 * 6a.  Handle RPC timeout
 *      We do not release the request slot, so we keep using the
 *      same XID for all retransmits.
 */
static void
call_timeout(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;

        if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
                dprintk("RPC: %4d call_timeout (minor)\n", task->tk_pid);
                goto retry;
        }

        dprintk("RPC: %4d call_timeout (major)\n", task->tk_pid);
        if (RPC_IS_SOFT(task)) {
                if (clnt->cl_chatty)
                        printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
                                clnt->cl_protname, clnt->cl_server);
                rpc_exit(task, -EIO);
                return;
        }

        if (clnt->cl_chatty && !(task->tk_flags & RPC_CALL_MAJORSEEN)) {
                task->tk_flags |= RPC_CALL_MAJORSEEN;
                printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
                        clnt->cl_protname, clnt->cl_server);
        }
        if (clnt->cl_autobind)
                clnt->cl_port = 0;

retry:
        clnt->cl_stats->rpcretrans++;
        task->tk_action = call_bind;
        task->tk_status = 0;
}

/*
 * 7.   Decode the RPC reply
 */
static void
call_decode(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_rqst *req = task->tk_rqstp;
        kxdrproc_t      decode = task->tk_msg.rpc_proc->p_decode;
        u32             *p;

        dprintk("RPC: %4d call_decode (status %d)\n",
                                task->tk_pid, task->tk_status);

        if (clnt->cl_chatty && (task->tk_flags & RPC_CALL_MAJORSEEN)) {
                printk(KERN_NOTICE "%s: server %s OK\n",
                        clnt->cl_protname, clnt->cl_server);
                task->tk_flags &= ~RPC_CALL_MAJORSEEN;
        }

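        /*
         * A minimal RPC reply is 12 bytes: the XID, the message
         * direction, and the reply status word. Anything shorter
         * cannot be parsed and is treated as a transmission error.
         */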
        if (task->tk_status < 12) {
                if (!RPC_IS_SOFT(task)) {
                        task->tk_action = call_bind;
                        clnt->cl_stats->rpcretrans++;
                        goto out_retry;
                }
                printk(KERN_WARNING "%s: RPC reply too short (%d bytes)\n",
                        clnt->cl_protname, task->tk_status);
                rpc_exit(task, -EIO);
                return;
        }

        req->rq_rcv_buf.len = req->rq_private_buf.len;

        /* Check that the softirq receive buffer is valid */
        WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
                                sizeof(req->rq_rcv_buf)) != 0);

        /* Verify the RPC header */
        if (!(p = call_verify(task))) {
                if (task->tk_action == NULL)
                        return;
                goto out_retry;
        }

        /*
         * The following is an NFS-specific hack to cater for setuid
         * processes whose uid is mapped to nobody on the server.
         */
        if (task->tk_client->cl_droppriv &&
            (ntohl(*p) == NFSERR_ACCES || ntohl(*p) == NFSERR_PERM)) {
                if (RPC_IS_SETUID(task) && task->tk_suid_retry) {
                        dprintk("RPC: %4d retry squashed uid\n", task->tk_pid);
                        task->tk_flags ^= RPC_CALL_REALUID;
                        task->tk_action = call_bind;
                        task->tk_suid_retry--;
                        goto out_retry;
                }
        }

        task->tk_action = NULL;

        if (decode)
                task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
                                                      task->tk_msg.rpc_resp);
        dprintk("RPC: %4d call_decode result %d\n", task->tk_pid,
                                        task->tk_status);
        return;
out_retry:
        req->rq_received = req->rq_private_buf.len = 0;
        task->tk_status = 0;
}

/*
 * 8.   Refresh the credentials if rejected by the server
 */
static void
call_refresh(struct rpc_task *task)
{
        dprintk("RPC: %4d call_refresh\n", task->tk_pid);

        xprt_release(task);     /* Must do to obtain new XID */
        task->tk_action = call_refreshresult;
        task->tk_status = 0;
        task->tk_client->cl_stats->rpcauthrefresh++;
        rpcauth_refreshcred(task);
}

/*
 * 8a.  Process the results of a credential refresh
 */
static void
call_refreshresult(struct rpc_task *task)
{
        int status = task->tk_status;
        dprintk("RPC: %4d call_refreshresult (status %d)\n",
                                task->tk_pid, task->tk_status);

        task->tk_status = 0;
        task->tk_action = call_reserve;
        if (status >= 0 && rpcauth_uptodatecred(task))
                return;
        if (rpcauth_deadcred(task)) {
                rpc_exit(task, -EACCES);
                return;
        }
        task->tk_action = call_refresh;
        if (status != -ETIMEDOUT)
                rpc_delay(task, 3*HZ);
        return;
}

/*
 * Call header serialization
 */
static u32 *
call_header(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_xprt *xprt = clnt->cl_xprt;
        struct rpc_rqst *req = task->tk_rqstp;
        u32             *p = req->rq_svec[0].iov_base;

        /* FIXME: check buffer size? */
        if (xprt->stream)
                *p++ = 0;               /* fill in later */
        *p++ = req->rq_xid;             /* XID */
        *p++ = htonl(RPC_CALL);         /* CALL */
        *p++ = htonl(RPC_VERSION);      /* RPC version */
        *p++ = htonl(clnt->cl_prog);    /* program number */
        *p++ = htonl(clnt->cl_vers);    /* program version */
        *p++ = htonl(task->tk_msg.rpc_proc->p_proc);    /* procedure */
        return rpcauth_marshcred(task, p);
}

/*
 * Reply header verification
 */
static u32 *
call_verify(struct rpc_task *task)
{
        u32     *p = task->tk_rqstp->rq_rcv_buf.head[0].iov_base, n;

        p += 1; /* skip XID */

        if ((n = ntohl(*p++)) != RPC_REPLY) {
                printk(KERN_WARNING "call_verify: not an RPC reply: %x\n", n);
                goto garbage;
        }
        if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
                int     error = -EACCES;

                if ((n = ntohl(*p++)) != RPC_AUTH_ERROR) {
                        printk(KERN_WARNING "call_verify: RPC call rejected: %x\n", n);
                } else
                switch ((n = ntohl(*p++))) {
                case RPC_AUTH_REJECTEDCRED:
                case RPC_AUTH_REJECTEDVERF:
                case RPCSEC_GSS_CREDPROBLEM:
                case RPCSEC_GSS_CTXPROBLEM:
                        if (!task->tk_cred_retry)
                                break;
                        task->tk_cred_retry--;
                        dprintk("RPC: %4d call_verify: retry stale creds\n",
                                                        task->tk_pid);
                        rpcauth_invalcred(task);
                        task->tk_action = call_refresh;
                        return NULL;
                case RPC_AUTH_BADCRED:
                case RPC_AUTH_BADVERF:
                        /* possibly garbled cred/verf? */
                        if (!task->tk_garb_retry)
                                break;
                        task->tk_garb_retry--;
                        dprintk("RPC: %4d call_verify: retry garbled creds\n",
                                                        task->tk_pid);
                        task->tk_action = call_bind;
                        return NULL;
                case RPC_AUTH_TOOWEAK:
                        printk(KERN_NOTICE "call_verify: server requires stronger "
                               "authentication.\n");
                        break;
                default:
                        printk(KERN_WARNING "call_verify: unknown auth error: %x\n", n);
                        error = -EIO;
                }
                dprintk("RPC: %4d call_verify: call rejected %d\n",
                                                task->tk_pid, n);
                rpc_exit(task, error);
                return NULL;
        }
        if (!(p = rpcauth_checkverf(task, p))) {
                printk(KERN_WARNING "call_verify: auth check failed\n");
                goto garbage;           /* bad verifier, retry */
        }
        switch ((n = ntohl(*p++))) {
        case RPC_SUCCESS:
                return p;
        case RPC_PROG_UNAVAIL:
                printk(KERN_WARNING "RPC: call_verify: program %u is unsupported by server %s\n",
                                (unsigned int)task->tk_client->cl_prog,
                                task->tk_client->cl_server);
                goto out_eio;
        case RPC_PROG_MISMATCH:
                printk(KERN_WARNING "RPC: call_verify: program %u, version %u unsupported by server %s\n",
                                (unsigned int)task->tk_client->cl_prog,
                                (unsigned int)task->tk_client->cl_vers,
                                task->tk_client->cl_server);
                goto out_eio;
        case RPC_PROC_UNAVAIL:
                printk(KERN_WARNING "RPC: call_verify: proc %p unsupported by program %u, version %u on server %s\n",
                                task->tk_msg.rpc_proc,
                                task->tk_client->cl_prog,
                                task->tk_client->cl_vers,
                                task->tk_client->cl_server);
                goto out_eio;
        case RPC_GARBAGE_ARGS:
                break;                  /* retry */
        default:
                printk(KERN_WARNING "call_verify: server accept status: %x\n", n);
                /* Also retry */
        }

garbage:
        dprintk("RPC: %4d call_verify: server saw garbage\n", task->tk_pid);
        task->tk_client->cl_stats->rpcgarbage++;
        if (task->tk_garb_retry) {
                task->tk_garb_retry--;
                dprintk("RPC: garbage, retrying %4d\n", task->tk_pid);
                task->tk_action = call_bind;
                return NULL;
        }
        printk(KERN_WARNING "RPC: garbage, exit EIO\n");
out_eio:
        rpc_exit(task, -EIO);
        return NULL;
}