1 /* call.c: Rx call routines
3 * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved.
4 * Written by David Howells (dhowells@redhat.com)
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
12 #include <linux/sched.h>
13 #include <linux/slab.h>
14 #include <linux/module.h>
15 #include <rxrpc/rxrpc.h>
16 #include <rxrpc/transport.h>
17 #include <rxrpc/peer.h>
18 #include <rxrpc/connection.h>
19 #include <rxrpc/call.h>
20 #include <rxrpc/message.h>
/* accounting counters for live call/message objects (compiled in only when
 * __RXACCT accounting is enabled) */
23 __RXACCT_DECL(atomic_t rxrpc_call_count);
24 __RXACCT_DECL(atomic_t rxrpc_message_count);
/* global list of all extant calls, guarded by rxrpc_calls_sem */
26 LIST_HEAD(rxrpc_calls);
27 DECLARE_RWSEM(rxrpc_calls_sem);
/* tunable timeouts, all in jiffies */
29 unsigned rxrpc_call_rcv_timeout = HZ/3;
30 unsigned rxrpc_call_acks_timeout = HZ/3;
31 unsigned rxrpc_call_dfr_ack_timeout = HZ/20;
/* NOTE(review): despite the name, this is initialised from HZ like the
 * timeouts above — presumably a resend interval in jiffies rather than a
 * count; confirm against rxrpc_call_resend() usage */
