1 /*
2  *  linux/net/sunrpc/rpcclnt.c
3  *
4  *  This file contains the high-level RPC interface.
5  *  It is modeled as a finite state machine to support both synchronous
6  *  and asynchronous requests.
7  *
8  *  -   RPC header generation and argument serialization.
9  *  -   Credential refresh.
10  *  -   TCP connect handling.
11  *  -   Retry of operation when it is suspected the operation failed because
12  *      of uid squashing on the server, or when the credentials were stale
13  *      and need to be refreshed, or when a packet was damaged in transit.
14  *      This may be have to be moved to the VFS layer.
15  *
16  *  NB: BSD uses a more intelligent approach to guessing when a request
17  *  or reply has been lost by keeping the RTO estimate for each procedure.
18  *  We currently make do with a constant timeout value.
19  *
20  *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
21  *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
22  */

#include <asm/system.h>

#include <linux/types.h>
#include <linux/mm.h>
#include <linux/slab.h>
#include <linux/in.h>
#include <linux/utsname.h>

#include <linux/sunrpc/clnt.h>
#include <linux/workqueue.h>
#include <linux/sunrpc/rpc_pipe_fs.h>

#include <linux/nfs.h>


#define RPC_SLACK_SPACE		(1024)	/* total overkill */

#ifdef RPC_DEBUG
# define RPCDBG_FACILITY	RPCDBG_CALL
#endif

static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);


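/*
 * State machine overview (derived from the call_* handlers below):
 *
 *	call_start -> call_reserve -> call_reserveresult -> call_allocate
 *	  -> call_bind [-> call_connect -> call_connect_status]
 *	  -> call_transmit -> call_status -> call_decode
 *
 * Credential trouble detours through call_refresh/call_refreshresult,
 * and major timeouts are handled by call_timeout. Each handler picks
 * the next state by setting task->tk_action before it returns.
 */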
static void	call_start(struct rpc_task *task);
static void	call_reserve(struct rpc_task *task);
static void	call_reserveresult(struct rpc_task *task);
static void	call_allocate(struct rpc_task *task);
static void	call_encode(struct rpc_task *task);
static void	call_decode(struct rpc_task *task);
static void	call_bind(struct rpc_task *task);
static void	call_transmit(struct rpc_task *task);
static void	call_status(struct rpc_task *task);
static void	call_refresh(struct rpc_task *task);
static void	call_refreshresult(struct rpc_task *task);
static void	call_timeout(struct rpc_task *task);
static void	call_connect(struct rpc_task *task);
static void	call_connect_status(struct rpc_task *task);
static u32 *	call_header(struct rpc_task *task);
static u32 *	call_verify(struct rpc_task *task);


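/*
 * Create this client's directory in rpc_pipefs. The directory name is
 * made unique with a global counter, so on -EEXIST we simply retry
 * with the next clntid value.
 */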
static int
rpc_setup_pipedir(struct rpc_clnt *clnt, char *dir_name)
{
	static uint32_t clntid;
	int error;

	if (dir_name == NULL)
		return 0;
	for (;;) {
		snprintf(clnt->cl_pathname, sizeof(clnt->cl_pathname),
				"%s/clnt%x", dir_name,
				(unsigned int)clntid++);
		clnt->cl_pathname[sizeof(clnt->cl_pathname) - 1] = '\0';
		clnt->cl_dentry = rpc_mkdir(clnt->cl_pathname, clnt);
		if (!IS_ERR(clnt->cl_dentry))
			return 0;
		error = PTR_ERR(clnt->cl_dentry);
		if (error != -EEXIST) {
			printk(KERN_INFO "RPC: Couldn't create pipefs entry %s, error %d\n",
					clnt->cl_pathname, error);
			return error;
		}
	}
}

/*
 * Create an RPC client
 * FIXME: This should also take a flags argument (as in task->tk_flags).
 * It's called (among others) from pmap_create_client, which may in
 * turn be called by an async task. In this case, rpciod should not be
 * made to sleep too long.
 */
struct rpc_clnt *
rpc_create_client(struct rpc_xprt *xprt, char *servname,
		  struct rpc_program *program, u32 vers,
		  rpc_authflavor_t flavor)
{
	struct rpc_version	*version;
	struct rpc_clnt		*clnt = NULL;
	int err;
	int len;

	dprintk("RPC: creating %s client for %s (xprt %p)\n",
		program->name, servname, xprt);

	err = -EINVAL;
	if (!xprt)
		goto out_err;
	if (vers >= program->nrvers || !(version = program->version[vers]))
		goto out_err;

	err = -ENOMEM;
	clnt = kmalloc(sizeof(*clnt), GFP_KERNEL);
	if (!clnt)
		goto out_err;
	memset(clnt, 0, sizeof(*clnt));
	atomic_set(&clnt->cl_users, 0);
	atomic_set(&clnt->cl_count, 1);
	clnt->cl_parent = clnt;

	clnt->cl_server = clnt->cl_inline_name;
	len = strlen(servname) + 1;
	if (len > sizeof(clnt->cl_inline_name)) {
		char *buf = kmalloc(len, GFP_KERNEL);
		if (buf != NULL)
			clnt->cl_server = buf;
		else
			len = sizeof(clnt->cl_inline_name);
	}
	strlcpy(clnt->cl_server, servname, len);

	clnt->cl_xprt     = xprt;
	clnt->cl_procinfo = version->procs;
	clnt->cl_maxproc  = version->nrprocs;
	clnt->cl_protname = program->name;
	clnt->cl_pmap	  = &clnt->cl_pmap_default;
	clnt->cl_port     = xprt->addr.sin_port;
	clnt->cl_prog     = program->number;
	clnt->cl_vers     = version->number;
	clnt->cl_prot     = xprt->prot;
	clnt->cl_stats    = program->stats;
	rpc_init_wait_queue(&clnt->cl_pmap_default.pm_bindwait, "bindwait");

	if (!clnt->cl_port)
		clnt->cl_autobind = 1;

	clnt->cl_rtt = &clnt->cl_rtt_default;
	rpc_init_rtt(&clnt->cl_rtt_default, xprt->timeout.to_initval);

	err = rpc_setup_pipedir(clnt, program->pipe_dir_name);
	if (err < 0)
		goto out_no_path;

	err = -ENOMEM;
	if (!rpcauth_create(flavor, clnt)) {
		printk(KERN_INFO "RPC: Couldn't create auth handle (flavor %u)\n",
				flavor);
		goto out_no_auth;
	}

	/* save the nodename */
	clnt->cl_nodelen = strlen(system_utsname.nodename);
	if (clnt->cl_nodelen > UNX_MAXNODENAME)
		clnt->cl_nodelen = UNX_MAXNODENAME;
	memcpy(clnt->cl_nodename, system_utsname.nodename, clnt->cl_nodelen);
	return clnt;

out_no_auth:
	rpc_rmdir(clnt->cl_pathname);
out_no_path:
	if (clnt->cl_server != clnt->cl_inline_name)
		kfree(clnt->cl_server);
	kfree(clnt);
out_err:
	return ERR_PTR(err);
}

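/*
 * Illustrative caller (a sketch only; the program and server names
 * here are hypothetical):
 *
 *	struct rpc_clnt *clnt;
 *
 *	clnt = rpc_create_client(xprt, "example-server", &example_program,
 *				 EXAMPLE_VERSION, RPC_AUTH_UNIX);
 *	if (IS_ERR(clnt))
 *		return PTR_ERR(clnt);
 *
 * Note that failure is reported via ERR_PTR(), so the result must be
 * checked with IS_ERR(), never against NULL.
 */
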
/*
 * This function clones the RPC client structure. It allows us to share the
 * same transport while varying parameters such as the authentication
 * flavour.
 */
struct rpc_clnt *
rpc_clone_client(struct rpc_clnt *clnt)
{
	struct rpc_clnt *new;

	new = kmalloc(sizeof(*new), GFP_KERNEL);
	if (!new)
		goto out_no_clnt;
	memcpy(new, clnt, sizeof(*new));
	atomic_set(&new->cl_count, 1);
	atomic_set(&new->cl_users, 0);
	atomic_inc(&new->cl_parent->cl_count);
	if (new->cl_auth)
		atomic_inc(&new->cl_auth->au_count);
	return new;
out_no_clnt:
	printk(KERN_INFO "RPC: out of memory in %s\n", __FUNCTION__);
	return ERR_PTR(-ENOMEM);
}

/*
 * Properly shut down an RPC client, terminating all outstanding
 * requests. Note that we must be certain that cl_oneshot and
 * cl_dead are cleared, or else the client would be destroyed
 * when the last task releases it.
 */
int
rpc_shutdown_client(struct rpc_clnt *clnt)
{
	wait_queue_t __wait;
	init_waitqueue_entry(&__wait, current);
	dprintk("RPC: shutting down %s client for %s, tasks=%d\n",
			clnt->cl_protname, clnt->cl_server,
			atomic_read(&clnt->cl_users));

	add_wait_queue(&destroy_wait, &__wait);
	set_current_state(TASK_UNINTERRUPTIBLE);
	while (atomic_read(&clnt->cl_users) > 0) {
		/* Don't let rpc_release_client destroy us */
		clnt->cl_oneshot = 0;
		clnt->cl_dead = 0;
		rpc_killall_tasks(clnt);
		schedule_timeout(1*HZ);
		set_current_state(TASK_UNINTERRUPTIBLE);
	}
	__set_current_state(TASK_RUNNING);
	remove_wait_queue(&destroy_wait, &__wait);

	if (atomic_read(&clnt->cl_users) < 0) {
		printk(KERN_ERR "RPC: rpc_shutdown_client clnt %p tasks=%d\n",
				clnt, atomic_read(&clnt->cl_users));
#ifdef RPC_DEBUG
		rpc_show_tasks();
#endif
		BUG();
	}

	return rpc_destroy_client(clnt);
}

/*
 * Delete an RPC client
 */
int
rpc_destroy_client(struct rpc_clnt *clnt)
{
	if (!atomic_dec_and_test(&clnt->cl_count))
		return 1;
	BUG_ON(atomic_read(&clnt->cl_users) != 0);

	dprintk("RPC: destroying %s client for %s\n",
			clnt->cl_protname, clnt->cl_server);
	if (clnt->cl_auth) {
		rpcauth_destroy(clnt->cl_auth);
		clnt->cl_auth = NULL;
	}
	if (clnt->cl_parent != clnt) {
		rpc_destroy_client(clnt->cl_parent);
		goto out_free;
	}
	if (clnt->cl_pathname[0])
		rpc_rmdir(clnt->cl_pathname);
	if (clnt->cl_xprt) {
		xprt_destroy(clnt->cl_xprt);
		clnt->cl_xprt = NULL;
	}
	if (clnt->cl_server != clnt->cl_inline_name)
		kfree(clnt->cl_server);
out_free:
	kfree(clnt);
	return 0;
}

/*
 * Release an RPC client
 */
void
rpc_release_client(struct rpc_clnt *clnt)
{
	dprintk("RPC:      rpc_release_client(%p, %d)\n",
				clnt, atomic_read(&clnt->cl_users));

	if (!atomic_dec_and_test(&clnt->cl_users))
		return;
	wake_up(&destroy_wait);
	if (clnt->cl_oneshot || clnt->cl_dead)
		rpc_destroy_client(clnt);
}

/*
 * Default callback for async RPC calls
 */
static void
rpc_default_callback(struct rpc_task *task)
{
}

/*
 *	Export the signal mask handling for asynchronous code that
 *	sleeps on RPC calls
 */

void rpc_clnt_sigmask(struct rpc_clnt *clnt, sigset_t *oldset)
{
	unsigned long	sigallow = sigmask(SIGKILL);
	unsigned long	irqflags;

	/* Turn off various signals */
	if (clnt->cl_intr) {
		struct k_sigaction *action = current->sighand->action;
		if (action[SIGINT-1].sa.sa_handler == SIG_DFL)
			sigallow |= sigmask(SIGINT);
		if (action[SIGQUIT-1].sa.sa_handler == SIG_DFL)
			sigallow |= sigmask(SIGQUIT);
	}
	spin_lock_irqsave(&current->sighand->siglock, irqflags);
	*oldset = current->blocked;
	siginitsetinv(&current->blocked, sigallow & ~oldset->sig[0]);
	recalc_sigpending();
	spin_unlock_irqrestore(&current->sighand->siglock, irqflags);
}

void rpc_clnt_sigunmask(struct rpc_clnt *clnt, sigset_t *oldset)
{
	unsigned long	irqflags;

	spin_lock_irqsave(&current->sighand->siglock, irqflags);
	current->blocked = *oldset;
	recalc_sigpending();
	spin_unlock_irqrestore(&current->sighand->siglock, irqflags);
}

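/*
 * The intended pairing (as used by rpc_call_sync/rpc_call_async below):
 *
 *	sigset_t oldset;
 *
 *	rpc_clnt_sigmask(clnt, &oldset);
 *	... sleep on an RPC call ...
 *	rpc_clnt_sigunmask(clnt, &oldset);
 */
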
/*
 * Perform a synchronous RPC call and wait for the result
 */
int rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg, int flags)
{
	struct rpc_task	my_task, *task = &my_task;
	sigset_t	oldset;
	int		status;

	/* If this client is slain all further I/O fails */
	if (clnt->cl_dead)
		return -EIO;

	if (flags & RPC_TASK_ASYNC) {
		printk(KERN_ERR "rpc_call_sync: Illegal flag combination for synchronous task\n");
		flags &= ~RPC_TASK_ASYNC;
	}

	rpc_clnt_sigmask(clnt, &oldset);

	/* Create/initialize a new RPC task */
	rpc_init_task(task, clnt, NULL, flags);
	rpc_call_setup(task, msg, 0);

	/* Set up the call info struct and execute the task */
	if (task->tk_status == 0)
		status = rpc_execute(task);
	else {
		status = task->tk_status;
		rpc_release_task(task);
	}

	rpc_clnt_sigunmask(clnt, &oldset);

	return status;
}

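/*
 * Sketch of a synchronous call (the procedure and argument structure
 * names are hypothetical):
 *
 *	struct rpc_message msg = {
 *		.rpc_proc = &example_procedures[EXAMPLEPROC_GET],
 *		.rpc_argp = &args,
 *		.rpc_resp = &res,
 *	};
 *	int status = rpc_call_sync(clnt, &msg, 0);
 *
 * A zero return means the reply was received and decoded; a negative
 * errno reports transport, authentication or decode failures.
 */
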
/*
 * Perform an asynchronous RPC call; @callback runs when the call completes
 */
int
rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags,
	       rpc_action callback, void *data)
{
	struct rpc_task	*task;
	sigset_t	oldset;
	int		status;

	/* If this client is slain all further I/O fails */
	if (clnt->cl_dead)
		return -EIO;

	flags |= RPC_TASK_ASYNC;

	rpc_clnt_sigmask(clnt, &oldset);

	/* Create/initialize a new RPC task */
	if (!callback)
		callback = rpc_default_callback;
	status = -ENOMEM;
	if (!(task = rpc_new_task(clnt, callback, flags)))
		goto out;
	task->tk_calldata = data;

	rpc_call_setup(task, msg, 0);

	/* Set up the call info struct and execute the task */
	if (task->tk_status == 0)
		status = rpc_execute(task);
	else {
		status = task->tk_status;
		rpc_release_task(task);
	}

out:
	rpc_clnt_sigunmask(clnt, &oldset);

	return status;
}

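/*
 * Note on the async case: @callback is invoked with the completed task
 * as its argument; task->tk_status then carries the RPC result and
 * task->tk_calldata the @data pointer supplied here.
 */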

void
rpc_call_setup(struct rpc_task *task, struct rpc_message *msg, int flags)
{
	task->tk_msg   = *msg;
	task->tk_flags |= flags;
	/* Bind the user cred */
	if (task->tk_msg.rpc_cred != NULL) {
		rpcauth_holdcred(task);
	} else
		rpcauth_bindcred(task);

	if (task->tk_status == 0)
		task->tk_action = call_start;
	else
		task->tk_action = NULL;
}

void
rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
{
	struct rpc_xprt *xprt = clnt->cl_xprt;

	xprt->sndsize = 0;
	if (sndsize)
		xprt->sndsize = sndsize + RPC_SLACK_SPACE;
	xprt->rcvsize = 0;
	if (rcvsize)
		xprt->rcvsize = rcvsize + RPC_SLACK_SPACE;
	if (xprt_connected(xprt))
		xprt_sock_setbufsize(xprt);
}

/*
 * Restart an (async) RPC call. Usually called from within the
 * exit handler.
 */
void
rpc_restart_call(struct rpc_task *task)
{
	if (RPC_ASSASSINATED(task))
		return;

	task->tk_action = call_start;
}

/*
 * 0.  Initial state
 *
 *     Other FSM states can be visited zero or more times, but
 *     this state is visited exactly once for each RPC.
 */
static void
call_start(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;

	dprintk("RPC: %4d call_start %s%d proc %d (%s)\n", task->tk_pid,
		clnt->cl_protname, clnt->cl_vers, task->tk_msg.rpc_proc->p_proc,
		(RPC_IS_ASYNC(task) ? "async" : "sync"));

	/* Increment call count */
	task->tk_msg.rpc_proc->p_count++;
	clnt->cl_stats->rpccnt++;
	task->tk_action = call_reserve;
}

/*
 * 1.	Reserve an RPC call slot
 */
static void
call_reserve(struct rpc_task *task)
{
	dprintk("RPC: %4d call_reserve\n", task->tk_pid);

	if (!rpcauth_uptodatecred(task)) {
		task->tk_action = call_refresh;
		return;
	}

	task->tk_status  = 0;
	task->tk_action  = call_reserveresult;
	xprt_reserve(task);
}

/*
 * 1b.	Grok the result of xprt_reserve()
 */
static void
call_reserveresult(struct rpc_task *task)
{
	int status = task->tk_status;

	dprintk("RPC: %4d call_reserveresult (status %d)\n",
				task->tk_pid, task->tk_status);

	/*
	 * After a call to xprt_reserve(), we must have either
	 * a request slot or else an error status.
	 */
	task->tk_status = 0;
	if (status >= 0) {
		if (task->tk_rqstp) {
			task->tk_action = call_allocate;
			return;
		}

		printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
				__FUNCTION__, status);
		rpc_exit(task, -EIO);
		return;
	}

	/*
	 * Even though there was an error, we may have acquired
	 * a request slot somehow.  Make sure not to leak it.
	 */
	if (task->tk_rqstp) {
		printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
				__FUNCTION__, status);
		xprt_release(task);
	}

	switch (status) {
	case -EAGAIN:	/* woken up; retry */
		task->tk_action = call_reserve;
		return;
	case -EIO:	/* probably a shutdown */
		break;
	default:
		printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
				__FUNCTION__, status);
		break;
	}
	rpc_exit(task, status);
}

/*
 * 2.	Allocate the buffer. For details, see sched.c:rpc_malloc.
 *	(Note: buffer memory is freed in rpc_task_release).
 *	We allocate twice the computed size because call_encode later
 *	splits the buffer into equal send and receive halves.
 */
static void
call_allocate(struct rpc_task *task)
{
	unsigned int	bufsiz;

	dprintk("RPC: %4d call_allocate (status %d)\n",
				task->tk_pid, task->tk_status);
	task->tk_action = call_bind;
	if (task->tk_buffer)
		return;

	/* FIXME: compute buffer requirements more exactly using
	 * auth->au_wslack */
	bufsiz = task->tk_msg.rpc_proc->p_bufsiz + RPC_SLACK_SPACE;

	if (rpc_malloc(task, bufsiz << 1) != NULL)
		return;
	printk(KERN_INFO "RPC: buffer allocation failed for task %p\n", task);

	if (RPC_IS_ASYNC(task) || !(task->tk_client->cl_intr && signalled())) {
		xprt_release(task);
		task->tk_action = call_reserve;
		rpc_delay(task, HZ>>4);
		return;
	}

	rpc_exit(task, -ERESTARTSYS);
}

/*
 * 3.	Encode arguments of an RPC call
 */
static void
call_encode(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;
	struct rpc_rqst	*req = task->tk_rqstp;
	struct xdr_buf *sndbuf = &req->rq_snd_buf;
	struct xdr_buf *rcvbuf = &req->rq_rcv_buf;
	unsigned int	bufsiz;
	kxdrproc_t	encode;
	int		status;
	u32		*p;

	dprintk("RPC: %4d call_encode (status %d)\n",
				task->tk_pid, task->tk_status);

	/* Default buffer setup */
	bufsiz = task->tk_bufsize >> 1;
	sndbuf->head[0].iov_base = (void *)task->tk_buffer;
	sndbuf->head[0].iov_len  = bufsiz;
	sndbuf->tail[0].iov_len  = 0;
	sndbuf->page_len	 = 0;
	sndbuf->len		 = 0;
	sndbuf->buflen		 = bufsiz;
	rcvbuf->head[0].iov_base = (void *)((char *)task->tk_buffer + bufsiz);
	rcvbuf->head[0].iov_len  = bufsiz;
	rcvbuf->tail[0].iov_len  = 0;
	rcvbuf->page_len	 = 0;
	rcvbuf->len		 = 0;
	rcvbuf->buflen		 = bufsiz;

	/* Encode header and provided arguments */
	encode = task->tk_msg.rpc_proc->p_encode;
	if (!(p = call_header(task))) {
		printk(KERN_INFO "RPC: call_header failed, exit EIO\n");
		rpc_exit(task, -EIO);
		return;
	}
	if (encode && (status = rpcauth_wrap_req(task, encode, req, p,
						 task->tk_msg.rpc_argp)) < 0) {
		printk(KERN_WARNING "%s: can't encode arguments: %d\n",
				clnt->cl_protname, -status);
		rpc_exit(task, status);
	}
}

/*
 * 4.	Get the server port number if not yet set
 */
static void
call_bind(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;
	struct rpc_xprt *xprt = clnt->cl_xprt;

	dprintk("RPC: %4d call_bind xprt %p %s connected\n", task->tk_pid,
			xprt, (xprt_connected(xprt) ? "is" : "is not"));

	task->tk_action = (xprt_connected(xprt)) ? call_transmit : call_connect;

	if (!clnt->cl_port) {
		task->tk_action = call_connect;
		task->tk_timeout = RPC_CONNECT_TIMEOUT;
		rpc_getport(task, clnt);
	}
}

/*
 * 4a.	Connect to the RPC server (TCP case)
 */
static void
call_connect(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;

	dprintk("RPC: %4d call_connect status %d\n",
				task->tk_pid, task->tk_status);

	if (xprt_connected(clnt->cl_xprt)) {
		task->tk_action = call_transmit;
		return;
	}
	task->tk_action = call_connect_status;
	if (task->tk_status < 0)
		return;
	xprt_connect(task);
}

/*
 * 4b.	Sort out connect result
 */
static void
call_connect_status(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	int status = task->tk_status;

	task->tk_status = 0;
	if (status >= 0) {
		clnt->cl_stats->netreconn++;
		task->tk_action = call_transmit;
		return;
	}

	/* Something failed: we may have to rebind */
	if (clnt->cl_autobind)
		clnt->cl_port = 0;
	switch (status) {
	case -ENOTCONN:
	case -ETIMEDOUT:
	case -EAGAIN:
		task->tk_action = (clnt->cl_port == 0) ? call_bind : call_connect;
		break;
	default:
		rpc_exit(task, -EIO);
	}
}

/*
 * 5.	Transmit the RPC request, and wait for reply
 */
static void
call_transmit(struct rpc_task *task)
{
	dprintk("RPC: %4d call_transmit (status %d)\n",
				task->tk_pid, task->tk_status);

	task->tk_action = call_status;
	if (task->tk_status < 0)
		return;
	task->tk_status = xprt_prepare_transmit(task);
	if (task->tk_status != 0)
		return;
	/* Encode here so that rpcsec_gss can use correct sequence number. */
	if (!task->tk_rqstp->rq_bytes_sent)
		call_encode(task);
	if (task->tk_status < 0)
		return;
	xprt_transmit(task);
	if (task->tk_status < 0)
		return;
	/* Procedures without a decode function expect no reply: done once sent */
	if (!task->tk_msg.rpc_proc->p_decode) {
		task->tk_action = NULL;
		rpc_wake_up_task(task);
	}
}

/*
 * 6.	Sort out the RPC call status
 */
static void
call_status(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;
	struct rpc_rqst	*req = task->tk_rqstp;
	int		status;

	if (req->rq_received > 0 && !req->rq_bytes_sent)
		task->tk_status = req->rq_received;

	dprintk("RPC: %4d call_status (status %d)\n",
				task->tk_pid, task->tk_status);

	status = task->tk_status;
	if (status >= 0) {
		task->tk_action = call_decode;
		return;
	}

	task->tk_status = 0;
	switch(status) {
	case -ETIMEDOUT:
		task->tk_action = call_timeout;
		break;
	case -ECONNREFUSED:
	case -ENOTCONN:
		req->rq_bytes_sent = 0;
		if (clnt->cl_autobind)
			clnt->cl_port = 0;
		task->tk_action = call_bind;
		break;
	case -EAGAIN:
		task->tk_action = call_transmit;
		break;
	case -EIO:
		/* shutdown or soft timeout */
		rpc_exit(task, status);
		break;
	default:
		if (clnt->cl_chatty)
			printk(KERN_NOTICE "%s: RPC call returned error %d\n",
			       clnt->cl_protname, -status);
		rpc_exit(task, status);
		break;
	}
}

/*
 * 6a.	Handle RPC timeout
 *	We do not release the request slot, so we keep using the
 *	same XID for all retransmits.
 */
static void
call_timeout(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;

	if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
		dprintk("RPC: %4d call_timeout (minor)\n", task->tk_pid);
		goto retry;
	}

	dprintk("RPC: %4d call_timeout (major)\n", task->tk_pid);
	if (RPC_IS_SOFT(task)) {
		if (clnt->cl_chatty)
			printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
				clnt->cl_protname, clnt->cl_server);
		rpc_exit(task, -EIO);
		return;
	}

	if (clnt->cl_chatty && !(task->tk_flags & RPC_CALL_MAJORSEEN)) {
		task->tk_flags |= RPC_CALL_MAJORSEEN;
		printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
			clnt->cl_protname, clnt->cl_server);
	}
	if (clnt->cl_autobind)
		clnt->cl_port = 0;

retry:
	clnt->cl_stats->rpcretrans++;
	task->tk_action = call_bind;
	task->tk_status = 0;
}

/*
 * 7.	Decode the RPC reply
 */
static void
call_decode(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;
	struct rpc_rqst	*req = task->tk_rqstp;
	kxdrproc_t	decode = task->tk_msg.rpc_proc->p_decode;
	u32		*p;

	dprintk("RPC: %4d call_decode (status %d)\n",
				task->tk_pid, task->tk_status);

	if (clnt->cl_chatty && (task->tk_flags & RPC_CALL_MAJORSEEN)) {
		printk(KERN_NOTICE "%s: server %s OK\n",
			clnt->cl_protname, clnt->cl_server);
		task->tk_flags &= ~RPC_CALL_MAJORSEEN;
	}

	/* Sanity check: 12 bytes is the smallest possible RPC reply
	 * (xid, direction and reply_stat, one 32-bit word each) */
	if (task->tk_status < 12) {
		if (!RPC_IS_SOFT(task)) {
			task->tk_action = call_bind;
			clnt->cl_stats->rpcretrans++;
			goto out_retry;
		}
		printk(KERN_WARNING "%s: too small RPC reply size (%d bytes)\n",
			clnt->cl_protname, task->tk_status);
		rpc_exit(task, -EIO);
		return;
	}

	req->rq_rcv_buf.len = req->rq_private_buf.len;

	/* Check that the softirq receive buffer is valid */
	WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
				sizeof(req->rq_rcv_buf)) != 0);

	/* Verify the RPC header */
	if (!(p = call_verify(task))) {
		if (task->tk_action == NULL)
			return;
		goto out_retry;
	}

	/*
	 * The following is an NFS-specific hack to cater for setuid
	 * processes whose uid is mapped to nobody on the server.
	 */
	if (task->tk_client->cl_droppriv &&
	    (ntohl(*p) == NFSERR_ACCES || ntohl(*p) == NFSERR_PERM)) {
		if (RPC_IS_SETUID(task) && task->tk_suid_retry) {
			dprintk("RPC: %4d retry squashed uid\n", task->tk_pid);
			task->tk_flags ^= RPC_CALL_REALUID;
			task->tk_action = call_bind;
			task->tk_suid_retry--;
			goto out_retry;
		}
	}

	task->tk_action = NULL;

	if (decode)
		task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
						      task->tk_msg.rpc_resp);
	dprintk("RPC: %4d call_decode result %d\n", task->tk_pid,
					task->tk_status);
	return;
out_retry:
	req->rq_received = req->rq_private_buf.len = 0;
	task->tk_status = 0;
}

/*
 * 8.	Refresh the credentials if rejected by the server
 */
static void
call_refresh(struct rpc_task *task)
{
	dprintk("RPC: %4d call_refresh\n", task->tk_pid);

	xprt_release(task);	/* Must do to obtain new XID */
	task->tk_action = call_refreshresult;
	task->tk_status = 0;
	task->tk_client->cl_stats->rpcauthrefresh++;
	rpcauth_refreshcred(task);
}

/*
 * 8a.	Process the results of a credential refresh
 */
static void
call_refreshresult(struct rpc_task *task)
{
	int status = task->tk_status;
	dprintk("RPC: %4d call_refreshresult (status %d)\n",
				task->tk_pid, task->tk_status);

	task->tk_status = 0;
	task->tk_action = call_reserve;
	if (status >= 0 && rpcauth_uptodatecred(task))
		return;
	if (rpcauth_deadcred(task)) {
		rpc_exit(task, -EACCES);
		return;
	}
	task->tk_action = call_refresh;
	if (status != -ETIMEDOUT)
		rpc_delay(task, 3*HZ);
	return;
}

/*
 * Call header serialization
 */
static u32 *
call_header(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_xprt *xprt = clnt->cl_xprt;
	struct rpc_rqst	*req = task->tk_rqstp;
	u32		*p = req->rq_svec[0].iov_base;

	/* FIXME: check buffer size? */
	if (xprt->stream)
		*p++ = 0;		/* fill in later */
	*p++ = req->rq_xid;		/* XID */
	*p++ = htonl(RPC_CALL);		/* CALL */
	*p++ = htonl(RPC_VERSION);	/* RPC version */
	*p++ = htonl(clnt->cl_prog);	/* program number */
	*p++ = htonl(clnt->cl_vers);	/* program version */
	*p++ = htonl(task->tk_msg.rpc_proc->p_proc);	/* procedure */
	return rpcauth_marshcred(task, p);
}

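/*
 * For reference, the fixed part of the marshalled call header built
 * above is, in 32-bit words (network byte order):
 *
 *	[length]  xid  CALL  rpcvers  prog  vers  proc  <cred> <verf>
 *
 * where the record-marking length word is only present on stream
 * transports (filled in later), and the credential and verifier are
 * appended by the auth layer via rpcauth_marshcred().
 */
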
/*
 * Reply header verification
 */
static u32 *
call_verify(struct rpc_task *task)
{
	u32	*p = task->tk_rqstp->rq_rcv_buf.head[0].iov_base, n;

	p += 1;	/* skip XID */

	if ((n = ntohl(*p++)) != RPC_REPLY) {
		printk(KERN_WARNING "call_verify: not an RPC reply: %x\n", n);
		goto garbage;
	}
	if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
		int	error = -EACCES;

		if ((n = ntohl(*p++)) != RPC_AUTH_ERROR) {
			printk(KERN_WARNING "call_verify: RPC call rejected: %x\n", n);
		} else
		switch ((n = ntohl(*p++))) {
		case RPC_AUTH_REJECTEDCRED:
		case RPC_AUTH_REJECTEDVERF:
		case RPCSEC_GSS_CREDPROBLEM:
		case RPCSEC_GSS_CTXPROBLEM:
			if (!task->tk_cred_retry)
				break;
			task->tk_cred_retry--;
			dprintk("RPC: %4d call_verify: retry stale creds\n",
							task->tk_pid);
			rpcauth_invalcred(task);
			task->tk_action = call_refresh;
			return NULL;
		case RPC_AUTH_BADCRED:
		case RPC_AUTH_BADVERF:
			/* possibly garbled cred/verf? */
			if (!task->tk_garb_retry)
				break;
			task->tk_garb_retry--;
			dprintk("RPC: %4d call_verify: retry garbled creds\n",
							task->tk_pid);
			task->tk_action = call_bind;
			return NULL;
		case RPC_AUTH_TOOWEAK:
			printk(KERN_NOTICE "call_verify: server requires stronger "
			       "authentication.\n");
			break;
		default:
			printk(KERN_WARNING "call_verify: unknown auth error: %x\n", n);
			error = -EIO;
		}
		dprintk("RPC: %4d call_verify: call rejected %d\n",
						task->tk_pid, n);
		rpc_exit(task, error);
		return NULL;
	}
	if (!(p = rpcauth_checkverf(task, p))) {
		printk(KERN_WARNING "call_verify: auth check failed\n");
		goto garbage;		/* bad verifier, retry */
	}
	switch ((n = ntohl(*p++))) {
	case RPC_SUCCESS:
		return p;
	case RPC_PROG_UNAVAIL:
		printk(KERN_WARNING "RPC: call_verify: program %u is unsupported by server %s\n",
				(unsigned int)task->tk_client->cl_prog,
				task->tk_client->cl_server);
		goto out_eio;
	case RPC_PROG_MISMATCH:
		printk(KERN_WARNING "RPC: call_verify: program %u, version %u unsupported by server %s\n",
				(unsigned int)task->tk_client->cl_prog,
				(unsigned int)task->tk_client->cl_vers,
				task->tk_client->cl_server);
		goto out_eio;
	case RPC_PROC_UNAVAIL:
		printk(KERN_WARNING "RPC: call_verify: proc %p unsupported by program %u, version %u on server %s\n",
				task->tk_msg.rpc_proc,
				task->tk_client->cl_prog,
				task->tk_client->cl_vers,
				task->tk_client->cl_server);
		goto out_eio;
	case RPC_GARBAGE_ARGS:
		break;			/* retry */
	default:
		printk(KERN_WARNING "call_verify: server accept status: %x\n", n);
		/* Also retry */
	}

garbage:
	dprintk("RPC: %4d call_verify: server saw garbage\n", task->tk_pid);
	task->tk_client->cl_stats->rpcgarbage++;
	if (task->tk_garb_retry) {
		task->tk_garb_retry--;
		dprintk("RPC: garbage, retrying %4d\n", task->tk_pid);
		task->tk_action = call_bind;
		return NULL;
	}
	printk(KERN_WARNING "RPC: garbage, exit EIO\n");
out_eio:
	rpc_exit(task, -EIO);
	return NULL;
}