/*
 *  linux/net/sunrpc/clnt.c
 *
 *  This file contains the high-level RPC interface.
 *  It is modeled as a finite state machine to support both synchronous
 *  and asynchronous requests.
 *
 *  -   RPC header generation and argument serialization.
 *  -   Credential refresh.
 *  -   TCP connect handling.
 *  -   Retry of operation when it is suspected the operation failed because
 *      of uid squashing on the server, or when the credentials were stale
 *      and need to be refreshed, or when a packet was damaged in transit.
 *      This may have to be moved to the VFS layer.
 *
 *  NB: BSD uses a more intelligent approach to guessing when a request
 *  or reply has been lost by keeping the RTO estimate for each procedure.
 *  We currently make do with a constant timeout value.
 *
 *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
 *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
 */

#include <asm/system.h>

#include <linux/types.h>
#include <linux/mm.h>
#include <linux/slab.h>
#include <linux/in.h>
#include <linux/utsname.h>

#include <linux/sunrpc/clnt.h>
#include <linux/workqueue.h>
#include <linux/sunrpc/rpc_pipe_fs.h>

#include <linux/nfs.h>


#define RPC_SLACK_SPACE         (1024)  /* total overkill */

#ifdef RPC_DEBUG
# define RPCDBG_FACILITY        RPCDBG_CALL
#endif

static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);

static void     call_start(struct rpc_task *task);
static void     call_reserve(struct rpc_task *task);
static void     call_reserveresult(struct rpc_task *task);
static void     call_allocate(struct rpc_task *task);
static void     call_encode(struct rpc_task *task);
static void     call_decode(struct rpc_task *task);
static void     call_bind(struct rpc_task *task);
static void     call_transmit(struct rpc_task *task);
static void     call_status(struct rpc_task *task);
static void     call_refresh(struct rpc_task *task);
static void     call_refreshresult(struct rpc_task *task);
static void     call_timeout(struct rpc_task *task);
static void     call_connect(struct rpc_task *task);
static void     call_connect_status(struct rpc_task *task);
static u32 *    call_header(struct rpc_task *task);
static u32 *    call_verify(struct rpc_task *task);

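/*
 * For orientation: the normal state sequence of a successful call, as
 * implemented by the handlers below, is roughly
 *
 *      call_start -> call_reserve -> call_reserveresult -> call_allocate
 *        -> call_bind [-> call_connect -> call_connect_status]
 *        -> call_transmit (encoding via call_encode/call_header)
 *        -> call_status -> call_decode (verification via call_verify),
 *
 * with error paths re-entering earlier states (call_refresh for stale
 * credentials, call_timeout for retransmits).
 */
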
static int
rpc_setup_pipedir(struct rpc_clnt *clnt, char *dir_name)
{
        static uint32_t clntid;
        int error;

        if (dir_name == NULL)
                return 0;
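        /* Retry with a fresh clnt%x id until rpc_mkdir() stops failing with -EEXIST. */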
        for (;;) {
                snprintf(clnt->cl_pathname, sizeof(clnt->cl_pathname),
                                "%s/clnt%x", dir_name,
                                (unsigned int)clntid++);
                clnt->cl_pathname[sizeof(clnt->cl_pathname) - 1] = '\0';
                clnt->cl_dentry = rpc_mkdir(clnt->cl_pathname, clnt);
                if (!IS_ERR(clnt->cl_dentry))
                        return 0;
                error = PTR_ERR(clnt->cl_dentry);
                if (error != -EEXIST) {
                        printk(KERN_INFO "RPC: Couldn't create pipefs entry %s, error %d\n",
                                        clnt->cl_pathname, error);
                        return error;
                }
        }
}

/*
 * Create an RPC client
 * FIXME: This should also take a flags argument (as in task->tk_flags).
 * It's called (among others) from pmap_create_client, which may in
 * turn be called by an async task. In this case, rpciod should not be
 * made to sleep too long.
 */
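/*
 * A minimal usage sketch (hypothetical caller; "prog" is the caller's
 * statically defined rpc_program, "xprt" an already-created transport):
 *
 *      clnt = rpc_create_client(xprt, "myserver", &prog, vers,
 *                               RPC_AUTH_UNIX);
 *      if (IS_ERR(clnt))
 *              return PTR_ERR(clnt);
 */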
struct rpc_clnt *
rpc_create_client(struct rpc_xprt *xprt, char *servname,
                  struct rpc_program *program, u32 vers,
                  rpc_authflavor_t flavor)
{
        struct rpc_version      *version;
        struct rpc_clnt         *clnt = NULL;
        int err;
        int len;

        dprintk("RPC: creating %s client for %s (xprt %p)\n",
                program->name, servname, xprt);

        err = -EINVAL;
        if (!xprt)
                goto out_err;
        if (vers >= program->nrvers || !(version = program->version[vers]))
                goto out_err;

        err = -ENOMEM;
        clnt = kmalloc(sizeof(*clnt), GFP_KERNEL);
        if (!clnt)
                goto out_err;
        memset(clnt, 0, sizeof(*clnt));
        atomic_set(&clnt->cl_users, 0);
        atomic_set(&clnt->cl_count, 1);
        clnt->cl_parent = clnt;

        clnt->cl_server = clnt->cl_inline_name;
        len = strlen(servname) + 1;
        if (len > sizeof(clnt->cl_inline_name)) {
                char *buf = kmalloc(len, GFP_KERNEL);
                if (buf != NULL)
                        clnt->cl_server = buf;
                else
                        len = sizeof(clnt->cl_inline_name);
        }
        strlcpy(clnt->cl_server, servname, len);

        clnt->cl_xprt     = xprt;
        clnt->cl_procinfo = version->procs;
        clnt->cl_maxproc  = version->nrprocs;
        clnt->cl_protname = program->name;
        clnt->cl_pmap     = &clnt->cl_pmap_default;
        clnt->cl_port     = xprt->addr.sin_port;
        clnt->cl_prog     = program->number;
        clnt->cl_vers     = version->number;
        clnt->cl_prot     = xprt->prot;
        clnt->cl_stats    = program->stats;
        rpc_init_wait_queue(&clnt->cl_pmap_default.pm_bindwait, "bindwait");

        if (!clnt->cl_port)
                clnt->cl_autobind = 1;

        clnt->cl_rtt = &clnt->cl_rtt_default;
        rpc_init_rtt(&clnt->cl_rtt_default, xprt->timeout.to_initval);

        err = rpc_setup_pipedir(clnt, program->pipe_dir_name);
        if (err < 0)
                goto out_no_path;

        err = -ENOMEM;
        if (!rpcauth_create(flavor, clnt)) {
                printk(KERN_INFO "RPC: Couldn't create auth handle (flavor %u)\n",
                                flavor);
                goto out_no_auth;
        }

        /* save the nodename */
        clnt->cl_nodelen = strlen(system_utsname.nodename);
        if (clnt->cl_nodelen > UNX_MAXNODENAME)
                clnt->cl_nodelen = UNX_MAXNODENAME;
        memcpy(clnt->cl_nodename, system_utsname.nodename, clnt->cl_nodelen);
        return clnt;

out_no_auth:
        rpc_rmdir(clnt->cl_pathname);
out_no_path:
        if (clnt->cl_server != clnt->cl_inline_name)
                kfree(clnt->cl_server);
        kfree(clnt);
out_err:
        return ERR_PTR(err);
}

/*
 * This function clones the RPC client structure. It allows us to share the
 * same transport while varying parameters such as the authentication
 * flavour.
 */
struct rpc_clnt *
rpc_clone_client(struct rpc_clnt *clnt)
{
        struct rpc_clnt *new;

        new = kmalloc(sizeof(*new), GFP_KERNEL);
        if (!new)
                goto out_no_clnt;
        memcpy(new, clnt, sizeof(*new));
        atomic_set(&new->cl_count, 1);
        atomic_set(&new->cl_users, 0);
        new->cl_parent = clnt;
        atomic_inc(&clnt->cl_count);
        /* Duplicate portmapper */
        rpc_init_wait_queue(&new->cl_pmap_default.pm_bindwait, "bindwait");
        /* Turn off autobind on clones */
        new->cl_autobind = 0;
        new->cl_oneshot = 0;
        new->cl_dead = 0;
        rpc_init_rtt(&new->cl_rtt_default, clnt->cl_xprt->timeout.to_initval);
        if (new->cl_auth)
                atomic_inc(&new->cl_auth->au_count);
        return new;
out_no_clnt:
        printk(KERN_INFO "RPC: out of memory in %s\n", __FUNCTION__);
        return ERR_PTR(-ENOMEM);
}

/*
 * Properly shut down an RPC client, terminating all outstanding
 * requests. Note that we must be certain that cl_oneshot and
 * cl_dead are cleared, or else the client would be destroyed
 * when the last task releases it.
 */
int
rpc_shutdown_client(struct rpc_clnt *clnt)
{
        dprintk("RPC: shutting down %s client for %s, tasks=%d\n",
                        clnt->cl_protname, clnt->cl_server,
                        atomic_read(&clnt->cl_users));

        while (atomic_read(&clnt->cl_users) > 0) {
                /* Don't let rpc_release_client destroy us */
                clnt->cl_oneshot = 0;
                clnt->cl_dead = 0;
                rpc_killall_tasks(clnt);
                sleep_on_timeout(&destroy_wait, 1*HZ);
        }

        if (atomic_read(&clnt->cl_users) < 0) {
                printk(KERN_ERR "RPC: rpc_shutdown_client clnt %p tasks=%d\n",
                                clnt, atomic_read(&clnt->cl_users));
#ifdef RPC_DEBUG
                rpc_show_tasks();
#endif
                BUG();
        }

        return rpc_destroy_client(clnt);
}

/*
 * Delete an RPC client
 */
int
rpc_destroy_client(struct rpc_clnt *clnt)
{
        if (!atomic_dec_and_test(&clnt->cl_count))
                return 1;
        BUG_ON(atomic_read(&clnt->cl_users) != 0);

        dprintk("RPC: destroying %s client for %s\n",
                        clnt->cl_protname, clnt->cl_server);
        if (clnt->cl_auth) {
                rpcauth_destroy(clnt->cl_auth);
                clnt->cl_auth = NULL;
        }
        if (clnt->cl_parent != clnt) {
                rpc_destroy_client(clnt->cl_parent);
                goto out_free;
        }
        if (clnt->cl_pathname[0])
                rpc_rmdir(clnt->cl_pathname);
        if (clnt->cl_xprt) {
                xprt_destroy(clnt->cl_xprt);
                clnt->cl_xprt = NULL;
        }
        if (clnt->cl_server != clnt->cl_inline_name)
                kfree(clnt->cl_server);
out_free:
        kfree(clnt);
        return 0;
}

/*
 * Release an RPC client
 */
void
rpc_release_client(struct rpc_clnt *clnt)
{
        dprintk("RPC:      rpc_release_client(%p, %d)\n",
                                clnt, atomic_read(&clnt->cl_users));

        if (!atomic_dec_and_test(&clnt->cl_users))
                return;
        wake_up(&destroy_wait);
        if (clnt->cl_oneshot || clnt->cl_dead)
                rpc_destroy_client(clnt);
}

/*
 * Default callback for async RPC calls
 */
static void
rpc_default_callback(struct rpc_task *task)
{
}

/*
 *      Export the signal mask handling for asynchronous code that
 *      sleeps on RPC calls
 */

void rpc_clnt_sigmask(struct rpc_clnt *clnt, sigset_t *oldset)
{
        unsigned long   sigallow = sigmask(SIGKILL);
        unsigned long   irqflags;

        /* Turn off various signals */
        if (clnt->cl_intr) {
                struct k_sigaction *action = current->sighand->action;
                if (action[SIGINT-1].sa.sa_handler == SIG_DFL)
                        sigallow |= sigmask(SIGINT);
                if (action[SIGQUIT-1].sa.sa_handler == SIG_DFL)
                        sigallow |= sigmask(SIGQUIT);
        }
        spin_lock_irqsave(&current->sighand->siglock, irqflags);
        *oldset = current->blocked;
        siginitsetinv(&current->blocked, sigallow & ~oldset->sig[0]);
        recalc_sigpending();
        spin_unlock_irqrestore(&current->sighand->siglock, irqflags);
}

void rpc_clnt_sigunmask(struct rpc_clnt *clnt, sigset_t *oldset)
{
        unsigned long   irqflags;

        spin_lock_irqsave(&current->sighand->siglock, irqflags);
        current->blocked = *oldset;
        recalc_sigpending();
        spin_unlock_irqrestore(&current->sighand->siglock, irqflags);
}

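/*
 * Callers bracket a blocking RPC with this pair, as rpc_call_sync()
 * below does:
 *
 *      rpc_clnt_sigmask(clnt, &oldset);
 *      ... sleep on the RPC ...
 *      rpc_clnt_sigunmask(clnt, &oldset);
 */
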
/*
 * Perform a synchronous RPC call and wait for completion
 */
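/*
 * A minimal usage sketch (MYPROC is a hypothetical procedure index; the
 * rpc_message fields are the ones consumed by rpc_call_setup() below):
 *
 *      struct rpc_message msg = {
 *              .rpc_proc = &clnt->cl_procinfo[MYPROC],
 *              .rpc_argp = &args,
 *              .rpc_resp = &res,
 *      };
 *      int status = rpc_call_sync(clnt, &msg, 0);
 */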
int rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg, int flags)
{
        struct rpc_task *task;
        sigset_t        oldset;
        int             status;

        /* If this client is slain all further I/O fails */
        if (clnt->cl_dead)
                return -EIO;

        BUG_ON(flags & RPC_TASK_ASYNC);

        rpc_clnt_sigmask(clnt, &oldset);

        status = -ENOMEM;
        task = rpc_new_task(clnt, NULL, flags);
        if (task == NULL)
                goto out;

        rpc_call_setup(task, msg, 0);

        /* Set up the call info struct and execute the task */
        if (task->tk_status == 0)
                status = rpc_execute(task);
        else {
                status = task->tk_status;
                rpc_release_task(task);
        }

out:
        rpc_clnt_sigunmask(clnt, &oldset);

        return status;
}

/*
 * Perform an asynchronous RPC call; the callback runs on completion
 */
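/*
 * A minimal usage sketch (my_done is a hypothetical completion handler
 * with the rpc_action signature, cf. rpc_default_callback() above):
 *
 *      static void my_done(struct rpc_task *task)
 *      {
 *              struct my_calldata *d = task->tk_calldata;
 *              ... inspect task->tk_status, release d ...
 *      }
 *
 *      status = rpc_call_async(clnt, &msg, 0, my_done, data);
 */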
int
rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags,
               rpc_action callback, void *data)
{
        struct rpc_task *task;
        sigset_t        oldset;
        int             status;

        /* If this client is slain all further I/O fails */
        if (clnt->cl_dead)
                return -EIO;

        flags |= RPC_TASK_ASYNC;

        rpc_clnt_sigmask(clnt, &oldset);

        /* Create/initialize a new RPC task */
        if (!callback)
                callback = rpc_default_callback;
        status = -ENOMEM;
        if (!(task = rpc_new_task(clnt, callback, flags)))
                goto out;
        task->tk_calldata = data;

        rpc_call_setup(task, msg, 0);

        /* Set up the call info struct and execute the task */
        if (task->tk_status == 0)
                status = rpc_execute(task);
        else {
                status = task->tk_status;
                rpc_release_task(task);
        }

out:
        rpc_clnt_sigunmask(clnt, &oldset);

        return status;
}

void
rpc_call_setup(struct rpc_task *task, struct rpc_message *msg, int flags)
{
        task->tk_msg   = *msg;
        task->tk_flags |= flags;
        /* Bind the user cred */
        if (task->tk_msg.rpc_cred != NULL)
                rpcauth_holdcred(task);
        else
                rpcauth_bindcred(task);

        if (task->tk_status == 0)
                task->tk_action = call_start;
        else
                task->tk_action = NULL;
}

void
rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
{
        struct rpc_xprt *xprt = clnt->cl_xprt;

        xprt->sndsize = 0;
        if (sndsize)
                xprt->sndsize = sndsize + RPC_SLACK_SPACE;
        xprt->rcvsize = 0;
        if (rcvsize)
                xprt->rcvsize = rcvsize + RPC_SLACK_SPACE;
        if (xprt_connected(xprt))
                xprt_sock_setbufsize(xprt);
}

/*
 * Restart an (async) RPC call. Usually called from within the
 * exit handler.
 */
void
rpc_restart_call(struct rpc_task *task)
{
        if (RPC_ASSASSINATED(task))
                return;

        task->tk_action = call_start;
}

/*
 * 0.  Initial state
 *
 *     Other FSM states can be visited zero or more times, but
 *     this state is visited exactly once for each RPC.
 */
static void
call_start(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;

        dprintk("RPC: %4d call_start %s%d proc %d (%s)\n", task->tk_pid,
                clnt->cl_protname, clnt->cl_vers, task->tk_msg.rpc_proc->p_proc,
                (RPC_IS_ASYNC(task) ? "async" : "sync"));

        /* Increment call count */
        task->tk_msg.rpc_proc->p_count++;
        clnt->cl_stats->rpccnt++;
        task->tk_action = call_reserve;
}

/*
 * 1.   Reserve an RPC call slot
 */
static void
call_reserve(struct rpc_task *task)
{
        dprintk("RPC: %4d call_reserve\n", task->tk_pid);

        if (!rpcauth_uptodatecred(task)) {
                task->tk_action = call_refresh;
                return;
        }

        task->tk_status  = 0;
        task->tk_action  = call_reserveresult;
        xprt_reserve(task);
}

/*
 * 1b.  Grok the result of xprt_reserve()
 */
static void
call_reserveresult(struct rpc_task *task)
{
        int status = task->tk_status;

        dprintk("RPC: %4d call_reserveresult (status %d)\n",
                                task->tk_pid, task->tk_status);

        /*
         * After a call to xprt_reserve(), we must have either
         * a request slot or else an error status.
         */
        task->tk_status = 0;
        if (status >= 0) {
                if (task->tk_rqstp) {
                        task->tk_action = call_allocate;
                        return;
                }

                printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
                                __FUNCTION__, status);
                rpc_exit(task, -EIO);
                return;
        }

        /*
         * Even though there was an error, we may have acquired
         * a request slot somehow.  Make sure not to leak it.
         */
        if (task->tk_rqstp) {
                printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
                                __FUNCTION__, status);
                xprt_release(task);
        }

        switch (status) {
        case -EAGAIN:   /* woken up; retry */
                task->tk_action = call_reserve;
                return;
        case -EIO:      /* probably a shutdown */
                break;
        default:
                printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
                                __FUNCTION__, status);
                break;
        }
        rpc_exit(task, status);
}

/*
 * 2.   Allocate the buffer. For details, see sched.c:rpc_malloc.
 *      (Note: buffer memory is freed in rpc_task_release).
 */
static void
call_allocate(struct rpc_task *task)
{
        unsigned int    bufsiz;

        dprintk("RPC: %4d call_allocate (status %d)\n",
                                task->tk_pid, task->tk_status);
        task->tk_action = call_bind;
        if (task->tk_buffer)
                return;

        /* FIXME: compute buffer requirements more exactly using
         * auth->au_wslack */
        bufsiz = task->tk_msg.rpc_proc->p_bufsiz + RPC_SLACK_SPACE;

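        /*
         * Allocate twice bufsiz: call_encode() splits tk_buffer in half
         * (tk_bufsize >> 1) and uses the first half as the send buffer,
         * the second half as the receive buffer.
         */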
        if (rpc_malloc(task, bufsiz << 1) != NULL)
                return;
        printk(KERN_INFO "RPC: buffer allocation failed for task %p\n", task);

        if (RPC_IS_ASYNC(task) || !(task->tk_client->cl_intr && signalled())) {
                xprt_release(task);
                task->tk_action = call_reserve;
                rpc_delay(task, HZ>>4);
                return;
        }

        rpc_exit(task, -ERESTARTSYS);
}

/*
 * 3.   Encode arguments of an RPC call
 */
static void
call_encode(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_rqst *req = task->tk_rqstp;
        struct xdr_buf *sndbuf = &req->rq_snd_buf;
        struct xdr_buf *rcvbuf = &req->rq_rcv_buf;
        unsigned int    bufsiz;
        kxdrproc_t      encode;
        int             status;
        u32             *p;

        dprintk("RPC: %4d call_encode (status %d)\n",
                                task->tk_pid, task->tk_status);

        /* Default buffer setup */
        bufsiz = task->tk_bufsize >> 1;
        sndbuf->head[0].iov_base = (void *)task->tk_buffer;
        sndbuf->head[0].iov_len  = bufsiz;
        sndbuf->tail[0].iov_len  = 0;
        sndbuf->page_len         = 0;
        sndbuf->len              = 0;
        sndbuf->buflen           = bufsiz;
        rcvbuf->head[0].iov_base = (void *)((char *)task->tk_buffer + bufsiz);
        rcvbuf->head[0].iov_len  = bufsiz;
        rcvbuf->tail[0].iov_len  = 0;
        rcvbuf->page_len         = 0;
        rcvbuf->len              = 0;
        rcvbuf->buflen           = bufsiz;

        /* Encode header and provided arguments */
        encode = task->tk_msg.rpc_proc->p_encode;
        if (!(p = call_header(task))) {
                printk(KERN_INFO "RPC: call_header failed, exit EIO\n");
                rpc_exit(task, -EIO);
                return;
        }
        if (encode && (status = rpcauth_wrap_req(task, encode, req, p,
                                                 task->tk_msg.rpc_argp)) < 0) {
                printk(KERN_WARNING "%s: can't encode arguments: %d\n",
                                clnt->cl_protname, -status);
                rpc_exit(task, status);
        }
}

/*
 * 4.   Get the server port number if not yet set
 */
static void
call_bind(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_xprt *xprt = clnt->cl_xprt;

        dprintk("RPC: %4d call_bind xprt %p %s connected\n", task->tk_pid,
                        xprt, (xprt_connected(xprt) ? "is" : "is not"));

        task->tk_action = (xprt_connected(xprt)) ? call_transmit : call_connect;

        if (!clnt->cl_port) {
                task->tk_action = call_connect;
                task->tk_timeout = RPC_CONNECT_TIMEOUT;
                rpc_getport(task, clnt);
        }
}

/*
 * 4a.  Connect to the RPC server (TCP case)
 */
static void
call_connect(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;

        dprintk("RPC: %4d call_connect status %d\n",
                                task->tk_pid, task->tk_status);

        if (xprt_connected(clnt->cl_xprt)) {
                task->tk_action = call_transmit;
                return;
        }
        task->tk_action = call_connect_status;
        if (task->tk_status < 0)
                return;
        xprt_connect(task);
}

/*
 * 4b. Sort out connect result
 */
static void
call_connect_status(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        int status = task->tk_status;

        task->tk_status = 0;
        if (status >= 0) {
                clnt->cl_stats->netreconn++;
                task->tk_action = call_transmit;
                return;
        }

        /* Something failed: we may have to rebind */
        if (clnt->cl_autobind)
                clnt->cl_port = 0;
        switch (status) {
        case -ENOTCONN:
        case -ETIMEDOUT:
        case -EAGAIN:
                task->tk_action = (clnt->cl_port == 0) ? call_bind : call_connect;
                break;
        default:
                rpc_exit(task, -EIO);
        }
}

/*
 * 5.   Transmit the RPC request, and wait for reply
 */
static void
call_transmit(struct rpc_task *task)
{
        dprintk("RPC: %4d call_transmit (status %d)\n",
                                task->tk_pid, task->tk_status);

        task->tk_action = call_status;
        if (task->tk_status < 0)
                return;
        task->tk_status = xprt_prepare_transmit(task);
        if (task->tk_status != 0)
                return;
        /* Encode here so that rpcsec_gss can use correct sequence number. */
        if (!task->tk_rqstp->rq_bytes_sent)
                call_encode(task);
        if (task->tk_status < 0)
                return;
        xprt_transmit(task);
        if (task->tk_status < 0)
                return;
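        /* No decode op means the caller does not wait for a reply;
         * the task is complete once the request has been transmitted. */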
        if (!task->tk_msg.rpc_proc->p_decode) {
                task->tk_action = NULL;
                rpc_wake_up_task(task);
        }
}

/*
 * 6.   Sort out the RPC call status
 */
static void
call_status(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_rqst *req = task->tk_rqstp;
        int             status;

        if (req->rq_received > 0 && !req->rq_bytes_sent)
                task->tk_status = req->rq_received;

        dprintk("RPC: %4d call_status (status %d)\n",
                                task->tk_pid, task->tk_status);

        status = task->tk_status;
        if (status >= 0) {
                task->tk_action = call_decode;
                return;
        }

        task->tk_status = 0;
        switch (status) {
        case -ETIMEDOUT:
                task->tk_action = call_timeout;
                break;
        case -ECONNREFUSED:
        case -ENOTCONN:
                req->rq_bytes_sent = 0;
                if (clnt->cl_autobind)
                        clnt->cl_port = 0;
                task->tk_action = call_bind;
                break;
        case -EAGAIN:
                task->tk_action = call_transmit;
                break;
        case -EIO:
                /* shutdown or soft timeout */
                rpc_exit(task, status);
                break;
        default:
                if (clnt->cl_chatty)
                        printk("%s: RPC call returned error %d\n",
                               clnt->cl_protname, -status);
                rpc_exit(task, status);
                break;
        }
}

/*
 * 6a.  Handle RPC timeout
 *      We do not release the request slot, so we keep using the
 *      same XID for all retransmits.
 */
static void
call_timeout(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;

        if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
                dprintk("RPC: %4d call_timeout (minor)\n", task->tk_pid);
                goto retry;
        }

        dprintk("RPC: %4d call_timeout (major)\n", task->tk_pid);
        if (RPC_IS_SOFT(task)) {
                if (clnt->cl_chatty)
                        printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
                                clnt->cl_protname, clnt->cl_server);
                rpc_exit(task, -EIO);
                return;
        }

        if (clnt->cl_chatty && !(task->tk_flags & RPC_CALL_MAJORSEEN)) {
                task->tk_flags |= RPC_CALL_MAJORSEEN;
                printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
                        clnt->cl_protname, clnt->cl_server);
        }
        if (clnt->cl_autobind)
                clnt->cl_port = 0;

retry:
        clnt->cl_stats->rpcretrans++;
        task->tk_action = call_bind;
        task->tk_status = 0;
}

/*
 * 7.   Decode the RPC reply
 */
static void
call_decode(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_rqst *req = task->tk_rqstp;
        kxdrproc_t      decode = task->tk_msg.rpc_proc->p_decode;
        u32             *p;

        dprintk("RPC: %4d call_decode (status %d)\n",
                                task->tk_pid, task->tk_status);

        if (clnt->cl_chatty && (task->tk_flags & RPC_CALL_MAJORSEEN)) {
                printk(KERN_NOTICE "%s: server %s OK\n",
                        clnt->cl_protname, clnt->cl_server);
                task->tk_flags &= ~RPC_CALL_MAJORSEEN;
        }

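        /*
         * 12 bytes is the smallest possible RPC reply: three 32-bit words
         * (XID, message type, reply status). Anything shorter was truncated.
         */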
        if (task->tk_status < 12) {
                if (!RPC_IS_SOFT(task)) {
                        task->tk_action = call_bind;
                        clnt->cl_stats->rpcretrans++;
                        goto out_retry;
                }
                printk(KERN_WARNING "%s: too small RPC reply size (%d bytes)\n",
                        clnt->cl_protname, task->tk_status);
                rpc_exit(task, -EIO);
                return;
        }

        req->rq_rcv_buf.len = req->rq_private_buf.len;

        /* Check that the softirq receive buffer is valid */
        WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
                                sizeof(req->rq_rcv_buf)) != 0);

        /* Verify the RPC header */
        if (!(p = call_verify(task))) {
                if (task->tk_action == NULL)
                        return;
                goto out_retry;
        }

        /*
         * The following is an NFS-specific hack to cater for setuid
         * processes whose uid is mapped to nobody on the server.
         */
        if (task->tk_client->cl_droppriv &&
            (ntohl(*p) == NFSERR_ACCES || ntohl(*p) == NFSERR_PERM)) {
                if (RPC_IS_SETUID(task) && task->tk_suid_retry) {
                        dprintk("RPC: %4d retry squashed uid\n", task->tk_pid);
                        task->tk_flags ^= RPC_CALL_REALUID;
                        task->tk_action = call_bind;
                        task->tk_suid_retry--;
                        goto out_retry;
                }
        }

        task->tk_action = NULL;

        if (decode)
                task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
                                                      task->tk_msg.rpc_resp);
        dprintk("RPC: %4d call_decode result %d\n", task->tk_pid,
                                        task->tk_status);
        return;
out_retry:
        req->rq_received = req->rq_private_buf.len = 0;
        task->tk_status = 0;
}

/*
 * 8.   Refresh the credentials if rejected by the server
 */
static void
call_refresh(struct rpc_task *task)
{
        dprintk("RPC: %4d call_refresh\n", task->tk_pid);

        xprt_release(task);     /* Must do to obtain new XID */
        task->tk_action = call_refreshresult;
        task->tk_status = 0;
        task->tk_client->cl_stats->rpcauthrefresh++;
        rpcauth_refreshcred(task);
}

/*
 * 8a.  Process the results of a credential refresh
 */
static void
call_refreshresult(struct rpc_task *task)
{
        int status = task->tk_status;

        dprintk("RPC: %4d call_refreshresult (status %d)\n",
                                task->tk_pid, task->tk_status);

        task->tk_status = 0;
        task->tk_action = call_reserve;
        if (status >= 0 && rpcauth_uptodatecred(task))
                return;
        if (status == -EACCES) {
                rpc_exit(task, -EACCES);
                return;
        }
        task->tk_action = call_refresh;
        if (status != -ETIMEDOUT)
                rpc_delay(task, 3*HZ);
}

/*
 * Call header serialization
 */
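/*
 * The marshalled header follows the standard ONC RPC call layout, one
 * 32-bit word per field, in network byte order:
 *
 *      [record marker, stream transports only] xid | CALL | RPC version |
 *      program | version | procedure | credential | verifier
 *
 * with credential and verifier appended by rpcauth_marshcred().
 */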
static u32 *
call_header(struct rpc_task *task)
{
        struct rpc_clnt *clnt = task->tk_client;
        struct rpc_xprt *xprt = clnt->cl_xprt;
        struct rpc_rqst *req = task->tk_rqstp;
        u32             *p = req->rq_svec[0].iov_base;

        /* FIXME: check buffer size? */
        if (xprt->stream)
                *p++ = 0;               /* fill in later */
        *p++ = req->rq_xid;             /* XID */
        *p++ = htonl(RPC_CALL);         /* CALL */
        *p++ = htonl(RPC_VERSION);      /* RPC version */
        *p++ = htonl(clnt->cl_prog);    /* program number */
        *p++ = htonl(clnt->cl_vers);    /* program version */
        *p++ = htonl(task->tk_msg.rpc_proc->p_proc);    /* procedure */
        return rpcauth_marshcred(task, p);
}

/*
 * Reply header verification
 */
static u32 *
call_verify(struct rpc_task *task)
{
        struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
        int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
        u32     *p = iov->iov_base, n;
        int error = -EACCES;

        if ((len -= 3) < 0)
                goto out_overflow;
        p += 1; /* skip XID */

        if ((n = ntohl(*p++)) != RPC_REPLY) {
                printk(KERN_WARNING "call_verify: not an RPC reply: %x\n", n);
                goto out_retry;
        }
        if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
                if (--len < 0)
                        goto out_overflow;
                switch ((n = ntohl(*p++))) {
                case RPC_AUTH_ERROR:
                        break;
                case RPC_MISMATCH:
                        printk(KERN_WARNING "%s: RPC call version mismatch!\n", __FUNCTION__);
                        goto out_eio;
                default:
                        printk(KERN_WARNING "%s: RPC call rejected, unknown error: %x\n", __FUNCTION__, n);
                        goto out_eio;
                }
                if (--len < 0)
                        goto out_overflow;
                switch ((n = ntohl(*p++))) {
                case RPC_AUTH_REJECTEDCRED:
                case RPC_AUTH_REJECTEDVERF:
                case RPCSEC_GSS_CREDPROBLEM:
                case RPCSEC_GSS_CTXPROBLEM:
                        if (!task->tk_cred_retry)
                                break;
                        task->tk_cred_retry--;
                        dprintk("RPC: %4d call_verify: retry stale creds\n",
                                                        task->tk_pid);
                        rpcauth_invalcred(task);
                        task->tk_action = call_refresh;
                        return NULL;
                case RPC_AUTH_BADCRED:
                case RPC_AUTH_BADVERF:
                        /* possibly garbled cred/verf? */
                        if (!task->tk_garb_retry)
                                break;
                        task->tk_garb_retry--;
                        dprintk("RPC: %4d call_verify: retry garbled creds\n",
                                                        task->tk_pid);
                        task->tk_action = call_bind;
                        return NULL;
                case RPC_AUTH_TOOWEAK:
                        printk(KERN_NOTICE "call_verify: server requires stronger "
                               "authentication.\n");
                        break;
                default:
                        printk(KERN_WARNING "call_verify: unknown auth error: %x\n", n);
                        error = -EIO;
                }
                dprintk("RPC: %4d call_verify: call rejected %d\n",
                                                task->tk_pid, n);
                goto out_err;
        }
        if (!(p = rpcauth_checkverf(task, p))) {
                printk(KERN_WARNING "call_verify: auth check failed\n");
                goto out_retry;         /* bad verifier, retry */
        }
        len = p - (u32 *)iov->iov_base - 1;
        if (len < 0)
                goto out_overflow;
        switch ((n = ntohl(*p++))) {
        case RPC_SUCCESS:
                return p;
        case RPC_PROG_UNAVAIL:
                printk(KERN_WARNING "RPC: call_verify: program %u is unsupported by server %s\n",
                                (unsigned int)task->tk_client->cl_prog,
                                task->tk_client->cl_server);
                goto out_eio;
        case RPC_PROG_MISMATCH:
                printk(KERN_WARNING "RPC: call_verify: program %u, version %u unsupported by server %s\n",
                                (unsigned int)task->tk_client->cl_prog,
                                (unsigned int)task->tk_client->cl_vers,
                                task->tk_client->cl_server);
                goto out_eio;
        case RPC_PROC_UNAVAIL:
                printk(KERN_WARNING "RPC: call_verify: proc %p unsupported by program %u, version %u on server %s\n",
                                task->tk_msg.rpc_proc,
                                task->tk_client->cl_prog,
                                task->tk_client->cl_vers,
                                task->tk_client->cl_server);
                goto out_eio;
        case RPC_GARBAGE_ARGS:
                dprintk("RPC: %4d %s: server saw garbage\n", task->tk_pid, __FUNCTION__);
                break;                  /* retry */
        default:
                printk(KERN_WARNING "call_verify: server accept status: %x\n", n);
                /* Also retry */
        }

out_retry:
        task->tk_client->cl_stats->rpcgarbage++;
        if (task->tk_garb_retry) {
                task->tk_garb_retry--;
                dprintk("RPC %s: retrying %4d\n", __FUNCTION__, task->tk_pid);
                task->tk_action = call_bind;
                return NULL;
        }
        printk(KERN_WARNING "RPC %s: retry failed, exit EIO\n", __FUNCTION__);
out_eio:
        error = -EIO;
out_err:
        rpc_exit(task, error);
        return NULL;
out_overflow:
        printk(KERN_WARNING "RPC %s: server reply was truncated.\n", __FUNCTION__);
        goto out_retry;
}