32 unsigned short rxrpc_call_max_resend = HZ/10;
/* human-readable names for call/error states (used by _state/_debug) */
34 const char *rxrpc_call_states[] = {
47 const char *rxrpc_call_error_states[] = {
/* packet-type names indexed by rxrpc_header.type */
55 const char *rxrpc_pkts[] = {
57 "data", "ack", "busy", "abort", "ackall", "chall", "resp", "debug",
58 "?09", "?10", "?11", "?12", "?13", "?14", "?15"
/* ACK-reason names indexed by rxrpc_ackpacket.reason */
61 const char *rxrpc_acks[] = {
62 "---", "REQ", "DUP", "SEQ", "WIN", "MEM", "PNG", "PNR", "DLY", "IDL",
/* per-packet ACK type indicator chars: NACK / ACK / none */
66 static const char _acktype[] = "NA-";
/* forward declarations for the packet-processing internals below */
68 static void rxrpc_call_receive_packet(struct rxrpc_call *call);
69 static void rxrpc_call_receive_data_packet(struct rxrpc_call *call,
70 struct rxrpc_message *msg);
71 static void rxrpc_call_receive_ack_packet(struct rxrpc_call *call,
72 struct rxrpc_message *msg);
73 static void rxrpc_call_definitively_ACK(struct rxrpc_call *call,
75 static void rxrpc_call_resend(struct rxrpc_call *call, rxrpc_seq_t highest);
76 static int __rxrpc_call_read_data(struct rxrpc_call *call);
78 static int rxrpc_call_record_ACK(struct rxrpc_call *call,
79 struct rxrpc_message *msg,
/* debug helper: print the call's current application-visible state name */
82 #define _state(call) \
83 _debug("[[[ state %s ]]]", rxrpc_call_states[call->app_call_state]);
/* default "attention" callback: wake any task sleeping on the call's waitq */
85 static void rxrpc_call_default_attn_func(struct rxrpc_call *call)
87 wake_up(&call->waitq);
/* default error callback: identical to the attention default — just wake any
 * waiter so it can observe the error state on the call */
90 static void rxrpc_call_default_error_func(struct rxrpc_call *call)
92 wake_up(&call->waitq);
/* default abort-error mapping callback: translate between the app errno and
 * the on-the-wire abort code according to which side aborted */
95 static void rxrpc_call_default_aemap_func(struct rxrpc_call *call)
97 switch (call->app_err_state) {
98 case RXRPC_ESTATE_LOCAL_ABORT:
/* local abort: derive the wire abort code from the (negative) errno */
99 call->app_abort_code = -call->app_errno;
/* fall through — a local abort also reports -ECONNABORTED to the app */
100 case RXRPC_ESTATE_PEER_ABORT:
101 call->app_errno = -ECONNABORTED;
/* timer callback: the ACK-resend timer expired — flag the call and queue it
 * for the krxiod worker (rxrpc_call_do_stuff handles the flag) */
107 static void __rxrpc_call_acks_timeout(unsigned long _call)
109 struct rxrpc_call *call = (struct rxrpc_call *) _call;
111 _debug("ACKS TIMEOUT %05lu", jiffies - call->cjif);
113 call->flags |= RXRPC_CALL_ACKS_TIMO;
114 rxrpc_krxiod_queue_call(call);
/* timer callback: nothing received within the allowed window — flag the call
 * and queue it for krxiod (which will abort it with -EIO) */
117 static void __rxrpc_call_rcv_timeout(unsigned long _call)
119 struct rxrpc_call *call = (struct rxrpc_call *) _call;
121 _debug("RCV TIMEOUT %05lu", jiffies - call->cjif);
123 call->flags |= RXRPC_CALL_RCV_TIMO;
124 rxrpc_krxiod_queue_call(call);
/* timer callback: the deferred-ACK timer expired — flag the call so krxiod
 * flushes the pending normal ACK */
127 static void __rxrpc_call_ackr_timeout(unsigned long _call)
129 struct rxrpc_call *call = (struct rxrpc_call *) _call;
131 _debug("ACKR TIMEOUT %05lu",jiffies - call->cjif);
133 call->flags |= RXRPC_CALL_ACKR_TIMO;
134 rxrpc_krxiod_queue_call(call);
137 /*****************************************************************************/
139 * calculate a timeout based on an RTT value
/* Returns an absolute expiry time (jiffies + expiry).  The peer's RTT is
 * presumably held in microseconds, hence the /(1000000/HZ) conversion to
 * jiffies — TODO confirm the rtt field's units. */
141 static inline unsigned long __rxrpc_rtt_based_timeout(struct rxrpc_call *call,
144 unsigned long expiry = call->conn->peer->rtt / (1000000 / HZ);
/* clamp the expiry to a sensible floor (HZ/25); the clamp body is not
 * visible in this view */
147 if (expiry < HZ / 25)
152 _leave(" = %lu jiffies", expiry);
153 return jiffies + expiry;
154 } /* end __rxrpc_rtt_based_timeout() */
156 /*****************************************************************************/
158 * create a new call record
/* Common allocation/initialisation for both outgoing and incoming calls:
 * the call record is a single zeroed page; the caller receives it via *_call
 * with a usage count of 1.  Returns 0 on success. */
160 static inline int __rxrpc_create_call(struct rxrpc_connection *conn,
161 struct rxrpc_call **_call)
163 struct rxrpc_call *call;
167 /* allocate and initialise a call record */
168 call = (struct rxrpc_call *) get_zeroed_page(GFP_KERNEL);
174 atomic_set(&call->usage, 1);
176 init_waitqueue_head(&call->waitq);
177 spin_lock_init(&call->lock);
178 INIT_LIST_HEAD(&call->link);
179 INIT_LIST_HEAD(&call->acks_pendq);
180 INIT_LIST_HEAD(&call->rcv_receiveq);
181 INIT_LIST_HEAD(&call->rcv_krxiodq_lk);
182 INIT_LIST_HEAD(&call->app_readyq);
183 INIT_LIST_HEAD(&call->app_unreadyq);
184 INIT_LIST_HEAD(&call->app_link);
185 INIT_LIST_HEAD(&call->app_attn_link);
/* three per-call timers: ACK resend, reception timeout, deferred ACK */
187 init_timer(&call->acks_timeout);
188 call->acks_timeout.data = (unsigned long) call;
189 call->acks_timeout.function = __rxrpc_call_acks_timeout;
191 init_timer(&call->rcv_timeout);
192 call->rcv_timeout.data = (unsigned long) call;
193 call->rcv_timeout.function = __rxrpc_call_rcv_timeout;
195 init_timer(&call->ackr_dfr_timo);
196 call->ackr_dfr_timo.data = (unsigned long) call;
197 call->ackr_dfr_timo.function = __rxrpc_call_ackr_timeout;
/* the ACK window initially covers sequence numbers 1..WINDOW_SIZE */
200 call->ackr_win_bot = 1;
201 call->ackr_win_top = call->ackr_win_bot + RXRPC_CALL_ACK_WINDOW_SIZE - 1;
202 call->ackr_prev_seq = 0;
/* default callbacks; rxrpc_create_call() may override them */
203 call->app_mark = RXRPC_APP_MARK_EOF;
204 call->app_attn_func = rxrpc_call_default_attn_func;
205 call->app_error_func = rxrpc_call_default_error_func;
206 call->app_aemap_func = rxrpc_call_default_aemap_func;
207 call->app_scr_alloc = call->app_scratch;
/* record creation time (jiffies) for relative debug timestamps */
209 call->cjif = jiffies;
211 _leave(" = 0 (%p)", call);
216 } /* end __rxrpc_create_call() */
218 /*****************************************************************************/
220 * create a new call record for outgoing calls
/* Allocate a client call on @conn, install the app callbacks, claim one of
 * the connection's four channels (sleeping interruptibly until one frees up),
 * assign a call ID unique among the live channels, and add the call to the
 * global list.  Returns 0 with *_call set, or -ve on error. */
222 int rxrpc_create_call(struct rxrpc_connection *conn,
223 rxrpc_call_attn_func_t attn,
224 rxrpc_call_error_func_t error,
225 rxrpc_call_aemap_func_t aemap,
226 struct rxrpc_call **_call)
228 DECLARE_WAITQUEUE(myself, current);
230 struct rxrpc_call *call;
235 /* allocate and initialise a call record */
236 ret = __rxrpc_create_call(conn, &call);
238 _leave(" = %d", ret);
242 call->app_call_state = RXRPC_CSTATE_CLNT_SND_ARGS;
244 call->app_attn_func = attn;
246 call->app_error_func = error;
248 call->app_aemap_func = aemap;
/* put ourselves on the channel waitqueue before scanning so we can't miss
 * a wakeup between the scan and the sleep */
252 spin_lock(&conn->lock);
253 set_current_state(TASK_INTERRUPTIBLE);
254 add_wait_queue(&conn->chanwait, &myself);
257 /* try to find an unused channel */
258 for (cix = 0; cix < 4; cix++)
259 if (!conn->channels[cix])
262 /* no free channels - wait for one to become available */
264 if (signal_pending(current))
267 spin_unlock(&conn->lock);
270 set_current_state(TASK_INTERRUPTIBLE);
272 spin_lock(&conn->lock);
275 /* got a channel - now attach to the connection */
277 remove_wait_queue(&conn->chanwait, &myself);
278 set_current_state(TASK_RUNNING);
280 /* concoct a unique call number */
/* bump the per-connection counter, then reject any value that collides
 * with a call still occupying a channel (retry loop not fully visible) */
282 call->call_id = htonl(++conn->call_counter);
283 for (loop = 0; loop < 4; loop++)
284 if (conn->channels[loop] &&
285 conn->channels[loop]->call_id == call->call_id)
288 rxrpc_get_connection(conn);
289 conn->channels[cix] = call; /* assign _after_ done callid check loop */
290 do_gettimeofday(&conn->atime);
291 call->chan_ix = htonl(cix);
293 spin_unlock(&conn->lock);
/* publish the call on the global list for /proc-style enumeration */
295 down_write(&rxrpc_calls_sem);
296 list_add_tail(&call->call_link, &rxrpc_calls);
297 up_write(&rxrpc_calls_sem);
299 __RXACCT(atomic_inc(&rxrpc_call_count));
302 _leave(" = 0 (call=%p cix=%u)", call, cix);
/* error path: detach from the waitqueue and free the raw call page (it was
 * never published, so no refcounting needed) */
306 remove_wait_queue(&conn->chanwait, &myself);
307 set_current_state(TASK_RUNNING);
308 spin_unlock(&conn->lock);
310 free_page((unsigned long) call);
311 _leave(" = %d", ret);
313 } /* end rxrpc_create_call() */
315 /*****************************************************************************/
317 * create a new call record for incoming calls
/* Build a server-side call for the connection/channel named by @msg's header.
 * The channel index comes from the packet's CID; the call ID is taken verbatim
 * (already network order).  The channel is only claimed if it is empty or its
 * previous occupant has completed/errored. */
319 int rxrpc_incoming_call(struct rxrpc_connection *conn,
320 struct rxrpc_message *msg,
321 struct rxrpc_call **_call)
323 struct rxrpc_call *call;
327 cix = ntohl(msg->hdr.cid) & RXRPC_CHANNELMASK;
329 _enter("%p,%u,%u", conn, ntohl(msg->hdr.callNumber), cix);
331 /* allocate and initialise a call record */
332 ret = __rxrpc_create_call(conn, &call);
334 _leave(" = %d", ret);
/* the triggering packet counts as received; first step is to read the
 * 32-bit operation ID from the data stream */
338 call->pkt_rcv_count = 1;
339 call->app_call_state = RXRPC_CSTATE_SRVR_RCV_OPID;
340 call->app_mark = sizeof(uint32_t);
344 /* attach to the connection */
346 call->chan_ix = htonl(cix);
347 call->call_id = msg->hdr.callNumber;
349 spin_lock(&conn->lock);
/* claim the channel only if it is free or its old call is finished */
351 if (!conn->channels[cix] ||
352 conn->channels[cix]->app_call_state == RXRPC_CSTATE_COMPLETE ||
353 conn->channels[cix]->app_call_state == RXRPC_CSTATE_ERROR
355 conn->channels[cix] = call;
356 rxrpc_get_connection(conn);
360 spin_unlock(&conn->lock);
/* channel busy: discard the unpublished call record */
363 free_page((unsigned long) call);
368 down_write(&rxrpc_calls_sem);
369 list_add_tail(&call->call_link, &rxrpc_calls);
370 up_write(&rxrpc_calls_sem);
371 __RXACCT(atomic_inc(&rxrpc_call_count));
375 _leave(" = %d [%p]", ret, call);
377 } /* end rxrpc_incoming_call() */
379 /*****************************************************************************/
/* Drop a reference on @call; on the final put, detach it from its connection
 * channel, kill its timers, drain all of its message queues, unlist it and
 * free the backing page. */
383 void rxrpc_put_call(struct rxrpc_call *call)
385 struct rxrpc_connection *conn = call->conn;
386 struct rxrpc_message *msg;
388 _enter("%p{u=%d}",call,atomic_read(&call->usage));
/* sanity check: putting an already-dead call is a bug */
391 if (atomic_read(&call->usage) <= 0)
394 /* to prevent a race, the decrement and the de-list must be effectively
396 spin_lock(&conn->lock);
397 if (likely(!atomic_dec_and_test(&call->usage))) {
398 spin_unlock(&conn->lock);
/* last reference gone: release the connection channel slot */
403 if (conn->channels[ntohl(call->chan_ix)] == call)
404 conn->channels[ntohl(call->chan_ix)] = NULL;
406 spin_unlock(&conn->lock);
/* a channel just freed up — wake anyone in rxrpc_create_call() */
408 wake_up(&conn->chanwait);
410 rxrpc_put_connection(conn);
412 /* clear the timers and dequeue from krxiod */
413 del_timer_sync(&call->acks_timeout);
414 del_timer_sync(&call->rcv_timeout);
415 del_timer_sync(&call->ackr_dfr_timo);
417 rxrpc_krxiod_dequeue_call(call);
419 /* clean up the contents of the struct */
420 if (call->snd_nextmsg)
421 rxrpc_put_message(call->snd_nextmsg);
424 rxrpc_put_message(call->snd_ping);
/* release every message still queued on the four per-call lists */
426 while (!list_empty(&call->acks_pendq)) {
427 msg = list_entry(call->acks_pendq.next,
428 struct rxrpc_message, link);
429 list_del(&msg->link);
430 rxrpc_put_message(msg);
433 while (!list_empty(&call->rcv_receiveq)) {
434 msg = list_entry(call->rcv_receiveq.next,
435 struct rxrpc_message, link);
436 list_del(&msg->link);
437 rxrpc_put_message(msg);
440 while (!list_empty(&call->app_readyq)) {
441 msg = list_entry(call->app_readyq.next,
442 struct rxrpc_message, link);
443 list_del(&msg->link);
444 rxrpc_put_message(msg);
447 while (!list_empty(&call->app_unreadyq)) {
448 msg = list_entry(call->app_unreadyq.next,
449 struct rxrpc_message, link);
450 list_del(&msg->link);
451 rxrpc_put_message(msg);
/* drop the module reference pinned by the call's owner */
454 module_put(call->owner);
456 down_write(&rxrpc_calls_sem);
457 list_del(&call->call_link);
458 up_write(&rxrpc_calls_sem);
460 __RXACCT(atomic_dec(&rxrpc_call_count));
461 free_page((unsigned long) call);
463 _leave(" [destroyed]");
464 } /* end rxrpc_put_call() */
466 /*****************************************************************************/
468 * actually generate a normal ACK
/* Build and transmit an ACK packet from the call's accrued ACK state
 * (call->ackr / call->ackr_array), then slide the ACK window past the
 * fully-acknowledged prefix.  @seq (where used) is the sequence number to
 * stamp on the outgoing packet. */
470 static inline int __rxrpc_call_gen_normal_ACK(struct rxrpc_call *call,
473 struct rxrpc_message *msg;
478 /* ACKs default to DELAY */
479 if (!call->ackr.reason)
480 call->ackr.reason = RXRPC_ACK_DELAY;
482 _proto("Rx %05lu Sending ACK { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
483 jiffies - call->cjif,
484 ntohs(call->ackr.maxSkew),
485 ntohl(call->ackr.firstPacket),
486 ntohl(call->ackr.previousPacket),
487 ntohl(call->ackr.serial),
488 rxrpc_acks[call->ackr.reason],
/* trailing info block advertising our MTUs and window sizes */
491 aux[0] = htonl(call->conn->peer->if_mtu); /* interface MTU */
492 aux[1] = htonl(1444); /* max MTU */
493 aux[2] = htonl(16); /* rwind */
494 aux[3] = htonl(4); /* max packets */
/* three iovecs: ACK header, per-packet ACK/NACK array (padded), aux info */
496 diov[0].iov_len = sizeof(struct rxrpc_ackpacket);
497 diov[0].iov_base = &call->ackr;
498 diov[1].iov_len = call->ackr_pend_cnt + 3;
499 diov[1].iov_base = call->ackr_array;
500 diov[2].iov_len = sizeof(aux);
501 diov[2].iov_base = &aux;
503 /* build and send the message */
504 ret = rxrpc_conn_newmsg(call->conn,call, RXRPC_PACKET_TYPE_ACK,
505 3, diov, GFP_KERNEL, &msg);
510 msg->hdr.seq = htonl(seq);
511 msg->hdr.flags |= RXRPC_SLOW_START_OK;
513 ret = rxrpc_conn_sendmsg(call->conn, msg);
514 rxrpc_put_message(msg);
517 call->pkt_snd_count++;
519 /* count how many actual ACKs there were at the front */
520 for (delta = 0; delta < call->ackr_pend_cnt; delta++)
521 if (call->ackr_array[delta] != RXRPC_ACK_TYPE_ACK)
524 call->ackr_pend_cnt -= delta; /* all ACK'd to this point */
526 /* crank the ACK window around */
528 /* un-ACK'd window */
530 else if (delta < RXRPC_CALL_ACK_WINDOW_SIZE) {
531 /* partially ACK'd window
532 * - shuffle down to avoid losing out-of-sequence packets
534 call->ackr_win_bot += delta;
535 call->ackr_win_top += delta;
537 memmove(&call->ackr_array[0],
538 &call->ackr_array[delta],
539 call->ackr_pend_cnt);
541 memset(&call->ackr_array[call->ackr_pend_cnt],
543 sizeof(call->ackr_array) - call->ackr_pend_cnt);
546 /* fully ACK'd window
547 * - just clear the whole thing
549 memset(&call->ackr_array,
551 sizeof(call->ackr_array));
/* reset the accrued ACK header ready for the next batch */
555 memset(&call->ackr, 0, sizeof(call->ackr));
558 if (!call->app_call_state)
559 printk("___ STATE 0 ___\n");
561 } /* end __rxrpc_call_gen_normal_ACK() */
563 /*****************************************************************************/
565 * note the reception of a packet in the call's ACK records and generate an
566 * appropriate ACK packet if necessary
567 * - returns 0 if packet should be processed, 1 if packet should be ignored
568 * and -ve on an error
570 static int rxrpc_call_generate_ACK(struct rxrpc_call *call,
571 struct rxrpc_header *hdr,
572 struct rxrpc_ackpacket *ack)
574 struct rxrpc_message *msg;
578 u8 special_ACK, do_ACK, force;
580 _enter("%p,%p { seq=%d tp=%d fl=%02x }",
581 call, hdr, ntohl(hdr->seq), hdr->type, hdr->flags);
/* position of this packet relative to the bottom of the ACK window */
583 seq = ntohl(hdr->seq);
584 offset = seq - call->ackr_win_bot;
585 do_ACK = RXRPC_ACK_DELAY;
589 if (call->ackr_high_seq < seq)
590 call->ackr_high_seq = seq;
592 /* deal with generation of obvious special ACKs first */
593 if (ack && ack->reason == RXRPC_ACK_PING) {
594 special_ACK = RXRPC_ACK_PING_RESPONSE;
/* below the window: we've already seen it */
599 if (seq < call->ackr_win_bot) {
600 special_ACK = RXRPC_ACK_DUPLICATE;
/* above the window: sender overran our advertised window */
605 if (seq >= call->ackr_win_top) {
606 special_ACK = RXRPC_ACK_EXCEEDS_WINDOW;
/* in-window but already marked ACK'd: duplicate */
611 if (call->ackr_array[offset] != RXRPC_ACK_TYPE_NACK) {
612 special_ACK = RXRPC_ACK_DUPLICATE;
617 /* okay... it's a normal data packet inside the ACK window */
618 call->ackr_array[offset] = RXRPC_ACK_TYPE_ACK;
620 if (offset < call->ackr_pend_cnt) {
622 else if (offset > call->ackr_pend_cnt) {
/* arrived ahead of a hole: note the out-of-sequence condition */
623 do_ACK = RXRPC_ACK_OUT_OF_SEQUENCE;
624 call->ackr_pend_cnt = offset;
/* the sender explicitly asked for an ACK */
628 if (hdr->flags & RXRPC_REQUEST_ACK) {
629 do_ACK = RXRPC_ACK_REQUESTED;
632 /* generate an ACK on the final packet of a reply just received */
633 if (hdr->flags & RXRPC_LAST_PACKET) {
634 if (call->conn->out_clientflag)
637 else if (!(hdr->flags & RXRPC_MORE_PACKETS)) {
638 do_ACK = RXRPC_ACK_REQUESTED;
641 /* re-ACK packets previously received out-of-order */
642 for (offset++; offset < RXRPC_CALL_ACK_WINDOW_SIZE; offset++)
643 if (call->ackr_array[offset] != RXRPC_ACK_TYPE_ACK)
646 call->ackr_pend_cnt = offset;
648 /* generate an ACK if we fill up the window */
649 if (call->ackr_pend_cnt >= RXRPC_CALL_ACK_WINDOW_SIZE)
653 _debug("%05lu ACKs pend=%u norm=%s special=%s%s",
654 jiffies - call->cjif,
657 rxrpc_acks[special_ACK],
658 force ? " immediate" :
659 do_ACK == RXRPC_ACK_REQUESTED ? " merge-req" :
660 hdr->flags & RXRPC_LAST_PACKET ? " finalise" :
664 /* send any pending normal ACKs if need be */
665 if (call->ackr_pend_cnt > 0) {
666 /* fill out the appropriate form */
667 call->ackr.bufferSpace = htons(RXRPC_CALL_ACK_WINDOW_SIZE);
668 call->ackr.maxSkew = htons(min(call->ackr_high_seq - seq,
670 call->ackr.firstPacket = htonl(call->ackr_win_bot);
671 call->ackr.previousPacket = call->ackr_prev_seq;
672 call->ackr.serial = hdr->serial;
673 call->ackr.nAcks = call->ackr_pend_cnt;
675 if (do_ACK == RXRPC_ACK_REQUESTED)
676 call->ackr.reason = do_ACK;
678 /* generate the ACK immediately if necessary */
679 if (special_ACK || force) {
680 err = __rxrpc_call_gen_normal_ACK(
681 call, do_ACK == RXRPC_ACK_DELAY ? 0 : seq);
/* remember which seq the deferred requested-ACK applies to */
689 if (call->ackr.reason == RXRPC_ACK_REQUESTED)
690 call->ackr_dfr_seq = seq;
692 /* start the ACK timer if not running if there are any pending deferred
694 if (call->ackr_pend_cnt > 0 &&
695 call->ackr.reason != RXRPC_ACK_REQUESTED &&
696 !timer_pending(&call->ackr_dfr_timo)
700 timo = rxrpc_call_dfr_ack_timeout + jiffies;
702 _debug("START ACKR TIMER for cj=%lu", timo - call->cjif);
704 spin_lock(&call->lock);
705 mod_timer(&call->ackr_dfr_timo, timo);
706 spin_unlock(&call->lock);
708 else if ((call->ackr_pend_cnt == 0 ||
709 call->ackr.reason == RXRPC_ACK_REQUESTED) &&
710 timer_pending(&call->ackr_dfr_timo)
712 /* stop timer if no pending ACKs */
713 _debug("CLEAR ACKR TIMER");
714 del_timer_sync(&call->ackr_dfr_timo);
717 /* send a special ACK if one is required */
719 struct rxrpc_ackpacket ack;
721 uint8_t acks[1] = { RXRPC_ACK_TYPE_ACK };
723 /* fill out the appropriate form */
724 ack.bufferSpace = htons(RXRPC_CALL_ACK_WINDOW_SIZE);
725 ack.maxSkew = htons(min(call->ackr_high_seq - seq,
727 ack.firstPacket = htonl(call->ackr_win_bot);
728 ack.previousPacket = call->ackr_prev_seq;
729 ack.serial = hdr->serial;
730 ack.reason = special_ACK;
733 _proto("Rx Sending s-ACK"
734 " { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
736 ntohl(ack.firstPacket),
737 ntohl(ack.previousPacket),
739 rxrpc_acks[ack.reason],
742 diov[0].iov_len = sizeof(struct rxrpc_ackpacket);
743 diov[0].iov_base = &ack;
744 diov[1].iov_len = sizeof(acks);
745 diov[1].iov_base = acks;
747 /* build and send the message */
/* only include the single-entry ack array when the packet had a seq */
748 err = rxrpc_conn_newmsg(call->conn,call, RXRPC_PACKET_TYPE_ACK,
749 hdr->seq ? 2 : 1, diov,
758 msg->hdr.seq = htonl(seq);
759 msg->hdr.flags |= RXRPC_SLOW_START_OK;
761 err = rxrpc_conn_sendmsg(call->conn, msg);
762 rxrpc_put_message(msg);
767 call->pkt_snd_count++;
/* note the last packet seq for the next ACK's previousPacket field */
772 call->ackr_prev_seq = hdr->seq;
774 _leave(" = %d", ret);
776 } /* end rxrpc_call_generate_ACK() */
778 /*****************************************************************************/
780 * handle work to be done on a call
781 * - includes packet reception and timeout processing
/* krxiod worker entry point: inspect call->flags, clear each event bit and
 * perform the corresponding action (receive, resend, abort, flush ACKs) */
783 void rxrpc_call_do_stuff(struct rxrpc_call *call)
785 _enter("%p{flags=%lx}", call, call->flags);
787 /* handle packet reception */
788 if (call->flags & RXRPC_CALL_RCV_PKT) {
789 _debug("- receive packet");
790 call->flags &= ~RXRPC_CALL_RCV_PKT;
791 rxrpc_call_receive_packet(call);
794 /* handle overdue ACKs */
795 if (call->flags & RXRPC_CALL_ACKS_TIMO) {
796 _debug("- overdue ACK timeout");
797 call->flags &= ~RXRPC_CALL_ACKS_TIMO;
798 rxrpc_call_resend(call, call->snd_seq_count);
801 /* handle lack of reception */
802 if (call->flags & RXRPC_CALL_RCV_TIMO) {
803 _debug("- reception timeout");
804 call->flags &= ~RXRPC_CALL_RCV_TIMO;
805 rxrpc_call_abort(call, -EIO);
808 /* handle deferred ACKs */
809 if (call->flags & RXRPC_CALL_ACKR_TIMO ||
810 (call->ackr.nAcks > 0 && call->ackr.reason == RXRPC_ACK_REQUESTED)
812 _debug("- deferred ACK timeout: cj=%05lu r=%s n=%u",
813 jiffies - call->cjif,
814 rxrpc_acks[call->ackr.reason],
817 call->flags &= ~RXRPC_CALL_ACKR_TIMO;
/* don't bother ACKing a call that has already failed */
819 if (call->ackr.nAcks > 0 &&
820 call->app_call_state != RXRPC_CSTATE_ERROR) {
822 __rxrpc_call_gen_normal_ACK(call, call->ackr_dfr_seq);
823 call->ackr_dfr_seq = 0;
829 } /* end rxrpc_call_do_stuff() */
831 /*****************************************************************************/
833 * send an abort message at call or connection level
834 * - must be called with call->lock held
835 * - the supplied error code is sent as the packet data
/* Moves the call to the ERROR state, flushes timers and pending ACKs, and —
 * if any packets were actually exchanged — transmits an ABORT packet carrying
 * the app abort code.  Drops call->lock before returning. */
837 static int __rxrpc_call_abort(struct rxrpc_call *call, int errno)
839 struct rxrpc_connection *conn = call->conn;
840 struct rxrpc_message *msg;
845 _enter("%p{%08x},%p{%d},%d",
846 conn, ntohl(conn->conn_id), call, ntohl(call->call_id), errno);
848 /* if this call is already aborted, then just wake up any waiters */
849 if (call->app_call_state == RXRPC_CSTATE_ERROR) {
850 spin_unlock(&call->lock);
851 call->app_error_func(call);
/* pin the call so it can't vanish while we're tearing it down */
856 rxrpc_get_call(call);
858 /* change the state _with_ the lock still held */
859 call->app_call_state = RXRPC_CSTATE_ERROR;
860 call->app_err_state = RXRPC_ESTATE_LOCAL_ABORT;
861 call->app_errno = errno;
862 call->app_mark = RXRPC_APP_MARK_EOF;
863 call->app_read_buf = NULL;
864 call->app_async_read = 0;
868 /* ask the app to translate the error code */
869 call->app_aemap_func(call);
871 spin_unlock(&call->lock);
873 /* flush any outstanding ACKs */
874 del_timer_sync(&call->acks_timeout);
875 del_timer_sync(&call->rcv_timeout);
876 del_timer_sync(&call->ackr_dfr_timo);
878 if (rxrpc_call_is_ack_pending(call))
879 __rxrpc_call_gen_normal_ACK(call, 0);
881 /* send the abort packet only if we actually traded some other
884 if (call->pkt_snd_count || call->pkt_rcv_count) {
885 /* actually send the abort */
886 _proto("Rx Sending Call ABORT { data=%d }",
887 call->app_abort_code);
/* abort code travels as a single network-order 32-bit word */
889 _error = htonl(call->app_abort_code);
891 diov[0].iov_len = sizeof(_error);
892 diov[0].iov_base = &_error;
894 ret = rxrpc_conn_newmsg(conn, call, RXRPC_PACKET_TYPE_ABORT,
895 1, diov, GFP_KERNEL, &msg);
897 ret = rxrpc_conn_sendmsg(conn, msg);
898 rxrpc_put_message(msg);
902 /* tell the app layer to let go */
903 call->app_error_func(call);
905 rxrpc_put_call(call);
907 _leave(" = %d", ret);
909 } /* end __rxrpc_call_abort() */
911 /*****************************************************************************/
913 * send an abort message at call or connection level
914 * - the supplied error code is sent as the packet data
/* Public wrapper: takes call->lock and delegates to __rxrpc_call_abort(),
 * which releases the lock before returning. */
916 int rxrpc_call_abort(struct rxrpc_call *call, int error)
918 spin_lock(&call->lock);
920 return __rxrpc_call_abort(call, error);
922 } /* end rxrpc_call_abort() */
924 /*****************************************************************************/
926 * process packets waiting for this call
/* Drain call->rcv_receiveq, dispatching each message by packet type: DATA is
 * ACK'd then queued for the app, ACK packets update send-side state, ABORT
 * moves the call to the peer-abort error state. */
928 static void rxrpc_call_receive_packet(struct rxrpc_call *call)
930 struct rxrpc_message *msg;
931 struct list_head *_p;
935 rxrpc_get_call(call); /* must not go away too soon if aborted by
938 while (!list_empty(&call->rcv_receiveq)) {
939 /* try to get next packet */
/* re-check under the lock — krxiod may race with us */
941 spin_lock(&call->lock);
942 if (!list_empty(&call->rcv_receiveq)) {
943 _p = call->rcv_receiveq.next;
946 spin_unlock(&call->lock);
951 msg = list_entry(_p, struct rxrpc_message, link);
953 _proto("Rx %05lu Received %s packet (%%%u,#%u,%c%c%c%c%c)",
954 jiffies - call->cjif,
955 rxrpc_pkts[msg->hdr.type],
956 ntohl(msg->hdr.serial),
958 msg->hdr.flags & RXRPC_JUMBO_PACKET ? 'j' : '-',
959 msg->hdr.flags & RXRPC_MORE_PACKETS ? 'm' : '-',
960 msg->hdr.flags & RXRPC_LAST_PACKET ? 'l' : '-',
961 msg->hdr.flags & RXRPC_REQUEST_ACK ? 'r' : '-',
962 msg->hdr.flags & RXRPC_CLIENT_INITIATED ? 'C' : 'S'
965 switch (msg->hdr.type) {
966 /* deal with data packets */
967 case RXRPC_PACKET_TYPE_DATA:
968 /* ACK the packet if necessary */
969 switch (rxrpc_call_generate_ACK(call, &msg->hdr,
971 case 0: /* useful packet */
972 rxrpc_call_receive_data_packet(call, msg);
974 case 1: /* duplicate or out-of-window packet */
977 rxrpc_put_message(msg);
982 /* deal with ACK packets */
983 case RXRPC_PACKET_TYPE_ACK:
984 rxrpc_call_receive_ack_packet(call, msg);
987 /* deal with abort packets */
988 case RXRPC_PACKET_TYPE_ABORT: {
/* abort code is a 32-bit word in the packet payload; may be absent
 * in a short packet, in which case it reads as 0 */
991 dp = skb_header_pointer(msg->pkt, msg->offset,
992 sizeof(_dbuf), &_dbuf);
994 printk("Rx Received short ABORT packet\n");
996 _proto("Rx Received Call ABORT { data=%d }",
997 (dp ? ntohl(*dp) : 0));
999 spin_lock(&call->lock);
1000 call->app_call_state = RXRPC_CSTATE_ERROR;
1001 call->app_err_state = RXRPC_ESTATE_PEER_ABORT;
1002 call->app_abort_code = (dp ? ntohl(*dp) : 0);
1003 call->app_errno = -ECONNABORTED;
1004 call->app_mark = RXRPC_APP_MARK_EOF;
1005 call->app_read_buf = NULL;
1006 call->app_async_read = 0;
1008 /* ask the app to translate the error code */
1009 call->app_aemap_func(call);
1011 spin_unlock(&call->lock);
1012 call->app_error_func(call);
1016 /* deal with other packet types */
1017 _proto("Rx Unsupported packet type %u (#%u)",
1018 msg->hdr.type, msg->seq);
1022 rxrpc_put_message(msg);
/* balance the rxrpc_get_call() taken at entry */
1026 rxrpc_put_call(call);
1028 } /* end rxrpc_call_receive_packet() */
1030 /*****************************************************************************/
1032 * process next data packet
1033 * - as the next data packet arrives:
1034 * - it is queued on app_readyq _if_ it is the next one expected
1036 * - it is queued on app_unreadyq _if_ it is not the next one expected
1037 * - if a packet placed on app_readyq completely fills a hole leading up to
1038 * the first packet on app_unreadyq, then packets now in sequence are
1039 * tranferred to app_readyq
1040 * - the application layer can only see packets on app_readyq
1041 * (app_ready_qty bytes)
1042 * - the application layer is prodded every time a new packet arrives
1044 static void rxrpc_call_receive_data_packet(struct rxrpc_call *call,
1045 struct rxrpc_message *msg)
1047 const struct rxrpc_operation *optbl, *op;
1048 struct rxrpc_message *pmsg;
1049 struct list_head *_p;
1050 int ret, lo, hi, rmtimo;
1053 _enter("%p{%u},%p{%u}", call, ntohl(call->call_id), msg, msg->seq);
1055 rxrpc_get_message(msg);
1057 /* add to the unready queue if we'd have to create a hole in the ready
1058 * queue otherwise */
1059 if (msg->seq != call->app_ready_seq + 1) {
1060 _debug("Call add packet %d to unreadyq", msg->seq);
1062 /* insert in seq order */
1063 list_for_each(_p, &call->app_unreadyq) {
1064 pmsg = list_entry(_p, struct rxrpc_message, link);
1065 if (pmsg->seq > msg->seq)
1069 list_add_tail(&msg->link, _p);
1071 _leave(" [unreadyq]");
1075 /* next in sequence - simply append into the call's ready queue */
1076 _debug("Call add packet %d to readyq (+%Zd => %Zd bytes)",
1077 msg->seq, msg->dsize, call->app_ready_qty);
1079 spin_lock(&call->lock);
1080 call->app_ready_seq = msg->seq;
1081 call->app_ready_qty += msg->dsize;
1082 list_add_tail(&msg->link, &call->app_readyq);
1084 /* move unready packets to the readyq if we got rid of a hole */
1085 while (!list_empty(&call->app_unreadyq)) {
1086 pmsg = list_entry(call->app_unreadyq.next,
1087 struct rxrpc_message, link);
1089 if (pmsg->seq != call->app_ready_seq + 1)
1092 /* next in sequence - just move list-to-list */
1093 _debug("Call transfer packet %d to readyq (+%Zd => %Zd bytes)",
1094 pmsg->seq, pmsg->dsize, call->app_ready_qty);
1096 call->app_ready_seq = pmsg->seq;
1097 call->app_ready_qty += pmsg->dsize;
1098 list_del_init(&pmsg->link);
1099 list_add_tail(&pmsg->link, &call->app_readyq);
1102 /* see if we've got the last packet yet */
1103 if (!list_empty(&call->app_readyq)) {
1104 pmsg = list_entry(call->app_readyq.prev,
1105 struct rxrpc_message, link);
1106 if (pmsg->hdr.flags & RXRPC_LAST_PACKET) {
1107 call->app_last_rcv = 1;
1108 _debug("Last packet on readyq");
/* act on the data according to the call's current state */
1112 switch (call->app_call_state) {
1113 /* do nothing if call already aborted */
1114 case RXRPC_CSTATE_ERROR:
1115 spin_unlock(&call->lock);
1119 /* extract the operation ID from an incoming call if that's not
1121 case RXRPC_CSTATE_SRVR_RCV_OPID:
1122 spin_unlock(&call->lock);
1124 /* handle as yet insufficient data for the operation ID */
1125 if (call->app_ready_qty < 4) {
1126 if (call->app_last_rcv)
1127 /* trouble - last packet seen */
1128 rxrpc_call_abort(call, -EINVAL);
1134 /* pull the operation ID out of the buffer */
1135 ret = rxrpc_call_read_data(call, &opid, sizeof(opid), 0);
1137 printk("Unexpected error from read-data: %d\n", ret);
1138 if (call->app_call_state != RXRPC_CSTATE_ERROR)
1139 rxrpc_call_abort(call, ret);
1143 call->app_opcode = ntohl(opid);
1145 /* locate the operation in the available ops table */
/* binary search over the service's sorted operations table */
1146 optbl = call->conn->service->ops_begin;
1148 hi = call->conn->service->ops_end - optbl;
1151 int mid = (hi + lo) / 2;
1153 if (call->app_opcode == op->id)
1155 if (call->app_opcode > op->id)
/* unknown opcode: reject the call */
1162 kproto("Rx Client requested operation %d from %s service",
1163 call->app_opcode, call->conn->service->name);
1164 rxrpc_call_abort(call, -EINVAL);
1169 _proto("Rx Client requested operation %s from %s service",
1170 op->name, call->conn->service->name);
1172 /* we're now waiting for the argument block (unless the call
1174 spin_lock(&call->lock);
/* re-check state under the lock — an abort may have intervened */
1175 if (call->app_call_state == RXRPC_CSTATE_SRVR_RCV_OPID ||
1176 call->app_call_state == RXRPC_CSTATE_SRVR_SND_REPLY) {
1177 if (!call->app_last_rcv)
1178 call->app_call_state =
1179 RXRPC_CSTATE_SRVR_RCV_ARGS;
1180 else if (call->app_ready_qty > 0)
1181 call->app_call_state =
1182 RXRPC_CSTATE_SRVR_GOT_ARGS;
1184 call->app_call_state =
1185 RXRPC_CSTATE_SRVR_SND_REPLY;
1186 call->app_mark = op->asize;
1187 call->app_user = op->user;
1189 spin_unlock(&call->lock);
1194 case RXRPC_CSTATE_SRVR_RCV_ARGS:
1195 /* change state if just received last packet of arg block */
1196 if (call->app_last_rcv)
1197 call->app_call_state = RXRPC_CSTATE_SRVR_GOT_ARGS;
1198 spin_unlock(&call->lock);
1203 case RXRPC_CSTATE_CLNT_RCV_REPLY:
1204 /* change state if just received last packet of reply block */
1206 if (call->app_last_rcv) {
1207 call->app_call_state = RXRPC_CSTATE_CLNT_GOT_REPLY;
1210 spin_unlock(&call->lock);
/* whole reply is in: no further timers are needed */
1213 del_timer_sync(&call->acks_timeout);
1214 del_timer_sync(&call->rcv_timeout);
1215 del_timer_sync(&call->ackr_dfr_timo);
1222 /* deal with data reception in an unexpected state */
1223 printk("Unexpected state [[[ %u ]]]\n", call->app_call_state);
1224 __rxrpc_call_abort(call, -EBADMSG);
1229 if (call->app_call_state == RXRPC_CSTATE_CLNT_RCV_REPLY &&
1233 /* otherwise just invoke the data function whenever we can satisfy its desire for more
1236 _proto("Rx Received Op Data: st=%u qty=%Zu mk=%Zu%s",
1237 call->app_call_state, call->app_ready_qty, call->app_mark,
1238 call->app_last_rcv ? " last-rcvd" : "");
1240 spin_lock(&call->lock);
1242 ret = __rxrpc_call_read_data(call);
1245 spin_unlock(&call->lock);
1246 call->app_attn_func(call);
1249 spin_unlock(&call->lock);
1252 spin_unlock(&call->lock);
1255 __rxrpc_call_abort(call, ret);
1263 } /* end rxrpc_call_receive_data_packet() */
1265 /*****************************************************************************/
1267 * received an ACK packet
/* Parse an incoming ACK: validate the acked range against what we've sent,
 * use the prompting packet (ping or data) for RTT measurement, then act on
 * the ACK reason (definitive ACK, resend trigger, ping response, ...). */
1269 static void rxrpc_call_receive_ack_packet(struct rxrpc_call *call,
1270 struct rxrpc_message *msg)
1272 struct rxrpc_ackpacket _ack, *ap;
1273 rxrpc_serial_net_t serial;
1277 _enter("%p{%u},%p{%u}", call, ntohl(call->call_id), msg, msg->seq);
1279 /* extract the basic ACK record */
1280 ap = skb_header_pointer(msg->pkt, msg->offset, sizeof(_ack), &_ack);
1282 printk("Rx Received short ACK packet\n");
1285 msg->offset += sizeof(_ack);
/* serial stays network order (compared against stored hdr.serial);
 * firstPacket converted to host order for range arithmetic */
1287 serial = ap->serial;
1288 seq = ntohl(ap->firstPacket);
1290 _proto("Rx Received ACK %%%d { b=%hu m=%hu f=%u p=%u s=%u r=%s n=%u }",
1291 ntohl(msg->hdr.serial),
1292 ntohs(ap->bufferSpace),
1295 ntohl(ap->previousPacket),
1297 rxrpc_acks[ap->reason],
1301 /* check the other side isn't ACK'ing a sequence number I haven't sent
1303 if (ap->nAcks > 0 &&
1304 (seq > call->snd_seq_count ||
1305 seq + ap->nAcks - 1 > call->snd_seq_count)) {
1306 printk("Received ACK (#%u-#%u) for unsent packet\n",
1307 seq, seq + ap->nAcks - 1);
1308 rxrpc_call_abort(call, -EINVAL);
1313 /* deal with RTT calculation */
1315 struct rxrpc_message *rttmsg;
1317 /* find the prompting packet */
1318 spin_lock(&call->lock);
1319 if (call->snd_ping && call->snd_ping->hdr.serial == serial) {
1320 /* it was a ping packet */
1321 rttmsg = call->snd_ping;
1322 call->snd_ping = NULL;
1323 spin_unlock(&call->lock);
1326 rttmsg->rttdone = 1;
1327 rxrpc_peer_calculate_rtt(call->conn->peer,
1329 rxrpc_put_message(rttmsg);
1333 struct list_head *_p;
1335 /* it ought to be a data packet - look in the pending
1337 list_for_each(_p, &call->acks_pendq) {
1338 rttmsg = list_entry(_p, struct rxrpc_message,
1340 if (rttmsg->hdr.serial == serial) {
1341 if (rttmsg->rttdone)
1342 /* never do RTT twice without
1346 rttmsg->rttdone = 1;
1347 rxrpc_peer_calculate_rtt(
1348 call->conn->peer, rttmsg, msg);
1352 spin_unlock(&call->lock);
/* act on the ACK's reason code */
1356 switch (ap->reason) {
1357 /* deal with negative/positive acknowledgement of data
1359 case RXRPC_ACK_REQUESTED:
1360 case RXRPC_ACK_DELAY:
1361 case RXRPC_ACK_IDLE:
/* everything before firstPacket is definitively acknowledged */
1362 rxrpc_call_definitively_ACK(call, seq - 1);
1364 case RXRPC_ACK_DUPLICATE:
1365 case RXRPC_ACK_OUT_OF_SEQUENCE:
1366 case RXRPC_ACK_EXCEEDS_WINDOW:
1367 call->snd_resend_cnt = 0;
1368 ret = rxrpc_call_record_ACK(call, msg, seq, ap->nAcks);
1370 rxrpc_call_abort(call, ret);
1373 /* respond to ping packets immediately */
1374 case RXRPC_ACK_PING:
1375 rxrpc_call_generate_ACK(call, &msg->hdr, ap);
1378 /* only record RTT on ping response packets */
1379 case RXRPC_ACK_PING_RESPONSE:
1380 if (call->snd_ping) {
1381 struct rxrpc_message *rttmsg;
1383 /* only do RTT stuff if the response matches the
1386 spin_lock(&call->lock);
1387 if (call->snd_ping &&
1388 call->snd_ping->hdr.serial == ap->serial) {
1389 rttmsg = call->snd_ping;
1390 call->snd_ping = NULL;
1392 spin_unlock(&call->lock);
1395 rttmsg->rttdone = 1;
1396 rxrpc_peer_calculate_rtt(call->conn->peer,
1398 rxrpc_put_message(rttmsg);
1404 printk("Unsupported ACK reason %u\n", ap->reason);
1409 } /* end rxrpc_call_receive_ack_packet() */
1411 /*****************************************************************************/
1413 * record definitive ACKs for all messages up to and including the one with the
/*
 * Record definitive ACKs for all sent messages up to and including the one
 * with sequence number 'highest'.
 * - advances call->acks_dftv_seq one packet at a time, dequeueing and
 *   discarding the matching message from the front of acks_pendq
 * - panics on queue inconsistency (empty queue or out-of-order front entry)
 * - if everything sent is now definitively ACK'd, completes the call and
 *   wakes any sleepers
 */
1416 static void rxrpc_call_definitively_ACK(struct rxrpc_call *call,
1417 rxrpc_seq_t highest)
1419 struct rxrpc_message *msg;
1422 _enter("%p{ads=%u},%u", call, call->acks_dftv_seq, highest);
1424 while (call->acks_dftv_seq < highest) {
1425 call->acks_dftv_seq++;
1427 _proto("Definitive ACK on packet #%u", call->acks_dftv_seq);
1429 /* discard those at front of queue until message with highest
1431 spin_lock(&call->lock);
1433 if (!list_empty(&call->acks_pendq)) {
1434 msg = list_entry(call->acks_pendq.next,
1435 struct rxrpc_message, link);
1436 list_del_init(&msg->link); /* dequeue */
/* only messages still in SENT state count towards acks_pend_cnt */
1437 if (msg->state == RXRPC_MSG_SENT)
1438 call->acks_pend_cnt--;
1440 spin_unlock(&call->lock);
1442 /* insanity check */
1444 panic("%s(): acks_pendq unexpectedly empty\n",
/* front of queue must be exactly the next expected sequence number */
1447 if (msg->seq != call->acks_dftv_seq)
1448 panic("%s(): Packet #%u expected at front of acks_pendq"
1450 __FUNCTION__, call->acks_dftv_seq, msg->seq);
1452 /* discard the message */
1453 msg->state = RXRPC_MSG_DONE;
1454 rxrpc_put_message(msg);
1457 /* if all sent packets are definitively ACK'd then prod any sleepers just in case */
1459 spin_lock(&call->lock);
1460 if (call->acks_dftv_seq == call->snd_seq_count) {
1461 if (call->app_call_state != RXRPC_CSTATE_COMPLETE) {
1462 call->app_call_state = RXRPC_CSTATE_COMPLETE;
1467 spin_unlock(&call->lock);
/* NOTE(review): timers are stopped outside the lock; presumably only on
 * the completion path — the guarding brace structure is elided here */
1470 del_timer_sync(&call->acks_timeout);
1471 del_timer_sync(&call->rcv_timeout);
1472 del_timer_sync(&call->ackr_dfr_timo);
/* notify the application layer that attention is required */
1473 call->app_attn_func(call);
1477 } /* end rxrpc_call_definitively_ACK() */
1479 /*****************************************************************************/
1481 * record the specified amount of ACKs/NAKs
/*
 * Record the specified run of ACKs/NAKs against the pending-ACK queue.
 * - 'seq' is the first sequence number covered, 'count' the number of ACK
 *   slots in the packet following the header
 * - per-packet ACK bytes are pulled from the skb 16 at a time and applied
 *   to the matching entries in call->acks_pendq, moving each message
 *   between SENT and ACKED state and adjusting acks_pend_cnt
 * - triggers a resend of un-ACK'd packets and, when all packets are
 *   provisionally ACK'd in the final-ACK state, completes the call
 * - returns 0 on success or a negative error (e.g. -EINVAL for a short or
 *   malformed ACK set)
 */
1483 static int rxrpc_call_record_ACK(struct rxrpc_call *call,
1484 struct rxrpc_message *msg,
1488 struct rxrpc_message *dmsg;
1489 struct list_head *_p;
1490 rxrpc_seq_t highest;
1493 char resend, now_complete;
1496 _enter("%p{apc=%u ads=%u},%p,%u,%Zu",
1497 call, call->acks_pend_cnt, call->acks_dftv_seq,
1500 /* handle re-ACK'ing of definitively ACK'd packets (may be out-of-order
1502 if (seq <= call->acks_dftv_seq) {
1503 unsigned delta = call->acks_dftv_seq - seq;
/* the whole run is already definitively ACK'd - nothing to do */
1505 if (count <= delta) {
1506 _leave(" = 0 [all definitively ACK'd]");
/* skip over the slots for already-definitive packets */
1512 msg->offset += delta;
1515 highest = seq + count - 1;
1518 /* extract up to 16 ACK slots at a time */
1519 chunk = min(count, sizeof(acks));
/* pre-fill with 2 ('-' in _acktype) so unparsed slots print as blanks */
1522 memset(acks, 2, sizeof(acks));
1524 if (skb_copy_bits(msg->pkt, msg->offset, &acks, chunk) < 0) {
1525 printk("Rx Received short ACK packet\n");
1526 _leave(" = -EINVAL");
1529 msg->offset += chunk;
1531 /* check that the ACK set is valid */
1532 for (ix = 0; ix < chunk; ix++) {
1534 case RXRPC_ACK_TYPE_ACK:
1536 case RXRPC_ACK_TYPE_NACK:
/* any other slot value is a protocol violation */
1540 printk("Rx Received unsupported ACK state"
1542 _leave(" = -EINVAL");
1547 _proto("Rx ACK of packets #%u-#%u "
1548 "[%c%c%c%c%c%c%c%c%c%c%c%c%c%c%c%c] (pend=%u)",
1549 seq, (unsigned) (seq + chunk - 1),
1550 _acktype[acks[0x0]],
1551 _acktype[acks[0x1]],
1552 _acktype[acks[0x2]],
1553 _acktype[acks[0x3]],
1554 _acktype[acks[0x4]],
1555 _acktype[acks[0x5]],
1556 _acktype[acks[0x6]],
1557 _acktype[acks[0x7]],
1558 _acktype[acks[0x8]],
1559 _acktype[acks[0x9]],
1560 _acktype[acks[0xA]],
1561 _acktype[acks[0xB]],
1562 _acktype[acks[0xC]],
1563 _acktype[acks[0xD]],
1564 _acktype[acks[0xE]],
1565 _acktype[acks[0xF]],
1569 /* mark the packets in the ACK queue as being provisionally
1572 spin_lock(&call->lock);
1574 /* find the first packet ACK'd/NAK'd here */
1575 list_for_each(_p, &call->acks_pendq) {
1576 dmsg = list_entry(_p, struct rxrpc_message, link);
1577 if (dmsg->seq == seq)
1579 _debug("- %u: skipping #%u", ix, dmsg->seq);
1585 _debug("- %u: processing #%u (%c) apc=%u",
1586 ix, dmsg->seq, _acktype[acks[ix]],
1587 call->acks_pend_cnt);
1589 if (acks[ix] == RXRPC_ACK_TYPE_ACK) {
/* SENT -> ACKED: one fewer packet awaiting acknowledgement */
1590 if (dmsg->state == RXRPC_MSG_SENT)
1591 call->acks_pend_cnt--;
1592 dmsg->state = RXRPC_MSG_ACKED;
/* NAK path: ACKED -> SENT reinstates the pending count */
1595 if (dmsg->state == RXRPC_MSG_ACKED)
1596 call->acks_pend_cnt++;
1597 dmsg->state = RXRPC_MSG_SENT;
/* advance to the next queued message / ACK slot */
1602 _p = dmsg->link.next;
1603 dmsg = list_entry(_p, struct rxrpc_message, link);
1604 } while(ix < chunk &&
1605 _p != &call->acks_pendq &&
1611 spin_unlock(&call->lock);
/* resend anything NAK'd / still unacknowledged up to 'highest'
 * (presumably gated on the 'resend' flag - condition elided here) */
1615 rxrpc_call_resend(call, highest);
1617 /* if all packets are provisionally ACK'd, then wake up anyone who's
1618 * waiting for that */
1620 spin_lock(&call->lock);
1621 if (call->acks_pend_cnt == 0) {
1622 if (call->app_call_state == RXRPC_CSTATE_SRVR_RCV_FINAL_ACK) {
1623 call->app_call_state = RXRPC_CSTATE_COMPLETE;
1628 spin_unlock(&call->lock);
1631 _debug("- wake up waiters");
1632 del_timer_sync(&call->acks_timeout);
1633 del_timer_sync(&call->rcv_timeout);
1634 del_timer_sync(&call->ackr_dfr_timo);
1635 call->app_attn_func(call);
1638 _leave(" = 0 (apc=%u)", call->acks_pend_cnt);
/* reached only if a referenced packet was absent from acks_pendq */
1642 panic("%s(): acks_pendq in bad state (packet #%u absent)\n",
1645 } /* end rxrpc_call_record_ACK() */
1647 /*****************************************************************************/
1649 * transfer data from the ready packet queue to the asynchronous read buffer
1650 * - since this func is the only one going to look at packets queued on
1651 * app_readyq, we don't need a lock to modify or access them, only to modify
1652 * the queue pointers
1653 * - called with call->lock held
1654 * - the buffer must be in kernel space
1656 * 0 if buffer filled
1657 * -EAGAIN if buffer not filled and more data to come
1658 * -EBADMSG if last packet received and insufficient data left
1659 * -ECONNABORTED if the call has in an error state
/*
 * Transfer data from the ready packet queue to the asynchronous read buffer
 * (see the banner comment above for locking and return-value conventions).
 * - first validates that the call state permits reading, then either
 *   answers a no-buffer query about data availability or drains packets
 *   from app_readyq into app_read_buf until app_mark bytes are satisfied
 */
1661 static int __rxrpc_call_read_data(struct rxrpc_call *call)
1663 struct rxrpc_message *msg;
1667 _enter("%p{as=%d buf=%p qty=%Zu/%Zu}",
1669 call->app_async_read, call->app_read_buf,
1670 call->app_ready_qty, call->app_mark);
1672 /* check the state */
1673 switch (call->app_call_state) {
1674 case RXRPC_CSTATE_SRVR_RCV_ARGS:
1675 case RXRPC_CSTATE_CLNT_RCV_REPLY:
/* in a receive state the last packet must NOT yet have arrived */
1676 if (call->app_last_rcv) {
1677 printk("%s(%p,%p,%Zd):"
1678 " Inconsistent call state (%s, last pkt)",
1680 call, call->app_read_buf, call->app_mark,
1681 rxrpc_call_states[call->app_call_state]);
1686 case RXRPC_CSTATE_SRVR_RCV_OPID:
1687 case RXRPC_CSTATE_SRVR_GOT_ARGS:
1688 case RXRPC_CSTATE_CLNT_GOT_REPLY:
/* in SND_REPLY the final packet must already have been received */
1691 case RXRPC_CSTATE_SRVR_SND_REPLY:
1692 if (!call->app_last_rcv) {
1693 printk("%s(%p,%p,%Zd):"
1694 " Inconsistent call state (%s, not last pkt)",
1696 call, call->app_read_buf, call->app_mark,
1697 rxrpc_call_states[call->app_call_state]);
1700 _debug("Trying to read data from call in SND_REPLY state");
1703 case RXRPC_CSTATE_ERROR:
1704 _leave(" = -ECONNABORTED");
1705 return -ECONNABORTED;
1708 printk("reading in unexpected state [[[ %u ]]]\n",
1709 call->app_call_state);
1713 /* handle the case of not having an async buffer */
1714 if (!call->app_async_read) {
/* EOF mark: caller just asks whether the stream has ended */
1715 if (call->app_mark == RXRPC_APP_MARK_EOF) {
1716 ret = call->app_last_rcv ? 0 : -EAGAIN;
/* enough data already queued to satisfy the mark? */
1719 if (call->app_mark >= call->app_ready_qty) {
1720 call->app_mark = RXRPC_APP_MARK_EOF;
1724 ret = call->app_last_rcv ? -EBADMSG : -EAGAIN;
1728 _leave(" = %d [no buf]", ret);
/* drain ready packets into the buffer until the mark is satisfied */
1732 while (!list_empty(&call->app_readyq) && call->app_mark > 0) {
1733 msg = list_entry(call->app_readyq.next,
1734 struct rxrpc_message, link);
1736 /* drag as much data as we need out of this packet */
1737 qty = min(call->app_mark, msg->dsize);
1739 _debug("reading %Zu from skb=%p off=%lu",
1740 qty, msg->pkt, msg->offset);
/* a NULL buffer means drain-only: account the bytes, copy nothing */
1742 if (call->app_read_buf)
1743 if (skb_copy_bits(msg->pkt, msg->offset,
1744 call->app_read_buf, qty) < 0)
1745 panic("%s: Failed to copy data from packet:"
1748 call, call->app_read_buf, qty);
1750 /* if that packet is now empty, discard it */
1751 call->app_ready_qty -= qty;
1754 if (msg->dsize == 0) {
1755 list_del_init(&msg->link);
1756 rxrpc_put_message(msg);
1762 call->app_mark -= qty;
1763 if (call->app_read_buf)
1764 call->app_read_buf += qty;
/* request fully satisfied: reset async-read bookkeeping */
1767 if (call->app_mark == 0) {
1768 call->app_async_read = 0;
1769 call->app_mark = RXRPC_APP_MARK_EOF;
1770 call->app_read_buf = NULL;
1772 /* adjust the state if used up all packets */
1773 if (list_empty(&call->app_readyq) && call->app_last_rcv) {
1774 switch (call->app_call_state) {
1775 case RXRPC_CSTATE_SRVR_RCV_OPID:
1776 call->app_call_state = RXRPC_CSTATE_SRVR_SND_REPLY;
1777 call->app_mark = RXRPC_APP_MARK_EOF;
1779 del_timer_sync(&call->rcv_timeout);
1781 case RXRPC_CSTATE_SRVR_GOT_ARGS:
1782 call->app_call_state = RXRPC_CSTATE_SRVR_SND_REPLY;
1784 del_timer_sync(&call->rcv_timeout);
/* presumably the client-side default: reply fully consumed */
1787 call->app_call_state = RXRPC_CSTATE_COMPLETE;
1789 del_timer_sync(&call->acks_timeout);
1790 del_timer_sync(&call->ackr_dfr_timo);
1791 del_timer_sync(&call->rcv_timeout);
/* ran out of packets before the mark was reached */
1800 if (call->app_last_rcv) {
1801 _debug("Insufficient data (%Zu/%Zu)",
1802 call->app_ready_qty, call->app_mark);
1803 call->app_async_read = 0;
1804 call->app_mark = RXRPC_APP_MARK_EOF;
1805 call->app_read_buf = NULL;
1807 _leave(" = -EBADMSG");
/* more data still expected from the peer */
1811 _leave(" = -EAGAIN");
1813 } /* end __rxrpc_call_read_data() */
1815 /*****************************************************************************/
1817 * attempt to read the specified amount of data from the call's ready queue
1818 * into the buffer provided
1819 * - since this func is the only one going to look at packets queued on
1820 * app_readyq, we don't need a lock to modify or access them, only to modify
1821 * the queue pointers
1822 * - if the buffer pointer is NULL, then data is merely drained, not copied
1823 * - if flags&RXRPC_CALL_READ_BLOCK, then the function will wait until there is
1824 * enough data or an error will be generated
1825 * - note that the caller must have added the calling task to the call's wait
1827 * - if flags&RXRPC_CALL_READ_ALL, then an error will be generated if this
1828 * function doesn't read all available data
/*
 * Attempt to read 'size' bytes from the call's ready queue into 'buffer'
 * (see the banner comment above for the flag semantics).
 * - rejects a second concurrent read with -EBUSY, installs the async-read
 *   request under call->lock, then delegates to __rxrpc_call_read_data()
 * - with RXRPC_CALL_READ_BLOCK set, sleeps interruptibly until the read
 *   completes, a signal arrives or the call errors out
 */
1830 int rxrpc_call_read_data(struct rxrpc_call *call,
1831 void *buffer, size_t size, int flags)
1835 _enter("%p{arq=%Zu},%p,%Zd,%x",
1836 call, call->app_ready_qty, buffer, size, flags);
1838 spin_lock(&call->lock);
/* only one outstanding read is permitted at a time */
1840 if (unlikely(!!call->app_read_buf)) {
1841 spin_unlock(&call->lock);
1842 _leave(" = -EBUSY");
/* install the read request */
1846 call->app_mark = size;
1847 call->app_read_buf = buffer;
1848 call->app_async_read = 1;
1849 call->app_read_count++;
1851 /* read as much data as possible */
1852 ret = __rxrpc_call_read_data(call);
/* READ_ALL: leftover data (or a non-final packet) is a protocol error */
1855 if (flags & RXRPC_CALL_READ_ALL &&
1856 (!call->app_last_rcv || call->app_ready_qty > 0)) {
1857 _leave(" = -EBADMSG");
1858 __rxrpc_call_abort(call, -EBADMSG);
1862 spin_unlock(&call->lock);
1863 call->app_attn_func(call);
/* NOTE(review): the switch/if structure selecting among these return
 * paths is elided in this excerpt */
1868 spin_unlock(&call->lock);
1869 _leave(" = %d [aborted]", ret);
/* hard error from the partial read: abort the call */
1873 __rxrpc_call_abort(call, ret);
1874 _leave(" = %d", ret);
1878 spin_unlock(&call->lock);
/* -EAGAIN path: only block if the caller asked us to */
1880 if (!(flags & RXRPC_CALL_READ_BLOCK)) {
1881 _leave(" = -EAGAIN");
1885 /* wait for the data to arrive */
1886 _debug("blocking for data arrival");
/* presumably inside a loop; loop header elided in this excerpt */
1889 set_current_state(TASK_INTERRUPTIBLE);
1890 if (!call->app_async_read || signal_pending(current))
1894 set_current_state(TASK_RUNNING);
1896 if (signal_pending(current)) {
1897 _leave(" = -EINTR");
1901 if (call->app_call_state == RXRPC_CSTATE_ERROR) {
1902 _leave(" = -ECONNABORTED");
1903 return -ECONNABORTED;
1910 } /* end rxrpc_call_read_data() */
1912 /*****************************************************************************/
1914 * write data to a call
1915 * - the data may not be sent immediately if it doesn't fill a buffer
1916 * - if we can't queue all the data for buffering now, siov[] will have been
1917 * adjusted to take account of what has been sent
/*
 * Write data to a call (see the banner comment above).
 * - gathers the supplied iovecs into the message under construction
 *   (call->snd_nextmsg), allocating one if necessary
 * - when dup_data is set the data is copied into kmalloc'd buffers
 *   (coalesced, at most one page per allocation); otherwise the caller's
 *   iovecs are attached directly
 * - dispatches via rxrpc_call_flush() when the packet fills, is final, or
 *   any rxhdr_flags are requested
 */
1919 int rxrpc_call_write_data(struct rxrpc_call *call,
1927 struct rxrpc_message *msg;
1929 size_t space, size, chunk, tmp;
1933 _enter("%p,%Zu,%p,%02x,%x,%d,%p",
1934 call, sioc, siov, rxhdr_flags, alloc_flags, dup_data,
1941 /* can't send more if we've sent last packet from this end */
1942 switch (call->app_call_state) {
1943 case RXRPC_CSTATE_SRVR_SND_REPLY:
1944 case RXRPC_CSTATE_CLNT_SND_ARGS:
/* errored call: hand the recorded application errno back */
1946 case RXRPC_CSTATE_ERROR:
1947 ret = call->app_errno;
1952 /* calculate how much data we've been given */
1954 for (; sioc > 0; sptr++, sioc--) {
/* NULL iov_base entries are skipped, presumably as padding slots */
1958 if (!sptr->iov_base)
1961 size += sptr->iov_len;
1964 _debug("- size=%Zu mtu=%Zu", size, call->conn->mtu_size);
1967 /* make sure there's a message under construction */
1968 if (!call->snd_nextmsg) {
1969 /* no - allocate a message with no data yet attached */
1970 ret = rxrpc_conn_newmsg(call->conn, call,
1971 RXRPC_PACKET_TYPE_DATA,
1972 0, NULL, alloc_flags,
1973 &call->snd_nextmsg);
1976 _debug("- allocated new message [ds=%Zu]",
1977 call->snd_nextmsg->dsize);
1980 msg = call->snd_nextmsg;
1981 msg->hdr.flags |= rxhdr_flags;
1983 /* deal with zero-length terminal packet */
1985 if (rxhdr_flags & RXRPC_LAST_PACKET) {
1986 ret = rxrpc_call_flush(call);
1993 /* work out how much space current packet has available */
1994 space = call->conn->mtu_size - msg->dsize;
1995 chunk = min(space, size);
1997 _debug("- [before] space=%Zu chunk=%Zu", space, chunk);
/* skip leading empty iovecs */
1999 while (!siov->iov_len)
2002 /* if we are going to have to duplicate the data then coalesce
2005 /* don't allocate more that 1 page at a time */
2006 if (chunk > PAGE_SIZE)
2009 /* allocate a data buffer and attach to the message */
2010 buf = kmalloc(chunk, alloc_flags);
2011 if (unlikely(!buf)) {
/* allocation failed on a still-empty message: unwind it so the
 * sequence counter isn't burned on a packet that never existed */
2013 sizeof(struct rxrpc_header)) {
2014 /* discard an empty msg and wind back
2015 * the seq counter */
2016 rxrpc_put_message(msg);
2017 call->snd_nextmsg = NULL;
2018 call->snd_seq_count--;
/* attach the freshly allocated buffer; dfree bit marks it for kfree */
2025 tmp = msg->dcount++;
2026 set_bit(tmp, &msg->dfree);
2027 msg->data[tmp].iov_base = buf;
2028 msg->data[tmp].iov_len = chunk;
2029 msg->dsize += chunk;
2030 *size_sent += chunk;
2033 /* load the buffer with data */
2035 tmp = min(chunk, siov->iov_len);
2036 memcpy(buf, siov->iov_base, tmp);
/* consume from the caller's iovec so it reflects what was taken */
2038 siov->iov_base += tmp;
2039 siov->iov_len -= tmp;
2046 /* we want to attach the supplied buffers directly */
2048 msg->dcount < RXRPC_MSG_MAX_IOCS) {
2049 tmp = msg->dcount++;
2050 msg->data[tmp].iov_base = siov->iov_base;
2051 msg->data[tmp].iov_len = siov->iov_len;
2052 msg->dsize += siov->iov_len;
2053 *size_sent += siov->iov_len;
2054 size -= siov->iov_len;
2055 chunk -= siov->iov_len;
2060 _debug("- [loaded] chunk=%Zu size=%Zu", chunk, size);
2062 /* dispatch the message when full, final or requesting ACK */
2063 if (msg->dsize >= call->conn->mtu_size || rxhdr_flags) {
2064 ret = rxrpc_call_flush(call);
2073 _leave(" = %d (%Zd queued, %Zd rem)", ret, *size_sent, size);
2076 } /* end rxrpc_call_write_data() */
2078 /*****************************************************************************/
2080 * flush outstanding packets to the network
/*
 * Flush the packet currently under construction to the network.
 * - detaches call->snd_nextmsg, finalizes its header flags (LAST vs MORE
 *   packets, REQUEST_ACK where appropriate), queues it on acks_pendq,
 *   advances the call state for a last packet, arms the ACK timeout and
 *   transmits via rxrpc_conn_sendmsg()
 * - returns 0 on success or a negative error; a call reference is held
 *   across the operation
 */
2082 int rxrpc_call_flush(struct rxrpc_call *call)
2084 struct rxrpc_message *msg;
/* pin the call so it can't go away while we send */
2089 rxrpc_get_call(call);
2091 /* if there's a packet under construction, then dispatch it now */
2092 if (call->snd_nextmsg) {
2093 msg = call->snd_nextmsg;
2094 call->snd_nextmsg = NULL;
2096 if (msg->hdr.flags & RXRPC_LAST_PACKET) {
2097 msg->hdr.flags &= ~RXRPC_MORE_PACKETS;
/* the server's final reply packet asks the client to ACK it */
2098 if (call->app_call_state != RXRPC_CSTATE_CLNT_SND_ARGS)
2099 msg->hdr.flags |= RXRPC_REQUEST_ACK;
/* not last: more packets will follow (else-branch, presumably) */
2102 msg->hdr.flags |= RXRPC_MORE_PACKETS;
2105 _proto("Sending DATA message { ds=%Zu dc=%u df=%02lu }",
2106 msg->dsize, msg->dcount, msg->dfree);
2108 /* queue and adjust call state */
2109 spin_lock(&call->lock);
2110 list_add_tail(&msg->link, &call->acks_pendq);
2112 /* decide what to do depending on current state and if this is
2113 * the last packet */
2115 switch (call->app_call_state) {
2116 case RXRPC_CSTATE_SRVR_SND_REPLY:
2117 if (msg->hdr.flags & RXRPC_LAST_PACKET) {
/* reply fully sent - now await the client's final ACK */
2118 call->app_call_state =
2119 RXRPC_CSTATE_SRVR_RCV_FINAL_ACK;
2124 case RXRPC_CSTATE_CLNT_SND_ARGS:
2125 if (msg->hdr.flags & RXRPC_LAST_PACKET) {
/* arguments fully sent - switch to awaiting the reply */
2126 call->app_call_state =
2127 RXRPC_CSTATE_CLNT_RCV_REPLY;
2132 case RXRPC_CSTATE_ERROR:
2133 ret = call->app_errno;
2135 spin_unlock(&call->lock);
/* the newly queued packet now awaits acknowledgement */
2139 call->acks_pend_cnt++;
/* (re)arm the ACK timeout, scaled by the measured RTT */
2141 mod_timer(&call->acks_timeout,
2142 __rxrpc_rtt_based_timeout(call,
2143 rxrpc_call_acks_timeout));
2145 spin_unlock(&call->lock);
2147 ret = rxrpc_conn_sendmsg(call->conn, msg);
2149 call->pkt_snd_count++;
2153 rxrpc_put_call(call);
2155 _leave(" = %d", ret);
2158 } /* end rxrpc_call_flush() */
2160 /*****************************************************************************/
2162 * resend NAK'd or unacknowledged packets up to the highest one specified
/*
 * Resend NAK'd or unacknowledged packets up to the highest sequence number
 * specified.
 * - aborts the call with -EIO/-ETIMEDOUT if the resend budget
 *   (rxrpc_call_max_resend) is exhausted
 * - walks acks_pendq from the first non-definitively-ACK'd packet,
 *   retransmitting each message still in SENT state (dropping call->lock
 *   around the actual send), then re-arms the ACK timeout
 */
2164 static void rxrpc_call_resend(struct rxrpc_call *call, rxrpc_seq_t highest)
2166 struct rxrpc_message *msg;
2167 struct list_head *_p;
2168 rxrpc_seq_t seq = 0;
2170 _enter("%p,%u", call, highest);
2172 _proto("Rx Resend required");
2174 /* handle too many resends */
2175 if (call->snd_resend_cnt >= rxrpc_call_max_resend) {
2176 _debug("Aborting due to too many resends (rcv=%d)",
2177 call->pkt_rcv_count);
/* -EIO if the peer was at least partially talking, else timed out */
2178 rxrpc_call_abort(call,
2179 call->pkt_rcv_count > 0 ? -EIO : -ETIMEDOUT);
2184 spin_lock(&call->lock);
2185 call->snd_resend_cnt++;
2187 /* determine which the next packet we might need to ACK is */
2188 if (seq <= call->acks_dftv_seq)
2189 seq = call->acks_dftv_seq;
2195 /* look for the packet in the pending-ACK queue */
2196 list_for_each(_p, &call->acks_pendq) {
2197 msg = list_entry(_p, struct rxrpc_message, link);
2198 if (msg->seq == seq)
/* queue inconsistency diagnostic (panic/printk args, body elided) */
2203 " Inconsistent pending-ACK queue (ds=%u sc=%u sq=%u)\n",
2204 __FUNCTION__, call, highest,
2205 call->acks_dftv_seq, call->snd_seq_count, seq);
2208 if (msg->state != RXRPC_MSG_SENT)
2209 continue; /* only un-ACK'd packets */
/* take a ref and drop the lock so the send can sleep safely */
2211 rxrpc_get_message(msg);
2212 spin_unlock(&call->lock);
2214 /* send each message again (and ignore any errors we might
2216 _proto("Resending DATA message { ds=%Zu dc=%u df=%02lu }",
2217 msg->dsize, msg->dcount, msg->dfree);
2219 if (rxrpc_conn_sendmsg(call->conn, msg) == 0)
2220 call->pkt_snd_count++;
2222 rxrpc_put_message(msg);
/* re-take the lock before continuing the queue walk */
2224 spin_lock(&call->lock);
2227 /* reset the timeout */
2228 mod_timer(&call->acks_timeout,
2229 __rxrpc_rtt_based_timeout(call, rxrpc_call_acks_timeout));
2231 spin_unlock(&call->lock);
2234 } /* end rxrpc_call_resend() */
2236 /*****************************************************************************/
2238 * handle an ICMP error being applied to a call
/*
 * Handle an ICMP error being applied to a call.
 * @call:  the call affected
 * @local: non-zero if the error originated locally (selects LOCAL_ERROR
 *         vs REMOTE_ERROR app_err_state - branch structure elided here)
 * @errno: the error number to record on the call
 * - moves the call to the ERROR state, cancels any pending async read,
 *   maps the error for the application, kills all timers and notifies the
 *   app's error handler
 */
2240 void rxrpc_call_handle_error(struct rxrpc_call *call, int local, int errno)
2242 _enter("%p{%u},%d", call, ntohl(call->call_id), errno);
2244 /* if this call is already aborted, then just wake up any waiters */
2245 if (call->app_call_state == RXRPC_CSTATE_ERROR) {
2246 call->app_error_func(call);
2249 /* tell the app layer what happened */
2250 spin_lock(&call->lock);
2251 call->app_call_state = RXRPC_CSTATE_ERROR;
2254 call->app_err_state = RXRPC_ESTATE_LOCAL_ERROR;
2256 call->app_err_state = RXRPC_ESTATE_REMOTE_ERROR;
2257 call->app_errno = errno;
/* cancel any in-progress asynchronous read */
2258 call->app_mark = RXRPC_APP_MARK_EOF;
2259 call->app_read_buf = NULL;
2260 call->app_async_read = 0;
/* let the application translate the abort/error code for itself */
2263 call->app_aemap_func(call);
/* NOTE(review): del_timer_sync() inside a spin_lock'd section would be
 * suspect on a modern kernel; cannot confirm lock extent from this
 * excerpt as interior lines are elided */
2265 del_timer_sync(&call->acks_timeout);
2266 del_timer_sync(&call->rcv_timeout);
2267 del_timer_sync(&call->ackr_dfr_timo);
2269 spin_unlock(&call->lock);
2271 call->app_error_func(call);
2275 } /* end rxrpc_call_handle_error() */