1 /* call.c: Rx call routines
3 * Copyright (C) 2002 Red Hat, Inc. All Rights Reserved.
4 * Written by David Howells (dhowells@redhat.com)
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
12 #include <linux/sched.h>
13 #include <linux/slab.h>
14 #include <linux/module.h>
15 #include <rxrpc/rxrpc.h>
16 #include <rxrpc/transport.h>
17 #include <rxrpc/peer.h>
18 #include <rxrpc/connection.h>
19 #include <rxrpc/call.h>
20 #include <rxrpc/message.h>
/* Module-wide accounting counters (compiled in only when RXACCT is enabled). */
23 __RXACCT_DECL(atomic_t rxrpc_call_count);
24 __RXACCT_DECL(atomic_t rxrpc_message_count);
/* Global list of all live calls; additions/removals are guarded by
 * rxrpc_calls_sem (see rxrpc_create_call / rxrpc_put_call below). */
26 LIST_HEAD(rxrpc_calls);
27 DECLARE_RWSEM(rxrpc_calls_sem);
/* Tuning knobs, expressed in jiffies (HZ == one second). */
29 unsigned rxrpc_call_rcv_timeout = HZ/3;
30 unsigned rxrpc_call_acks_timeout = HZ/3;
31 unsigned rxrpc_call_dfr_ack_timeout = HZ/20;
/* NOTE(review): a "max resend" count derived from HZ looks odd for a
 * count-typed value — confirm intended units against the original file. */
32 unsigned short rxrpc_call_max_resend = HZ/10;
/* Human-readable names used only by debug/trace output.
 * NOTE(review): several initialiser lists appear truncated in this view —
 * verify the complete tables against the full source. */
34 const char *rxrpc_call_states[] = {
47 const char *rxrpc_call_error_states[] = {
55 const char *rxrpc_pkts[] = {
57 "data", "ack", "busy", "abort", "ackall", "chall", "resp", "debug",
58 "?09", "?10", "?11", "?12", "?13", "?14", "?15"
61 const char *rxrpc_acks[] = {
62 "---", "REQ", "DUP", "SEQ", "WIN", "MEM", "PNG", "PNR", "DLY", "IDL",
66 static const char _acktype[] = "NA-";
/* Forward declarations for the packet-processing helpers defined later. */
68 static void rxrpc_call_receive_packet(struct rxrpc_call *call);
69 static void rxrpc_call_receive_data_packet(struct rxrpc_call *call,
70 struct rxrpc_message *msg);
71 static void rxrpc_call_receive_ack_packet(struct rxrpc_call *call,
72 struct rxrpc_message *msg);
73 static void rxrpc_call_definitively_ACK(struct rxrpc_call *call,
75 static void rxrpc_call_resend(struct rxrpc_call *call, rxrpc_seq_t highest);
76 static int __rxrpc_call_read_data(struct rxrpc_call *call);
78 static int rxrpc_call_record_ACK(struct rxrpc_call *call,
79 struct rxrpc_message *msg,
/* Dump the call's current application-visible state to the debug log. */
82 #define _state(call) \
83 _debug("[[[ state %s ]]]", rxrpc_call_states[call->app_call_state]);
/* Default "attention" callback: wake anything sleeping on the call's waitq. */
85 static void rxrpc_call_default_attn_func(struct rxrpc_call *call)
87 wake_up(&call->waitq);
/* Default error callback: identical to the attn default — just wake waiters. */
90 static void rxrpc_call_default_error_func(struct rxrpc_call *call)
92 wake_up(&call->waitq);
/* Default abort <-> errno mapping callback.
 * LOCAL_ABORT derives the wire abort code from the local errno, then
 * deliberately falls through so that app_errno is normalised to
 * -ECONNABORTED for both local and peer aborts. */
95 static void rxrpc_call_default_aemap_func(struct rxrpc_call *call)
97 switch (call->app_err_state) {
98 case RXRPC_ESTATE_LOCAL_ABORT:
99 call->app_abort_code = -call->app_errno;
		/* fallthrough */
100 case RXRPC_ESTATE_PEER_ABORT:
101 call->app_errno = -ECONNABORTED;
/* Timer expiry handler for overdue ACKs.
 * Runs in timer (softirq) context, so it only flags the condition and queues
 * the call for the krxiod daemon to process (see rxrpc_call_do_stuff). */
107 static void __rxrpc_call_acks_timeout(unsigned long _call)
109 struct rxrpc_call *call = (struct rxrpc_call *) _call;
111 _debug("ACKS TIMEOUT %05lu", jiffies - call->cjif);
113 call->flags |= RXRPC_CALL_ACKS_TIMO;
114 rxrpc_krxiod_queue_call(call);
/* Timer expiry handler for lack of reception from the peer.
 * Softirq context: flag the timeout and defer the real work to krxiod. */
117 static void __rxrpc_call_rcv_timeout(unsigned long _call)
119 struct rxrpc_call *call = (struct rxrpc_call *) _call;
121 _debug("RCV TIMEOUT %05lu", jiffies - call->cjif);
123 call->flags |= RXRPC_CALL_RCV_TIMO;
124 rxrpc_krxiod_queue_call(call);
/* Timer expiry handler for the deferred-ACK timer.
 * Softirq context: flag the timeout and defer the real work to krxiod. */
127 static void __rxrpc_call_ackr_timeout(unsigned long _call)
129 struct rxrpc_call *call = (struct rxrpc_call *) _call;
131 _debug("ACKR TIMEOUT %05lu",jiffies - call->cjif);
133 call->flags |= RXRPC_CALL_ACKR_TIMO;
134 rxrpc_krxiod_queue_call(call);
137 /*****************************************************************************/
139 * calculate a timeout based on an RTT value
/* Convert the peer's measured RTT into an absolute jiffies deadline.
 * The divisor suggests peer->rtt is held in microseconds — TODO confirm.
 * A floor (HZ/25 comparison) prevents an absurdly short expiry. */
141 static inline unsigned long __rxrpc_rtt_based_timeout(struct rxrpc_call *call,
144 unsigned long expiry = call->conn->peer->rtt / (1000000 / HZ);
147 if (expiry < HZ / 25)
152 _leave(" = %lu jiffies", expiry);
153 return jiffies + expiry;
154 } /* end __rxrpc_rtt_based_timeout() */
156 /*****************************************************************************/
158 * create a new call record
/* Common allocation/initialisation for both outgoing and incoming calls.
 * The whole record lives in a single zeroed page (freed with free_page()
 * by the callers' error paths and by rxrpc_put_call). Returns 0 and the
 * new call (usage count 1) via *_call. */
160 static inline int __rxrpc_create_call(struct rxrpc_connection *conn,
161 struct rxrpc_call **_call)
163 struct rxrpc_call *call;
167 /* allocate and initialise a call record */
168 call = (struct rxrpc_call *) get_zeroed_page(GFP_KERNEL);
174 atomic_set(&call->usage, 1);
176 init_waitqueue_head(&call->waitq);
177 spin_lock_init(&call->lock);
178 INIT_LIST_HEAD(&call->link);
179 INIT_LIST_HEAD(&call->acks_pendq);
180 INIT_LIST_HEAD(&call->rcv_receiveq);
181 INIT_LIST_HEAD(&call->rcv_krxiodq_lk);
182 INIT_LIST_HEAD(&call->app_readyq);
183 INIT_LIST_HEAD(&call->app_unreadyq);
184 INIT_LIST_HEAD(&call->app_link);
185 INIT_LIST_HEAD(&call->app_attn_link);
/* Legacy timer API: each timer carries the call pointer as its data word. */
187 init_timer(&call->acks_timeout);
188 call->acks_timeout.data = (unsigned long) call;
189 call->acks_timeout.function = __rxrpc_call_acks_timeout;
191 init_timer(&call->rcv_timeout);
192 call->rcv_timeout.data = (unsigned long) call;
193 call->rcv_timeout.function = __rxrpc_call_rcv_timeout;
195 init_timer(&call->ackr_dfr_timo);
196 call->ackr_dfr_timo.data = (unsigned long) call;
197 call->ackr_dfr_timo.function = __rxrpc_call_ackr_timeout;
/* ACK window starts at sequence 1 and spans RXRPC_CALL_ACK_WINDOW_SIZE. */
200 call->ackr_win_bot = 1;
201 call->ackr_win_top = call->ackr_win_bot + RXRPC_CALL_ACK_WINDOW_SIZE - 1;
202 call->ackr_prev_seq = 0;
203 call->app_mark = RXRPC_APP_MARK_EOF;
/* Install the default app callbacks; callers may override them. */
204 call->app_attn_func = rxrpc_call_default_attn_func;
205 call->app_error_func = rxrpc_call_default_error_func;
206 call->app_aemap_func = rxrpc_call_default_aemap_func;
207 call->app_scr_alloc = call->app_scratch;
/* Record creation time; debug output reports times relative to cjif. */
209 call->cjif = jiffies;
211 _leave(" = 0 (%p)", call);
216 } /* end __rxrpc_create_call() */
218 /*****************************************************************************/
220 * create a new call record for outgoing calls
/* Create a client (outgoing) call on a connection.
 * Sleeps interruptibly waiting for one of the connection's four channels to
 * become free; may therefore fail with a signal-related error. On success
 * the call is attached to a channel, given a unique call ID, and added to
 * the global rxrpc_calls list. */
222 int rxrpc_create_call(struct rxrpc_connection *conn,
223 rxrpc_call_attn_func_t attn,
224 rxrpc_call_error_func_t error,
225 rxrpc_call_aemap_func_t aemap,
226 struct rxrpc_call **_call)
228 DECLARE_WAITQUEUE(myself, current);
230 struct rxrpc_call *call;
235 /* allocate and initialise a call record */
236 ret = __rxrpc_create_call(conn, &call);
238 _leave(" = %d", ret);
242 call->app_call_state = RXRPC_CSTATE_CLNT_SND_ARGS;
/* Override the default callbacks with the caller-supplied ones. */
244 call->app_attn_func = attn;
246 call->app_error_func = error;
248 call->app_aemap_func = aemap;
252 spin_lock(&conn->lock);
253 set_current_state(TASK_INTERRUPTIBLE);
254 add_wait_queue(&conn->chanwait, &myself);
257 /* try to find an unused channel */
258 for (cix = 0; cix < 4; cix++)
259 if (!conn->channels[cix])
262 /* no free channels - wait for one to become available */
264 if (signal_pending(current))
267 spin_unlock(&conn->lock);
270 set_current_state(TASK_INTERRUPTIBLE);
272 spin_lock(&conn->lock);
275 /* got a channel - now attach to the connection */
277 remove_wait_queue(&conn->chanwait, &myself);
278 set_current_state(TASK_RUNNING);
280 /* concoct a unique call number */
/* Loop re-runs the counter if the new ID collides with a live channel. */
282 call->call_id = htonl(++conn->call_counter);
283 for (loop = 0; loop < 4; loop++)
284 if (conn->channels[loop] &&
285 conn->channels[loop]->call_id == call->call_id)
288 rxrpc_get_connection(conn);
289 conn->channels[cix] = call; /* assign _after_ done callid check loop */
290 do_gettimeofday(&conn->atime);
291 call->chan_ix = htonl(cix);
293 spin_unlock(&conn->lock);
295 down_write(&rxrpc_calls_sem);
296 list_add_tail(&call->call_link, &rxrpc_calls);
297 up_write(&rxrpc_calls_sem);
299 __RXACCT(atomic_inc(&rxrpc_call_count));
302 _leave(" = 0 (call=%p cix=%u)", call, cix);
/* Error path: detach from the wait queue and free the call page. */
306 remove_wait_queue(&conn->chanwait, &myself);
307 set_current_state(TASK_RUNNING);
308 spin_unlock(&conn->lock);
310 free_page((unsigned long) call);
311 _leave(" = %d", ret);
313 } /* end rxrpc_create_call() */
315 /*****************************************************************************/
317 * create a new call record for incoming calls
/* Create a server-side call record for an incoming packet.
 * The channel index and call ID come from the packet header; the channel is
 * only claimed if it is free or its previous call has completed/errored. */
319 int rxrpc_incoming_call(struct rxrpc_connection *conn,
320 struct rxrpc_message *msg,
321 struct rxrpc_call **_call)
323 struct rxrpc_call *call;
327 cix = ntohl(msg->hdr.cid) & RXRPC_CHANNELMASK;
329 _enter("%p,%u,%u", conn, ntohl(msg->hdr.callNumber), cix);
331 /* allocate and initialise a call record */
332 ret = __rxrpc_create_call(conn, &call);
334 _leave(" = %d", ret);
/* The triggering packet counts as received; first expected datum is the
 * 4-byte operation ID. */
338 call->pkt_rcv_count = 1;
339 call->app_call_state = RXRPC_CSTATE_SRVR_RCV_OPID;
340 call->app_mark = sizeof(uint32_t);
344 /* attach to the connection */
346 call->chan_ix = htonl(cix);
347 call->call_id = msg->hdr.callNumber;
349 spin_lock(&conn->lock);
351 if (!conn->channels[cix] ||
352 conn->channels[cix]->app_call_state == RXRPC_CSTATE_COMPLETE ||
353 conn->channels[cix]->app_call_state == RXRPC_CSTATE_ERROR
355 conn->channels[cix] = call;
356 rxrpc_get_connection(conn);
360 spin_unlock(&conn->lock);
/* Channel was busy: discard the freshly allocated call page. */
363 free_page((unsigned long) call);
368 down_write(&rxrpc_calls_sem);
369 list_add_tail(&call->call_link, &rxrpc_calls);
370 up_write(&rxrpc_calls_sem);
371 __RXACCT(atomic_inc(&rxrpc_call_count));
375 _leave(" = %d [%p]", ret, call);
377 } /* end rxrpc_incoming_call() */
379 /*****************************************************************************/
383 void rxrpc_put_call(struct rxrpc_call *call)
385 struct rxrpc_connection *conn = call->conn;
386 struct rxrpc_message *msg;
388 _enter("%p{u=%d}",call,atomic_read(&call->usage));
391 if (atomic_read(&call->usage) <= 0)
394 /* to prevent a race, the decrement and the de-list must be effectively
396 spin_lock(&conn->lock);
397 if (likely(!atomic_dec_and_test(&call->usage))) {
398 spin_unlock(&conn->lock);
403 if (conn->channels[ntohl(call->chan_ix)] == call)
404 conn->channels[ntohl(call->chan_ix)] = NULL;
406 spin_unlock(&conn->lock);
408 wake_up(&conn->chanwait);
410 rxrpc_put_connection(conn);
412 /* clear the timers and dequeue from krxiod */
413 del_timer_sync(&call->acks_timeout);
414 del_timer_sync(&call->rcv_timeout);
415 del_timer_sync(&call->ackr_dfr_timo);
417 rxrpc_krxiod_dequeue_call(call);
419 /* clean up the contents of the struct */
420 if (call->snd_nextmsg)
421 rxrpc_put_message(call->snd_nextmsg);
424 rxrpc_put_message(call->snd_ping);
426 while (!list_empty(&call->acks_pendq)) {
427 msg = list_entry(call->acks_pendq.next,
428 struct rxrpc_message, link);
429 list_del(&msg->link);
430 rxrpc_put_message(msg);
433 while (!list_empty(&call->rcv_receiveq)) {
434 msg = list_entry(call->rcv_receiveq.next,
435 struct rxrpc_message, link);
436 list_del(&msg->link);
437 rxrpc_put_message(msg);
440 while (!list_empty(&call->app_readyq)) {
441 msg = list_entry(call->app_readyq.next,
442 struct rxrpc_message, link);
443 list_del(&msg->link);
444 rxrpc_put_message(msg);
447 while (!list_empty(&call->app_unreadyq)) {
448 msg = list_entry(call->app_unreadyq.next,
449 struct rxrpc_message, link);
450 list_del(&msg->link);
451 rxrpc_put_message(msg);
454 module_put(call->owner);
456 down_write(&rxrpc_calls_sem);
457 list_del(&call->call_link);
458 up_write(&rxrpc_calls_sem);
460 __RXACCT(atomic_dec(&rxrpc_call_count));
461 free_page((unsigned long) call);
463 _leave(" [destroyed]");
464 } /* end rxrpc_put_call() */
466 /*****************************************************************************/
468 * actually generate a normal ACK
/* Build and transmit a normal ACK from the call's accumulated ackr state,
 * then rotate the ACK window past any fully-acknowledged prefix.
 * The trailing aux words advertise MTU/window parameters to the peer. */
470 static inline int __rxrpc_call_gen_normal_ACK(struct rxrpc_call *call,
473 struct rxrpc_message *msg;
474 struct iovec diov[3];
478 /* ACKs default to DELAY */
479 if (!call->ackr.reason)
480 call->ackr.reason = RXRPC_ACK_DELAY;
482 _proto("Rx %05lu Sending ACK { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
483 jiffies - call->cjif,
484 ntohs(call->ackr.maxSkew),
485 ntohl(call->ackr.firstPacket),
486 ntohl(call->ackr.previousPacket),
487 ntohl(call->ackr.serial),
488 rxrpc_acks[call->ackr.reason],
/* NOTE(review): 1444/16/4 are hard-coded wire advertisements — presumably
 * max MTU, rwind and max packets per jumbogram; confirm against protocol. */
491 aux[0] = htonl(call->conn->peer->if_mtu); /* interface MTU */
492 aux[1] = htonl(1444); /* max MTU */
493 aux[2] = htonl(16); /* rwind */
494 aux[3] = htonl(4); /* max packets */
/* Three iovecs: ACK header, per-packet ACK/NACK array, aux parameters. */
496 diov[0].iov_len = sizeof(struct rxrpc_ackpacket);
497 diov[0].iov_base = &call->ackr;
498 diov[1].iov_len = call->ackr_pend_cnt + 3;
499 diov[1].iov_base = call->ackr_array;
500 diov[2].iov_len = sizeof(aux);
501 diov[2].iov_base = &aux;
503 /* build and send the message */
504 ret = rxrpc_conn_newmsg(call->conn,call, RXRPC_PACKET_TYPE_ACK,
505 3, diov, GFP_KERNEL, &msg);
510 msg->hdr.seq = htonl(seq);
511 msg->hdr.flags |= RXRPC_SLOW_START_OK;
513 ret = rxrpc_conn_sendmsg(call->conn, msg);
514 rxrpc_put_message(msg);
517 call->pkt_snd_count++;
519 /* count how many actual ACKs there were at the front */
520 for (delta = 0; delta < call->ackr_pend_cnt; delta++)
521 if (call->ackr_array[delta] != RXRPC_ACK_TYPE_ACK)
524 call->ackr_pend_cnt -= delta; /* all ACK'd to this point */
526 /* crank the ACK window around */
528 /* un-ACK'd window */
530 else if (delta < RXRPC_CALL_ACK_WINDOW_SIZE) {
531 /* partially ACK'd window
532 * - shuffle down to avoid losing out-of-sequence packets
534 call->ackr_win_bot += delta;
535 call->ackr_win_top += delta;
537 memmove(&call->ackr_array[0],
538 &call->ackr_array[delta],
539 call->ackr_pend_cnt);
541 memset(&call->ackr_array[call->ackr_pend_cnt],
543 sizeof(call->ackr_array) - call->ackr_pend_cnt);
546 /* fully ACK'd window
547 * - just clear the whole thing
549 memset(&call->ackr_array,
551 sizeof(call->ackr_array));
/* Reset the pending-ACK record ready for the next accumulation. */
555 memset(&call->ackr, 0, sizeof(call->ackr));
558 if (!call->app_call_state)
559 printk("___ STATE 0 ___\n");
561 } /* end __rxrpc_call_gen_normal_ACK() */
563 /*****************************************************************************/
565 * note the reception of a packet in the call's ACK records and generate an
566 * appropriate ACK packet if necessary
567 * - returns 0 if packet should be processed, 1 if packet should be ignored
568 * and -ve on an error
/* Core ACK-generation state machine. Classifies the incoming packet against
 * the ACK window (duplicate / out-of-window / out-of-sequence / in order),
 * decides between a merged "normal" ACK and an immediate "special" ACK, and
 * starts or stops the deferred-ACK timer accordingly. */
570 static int rxrpc_call_generate_ACK(struct rxrpc_call *call,
571 struct rxrpc_header *hdr,
572 struct rxrpc_ackpacket *ack)
574 struct rxrpc_message *msg;
578 u8 special_ACK, do_ACK, force;
580 _enter("%p,%p { seq=%d tp=%d fl=%02x }",
581 call, hdr, ntohl(hdr->seq), hdr->type, hdr->flags);
583 seq = ntohl(hdr->seq);
584 offset = seq - call->ackr_win_bot;
585 do_ACK = RXRPC_ACK_DELAY;
/* Track the highest sequence number seen (feeds the maxSkew field). */
589 if (call->ackr_high_seq < seq)
590 call->ackr_high_seq = seq;
592 /* deal with generation of obvious special ACKs first */
593 if (ack && ack->reason == RXRPC_ACK_PING) {
594 special_ACK = RXRPC_ACK_PING_RESPONSE;
599 if (seq < call->ackr_win_bot) {
600 special_ACK = RXRPC_ACK_DUPLICATE;
605 if (seq >= call->ackr_win_top) {
606 special_ACK = RXRPC_ACK_EXCEEDS_WINDOW;
611 if (call->ackr_array[offset] != RXRPC_ACK_TYPE_NACK) {
612 special_ACK = RXRPC_ACK_DUPLICATE;
617 /* okay... it's a normal data packet inside the ACK window */
618 call->ackr_array[offset] = RXRPC_ACK_TYPE_ACK;
620 if (offset < call->ackr_pend_cnt) {
622 else if (offset > call->ackr_pend_cnt) {
623 do_ACK = RXRPC_ACK_OUT_OF_SEQUENCE;
624 call->ackr_pend_cnt = offset;
628 if (hdr->flags & RXRPC_REQUEST_ACK) {
629 do_ACK = RXRPC_ACK_REQUESTED;
632 /* generate an ACK on the final packet of a reply just received */
633 if (hdr->flags & RXRPC_LAST_PACKET) {
634 if (call->conn->out_clientflag)
637 else if (!(hdr->flags & RXRPC_MORE_PACKETS)) {
638 do_ACK = RXRPC_ACK_REQUESTED;
641 /* re-ACK packets previously received out-of-order */
642 for (offset++; offset < RXRPC_CALL_ACK_WINDOW_SIZE; offset++)
643 if (call->ackr_array[offset] != RXRPC_ACK_TYPE_ACK)
646 call->ackr_pend_cnt = offset;
648 /* generate an ACK if we fill up the window */
649 if (call->ackr_pend_cnt >= RXRPC_CALL_ACK_WINDOW_SIZE)
653 _debug("%05lu ACKs pend=%u norm=%s special=%s%s",
654 jiffies - call->cjif,
657 rxrpc_acks[special_ACK],
658 force ? " immediate" :
659 do_ACK == RXRPC_ACK_REQUESTED ? " merge-req" :
660 hdr->flags & RXRPC_LAST_PACKET ? " finalise" :
664 /* send any pending normal ACKs if need be */
665 if (call->ackr_pend_cnt > 0) {
666 /* fill out the appropriate form */
667 call->ackr.bufferSpace = htons(RXRPC_CALL_ACK_WINDOW_SIZE);
668 call->ackr.maxSkew = htons(min(call->ackr_high_seq - seq,
670 call->ackr.firstPacket = htonl(call->ackr_win_bot);
671 call->ackr.previousPacket = call->ackr_prev_seq;
672 call->ackr.serial = hdr->serial;
673 call->ackr.nAcks = call->ackr_pend_cnt;
675 if (do_ACK == RXRPC_ACK_REQUESTED)
676 call->ackr.reason = do_ACK;
678 /* generate the ACK immediately if necessary */
679 if (special_ACK || force) {
680 err = __rxrpc_call_gen_normal_ACK(
681 call, do_ACK == RXRPC_ACK_DELAY ? 0 : seq);
689 if (call->ackr.reason == RXRPC_ACK_REQUESTED)
690 call->ackr_dfr_seq = seq;
692 /* start the ACK timer if not running if there are any pending deferred
694 if (call->ackr_pend_cnt > 0 &&
695 call->ackr.reason != RXRPC_ACK_REQUESTED &&
696 !timer_pending(&call->ackr_dfr_timo)
700 timo = rxrpc_call_dfr_ack_timeout + jiffies;
702 _debug("START ACKR TIMER for cj=%lu", timo - call->cjif);
704 spin_lock(&call->lock);
705 mod_timer(&call->ackr_dfr_timo, timo);
706 spin_unlock(&call->lock);
708 else if ((call->ackr_pend_cnt == 0 ||
709 call->ackr.reason == RXRPC_ACK_REQUESTED) &&
710 timer_pending(&call->ackr_dfr_timo)
712 /* stop timer if no pending ACKs */
713 _debug("CLEAR ACKR TIMER");
714 del_timer_sync(&call->ackr_dfr_timo);
717 /* send a special ACK if one is required */
/* Note: this local 'ack' shadows the 'ack' parameter from here on. */
719 struct rxrpc_ackpacket ack;
720 struct iovec diov[2];
721 uint8_t acks[1] = { RXRPC_ACK_TYPE_ACK };
723 /* fill out the appropriate form */
724 ack.bufferSpace = htons(RXRPC_CALL_ACK_WINDOW_SIZE);
725 ack.maxSkew = htons(min(call->ackr_high_seq - seq,
727 ack.firstPacket = htonl(call->ackr_win_bot);
728 ack.previousPacket = call->ackr_prev_seq;
729 ack.serial = hdr->serial;
730 ack.reason = special_ACK;
733 _proto("Rx Sending s-ACK"
734 " { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
736 ntohl(ack.firstPacket),
737 ntohl(ack.previousPacket),
739 rxrpc_acks[ack.reason],
742 diov[0].iov_len = sizeof(struct rxrpc_ackpacket);
743 diov[0].iov_base = &ack;
744 diov[1].iov_len = sizeof(acks);
745 diov[1].iov_base = acks;
747 /* build and send the message */
748 err = rxrpc_conn_newmsg(call->conn,call, RXRPC_PACKET_TYPE_ACK,
749 hdr->seq ? 2 : 1, diov,
758 msg->hdr.seq = htonl(seq);
759 msg->hdr.flags |= RXRPC_SLOW_START_OK;
761 err = rxrpc_conn_sendmsg(call->conn, msg);
762 rxrpc_put_message(msg);
767 call->pkt_snd_count++;
772 call->ackr_prev_seq = hdr->seq;
774 _leave(" = %d", ret);
776 } /* end rxrpc_call_generate_ACK() */
778 /*****************************************************************************/
780 * handle work to be done on a call
781 * - includes packet reception and timeout processing
/* Deferred-work dispatcher, driven by the krxiod daemon.
 * Tests and clears each RXRPC_CALL_* flag that the softirq timer handlers
 * set, then performs the corresponding action in process context. */
783 void rxrpc_call_do_stuff(struct rxrpc_call *call)
785 _enter("%p{flags=%lx}", call, call->flags);
787 /* handle packet reception */
788 if (call->flags & RXRPC_CALL_RCV_PKT) {
789 _debug("- receive packet");
790 call->flags &= ~RXRPC_CALL_RCV_PKT;
791 rxrpc_call_receive_packet(call);
794 /* handle overdue ACKs */
795 if (call->flags & RXRPC_CALL_ACKS_TIMO) {
796 _debug("- overdue ACK timeout");
797 call->flags &= ~RXRPC_CALL_ACKS_TIMO;
798 rxrpc_call_resend(call, call->snd_seq_count);
801 /* handle lack of reception */
802 if (call->flags & RXRPC_CALL_RCV_TIMO) {
803 _debug("- reception timeout");
804 call->flags &= ~RXRPC_CALL_RCV_TIMO;
805 rxrpc_call_abort(call, -EIO);
808 /* handle deferred ACKs */
809 if (call->flags & RXRPC_CALL_ACKR_TIMO ||
810 (call->ackr.nAcks > 0 && call->ackr.reason == RXRPC_ACK_REQUESTED)
812 _debug("- deferred ACK timeout: cj=%05lu r=%s n=%u",
813 jiffies - call->cjif,
814 rxrpc_acks[call->ackr.reason],
817 call->flags &= ~RXRPC_CALL_ACKR_TIMO;
/* Skip the flush once the call has been aborted. */
819 if (call->ackr.nAcks > 0 &&
820 call->app_call_state != RXRPC_CSTATE_ERROR) {
822 __rxrpc_call_gen_normal_ACK(call, call->ackr_dfr_seq);
823 call->ackr_dfr_seq = 0;
829 } /* end rxrpc_call_do_stuff() */
831 /*****************************************************************************/
833 * send an abort message at call or connection level
834 * - must be called with call->lock held
835 * - the supplied error code is sent as the packet data
/* Abort a call locally: mark it errored, let the app map the error code,
 * silence all timers, flush any pending ACK, and (if any traffic was ever
 * exchanged) transmit an ABORT packet carrying the abort code.
 * Enters with call->lock held and RELEASES it on every path. */
837 static int __rxrpc_call_abort(struct rxrpc_call *call, int errno)
839 struct rxrpc_connection *conn = call->conn;
840 struct rxrpc_message *msg;
841 struct iovec diov[1];
845 _enter("%p{%08x},%p{%d},%d",
846 conn, ntohl(conn->conn_id), call, ntohl(call->call_id), errno);
848 /* if this call is already aborted, then just wake up any waiters */
849 if (call->app_call_state == RXRPC_CSTATE_ERROR) {
850 spin_unlock(&call->lock);
851 call->app_error_func(call);
/* Pin the call so teardown can't free it under us. */
856 rxrpc_get_call(call);
858 /* change the state _with_ the lock still held */
859 call->app_call_state = RXRPC_CSTATE_ERROR;
860 call->app_err_state = RXRPC_ESTATE_LOCAL_ABORT;
861 call->app_errno = errno;
862 call->app_mark = RXRPC_APP_MARK_EOF;
863 call->app_read_buf = NULL;
864 call->app_async_read = 0;
868 /* ask the app to translate the error code */
869 call->app_aemap_func(call);
871 spin_unlock(&call->lock);
873 /* flush any outstanding ACKs */
874 del_timer_sync(&call->acks_timeout);
875 del_timer_sync(&call->rcv_timeout);
876 del_timer_sync(&call->ackr_dfr_timo);
878 if (rxrpc_call_is_ack_pending(call))
879 __rxrpc_call_gen_normal_ACK(call, 0);
881 /* send the abort packet only if we actually traded some other
884 if (call->pkt_snd_count || call->pkt_rcv_count) {
885 /* actually send the abort */
886 _proto("Rx Sending Call ABORT { data=%d }",
887 call->app_abort_code);
/* Abort code travels as a single network-order 32-bit word. */
889 _error = htonl(call->app_abort_code);
891 diov[0].iov_len = sizeof(_error);
892 diov[0].iov_base = &_error;
894 ret = rxrpc_conn_newmsg(conn, call, RXRPC_PACKET_TYPE_ABORT,
895 1, diov, GFP_KERNEL, &msg);
897 ret = rxrpc_conn_sendmsg(conn, msg);
898 rxrpc_put_message(msg);
902 /* tell the app layer to let go */
903 call->app_error_func(call);
905 rxrpc_put_call(call);
907 _leave(" = %d", ret);
909 } /* end __rxrpc_call_abort() */
911 /*****************************************************************************/
913 * send an abort message at call or connection level
914 * - the supplied error code is sent as the packet data
/* Public abort entry point: takes call->lock, then delegates to
 * __rxrpc_call_abort(), which releases the lock on all paths. */
916 int rxrpc_call_abort(struct rxrpc_call *call, int error)
918 spin_lock(&call->lock);
920 return __rxrpc_call_abort(call, error);
922 } /* end rxrpc_call_abort() */
924 /*****************************************************************************/
926 * process packets waiting for this call
/* Drain the call's receive queue, dispatching each packet by header type:
 * DATA (ACK bookkeeping then payload processing), ACK, and ABORT (which
 * moves the call into the error state). Unknown types are logged only. */
928 static void rxrpc_call_receive_packet(struct rxrpc_call *call)
930 struct rxrpc_message *msg;
931 struct list_head *_p;
936 rxrpc_get_call(call); /* must not go away too soon if aborted by
939 while (!list_empty(&call->rcv_receiveq)) {
940 /* try to get next packet */
/* Re-check emptiness under the lock; the first test was unlocked. */
942 spin_lock(&call->lock);
943 if (!list_empty(&call->rcv_receiveq)) {
944 _p = call->rcv_receiveq.next;
947 spin_unlock(&call->lock);
952 msg = list_entry(_p, struct rxrpc_message, link);
954 _proto("Rx %05lu Received %s packet (%%%u,#%u,%c%c%c%c%c)",
955 jiffies - call->cjif,
956 rxrpc_pkts[msg->hdr.type],
957 ntohl(msg->hdr.serial),
959 msg->hdr.flags & RXRPC_JUMBO_PACKET ? 'j' : '-',
960 msg->hdr.flags & RXRPC_MORE_PACKETS ? 'm' : '-',
961 msg->hdr.flags & RXRPC_LAST_PACKET ? 'l' : '-',
962 msg->hdr.flags & RXRPC_REQUEST_ACK ? 'r' : '-',
963 msg->hdr.flags & RXRPC_CLIENT_INITIATED ? 'C' : 'S'
966 switch (msg->hdr.type) {
967 /* deal with data packets */
968 case RXRPC_PACKET_TYPE_DATA:
969 /* ACK the packet if necessary */
970 switch (rxrpc_call_generate_ACK(call, &msg->hdr,
972 case 0: /* useful packet */
973 rxrpc_call_receive_data_packet(call, msg);
975 case 1: /* duplicate or out-of-window packet */
978 rxrpc_put_message(msg);
983 /* deal with ACK packets */
984 case RXRPC_PACKET_TYPE_ACK:
985 rxrpc_call_receive_ack_packet(call, msg);
988 /* deal with abort packets */
989 case RXRPC_PACKET_TYPE_ABORT:
/* Abort payload is a single network-order error word. */
991 if (skb_copy_bits(msg->pkt, msg->offset,
992 &data32, sizeof(data32)) < 0) {
993 printk("Rx Received short ABORT packet\n");
996 data32 = ntohl(data32);
999 _proto("Rx Received Call ABORT { data=%d }", data32);
1001 spin_lock(&call->lock);
1002 call->app_call_state = RXRPC_CSTATE_ERROR;
1003 call->app_err_state = RXRPC_ESTATE_PEER_ABORT;
1004 call->app_abort_code = data32;
1005 call->app_errno = -ECONNABORTED;
1006 call->app_mark = RXRPC_APP_MARK_EOF;
1007 call->app_read_buf = NULL;
1008 call->app_async_read = 0;
1010 /* ask the app to translate the error code */
1011 call->app_aemap_func(call);
1013 spin_unlock(&call->lock);
1014 call->app_error_func(call);
1018 /* deal with other packet types */
1019 _proto("Rx Unsupported packet type %u (#%u)",
1020 msg->hdr.type, msg->seq);
1024 rxrpc_put_message(msg);
/* Balance the rxrpc_get_call() taken at entry. */
1028 rxrpc_put_call(call);
1030 } /* end rxrpc_call_receive_packet() */
1032 /*****************************************************************************/
1034 * process next data packet
1035 * - as the next data packet arrives:
1036 * - it is queued on app_readyq _if_ it is the next one expected
1038 * - it is queued on app_unreadyq _if_ it is not the next one expected
1039 * - if a packet placed on app_readyq completely fills a hole leading up to
1040 * the first packet on app_unreadyq, then packets now in sequence are
1041 * tranferred to app_readyq
1042 * - the application layer can only see packets on app_readyq
1043 * (app_ready_qty bytes)
1044 * - the application layer is prodded every time a new packet arrives
1046 static void rxrpc_call_receive_data_packet(struct rxrpc_call *call,
1047 struct rxrpc_message *msg)
1049 const struct rxrpc_operation *optbl, *op;
1050 struct rxrpc_message *pmsg;
1051 struct list_head *_p;
1052 int ret, lo, hi, rmtimo;
1055 _enter("%p{%u},%p{%u}", call, ntohl(call->call_id), msg, msg->seq);
1057 rxrpc_get_message(msg);
1059 /* add to the unready queue if we'd have to create a hole in the ready
1060 * queue otherwise */
1061 if (msg->seq != call->app_ready_seq + 1) {
1062 _debug("Call add packet %d to unreadyq", msg->seq);
1064 /* insert in seq order */
1065 list_for_each(_p, &call->app_unreadyq) {
1066 pmsg = list_entry(_p, struct rxrpc_message, link);
1067 if (pmsg->seq > msg->seq)
1071 list_add_tail(&msg->link, _p);
1073 _leave(" [unreadyq]");
1077 /* next in sequence - simply append into the call's ready queue */
1078 _debug("Call add packet %d to readyq (+%Zd => %Zd bytes)",
1079 msg->seq, msg->dsize, call->app_ready_qty);
1081 spin_lock(&call->lock);
1082 call->app_ready_seq = msg->seq;
1083 call->app_ready_qty += msg->dsize;
1084 list_add_tail(&msg->link, &call->app_readyq);
1086 /* move unready packets to the readyq if we got rid of a hole */
1087 while (!list_empty(&call->app_unreadyq)) {
1088 pmsg = list_entry(call->app_unreadyq.next,
1089 struct rxrpc_message, link);
1091 if (pmsg->seq != call->app_ready_seq + 1)
1094 /* next in sequence - just move list-to-list */
1095 _debug("Call transfer packet %d to readyq (+%Zd => %Zd bytes)",
1096 pmsg->seq, pmsg->dsize, call->app_ready_qty);
1098 call->app_ready_seq = pmsg->seq;
1099 call->app_ready_qty += pmsg->dsize;
1100 list_del_init(&pmsg->link);
1101 list_add_tail(&pmsg->link, &call->app_readyq);
1104 /* see if we've got the last packet yet */
1105 if (!list_empty(&call->app_readyq)) {
1106 pmsg = list_entry(call->app_readyq.prev,
1107 struct rxrpc_message, link);
1108 if (pmsg->hdr.flags & RXRPC_LAST_PACKET) {
1109 call->app_last_rcv = 1;
1110 _debug("Last packet on readyq");
/* Per-state handling; entered with call->lock held — each arm must
 * release it on the paths it takes. */
1114 switch (call->app_call_state) {
1115 /* do nothing if call already aborted */
1116 case RXRPC_CSTATE_ERROR:
1117 spin_unlock(&call->lock);
1121 /* extract the operation ID from an incoming call if that's not
1123 case RXRPC_CSTATE_SRVR_RCV_OPID:
1124 spin_unlock(&call->lock);
1126 /* handle as yet insufficient data for the operation ID */
1127 if (call->app_ready_qty < 4) {
1128 if (call->app_last_rcv)
1129 /* trouble - last packet seen */
1130 rxrpc_call_abort(call, -EINVAL);
1136 /* pull the operation ID out of the buffer */
1137 ret = rxrpc_call_read_data(call, &opid, sizeof(opid), 0);
1139 printk("Unexpected error from read-data: %d\n", ret);
1140 if (call->app_call_state != RXRPC_CSTATE_ERROR)
1141 rxrpc_call_abort(call, ret);
1145 call->app_opcode = ntohl(opid);
1147 /* locate the operation in the available ops table */
/* Binary search over the service's sorted operation table. */
1148 optbl = call->conn->service->ops_begin;
1150 hi = call->conn->service->ops_end - optbl;
1153 int mid = (hi + lo) / 2;
1155 if (call->app_opcode == op->id)
1157 if (call->app_opcode > op->id)
1164 kproto("Rx Client requested operation %d from %s service",
1165 call->app_opcode, call->conn->service->name);
1166 rxrpc_call_abort(call, -EINVAL);
1171 _proto("Rx Client requested operation %s from %s service",
1172 op->name, call->conn->service->name);
1174 /* we're now waiting for the argument block (unless the call
1176 spin_lock(&call->lock);
1177 if (call->app_call_state == RXRPC_CSTATE_SRVR_RCV_OPID ||
1178 call->app_call_state == RXRPC_CSTATE_SRVR_SND_REPLY) {
1179 if (!call->app_last_rcv)
1180 call->app_call_state =
1181 RXRPC_CSTATE_SRVR_RCV_ARGS;
1182 else if (call->app_ready_qty > 0)
1183 call->app_call_state =
1184 RXRPC_CSTATE_SRVR_GOT_ARGS;
1186 call->app_call_state =
1187 RXRPC_CSTATE_SRVR_SND_REPLY;
1188 call->app_mark = op->asize;
1189 call->app_user = op->user;
1191 spin_unlock(&call->lock);
1196 case RXRPC_CSTATE_SRVR_RCV_ARGS:
1197 /* change state if just received last packet of arg block */
1198 if (call->app_last_rcv)
1199 call->app_call_state = RXRPC_CSTATE_SRVR_GOT_ARGS;
1200 spin_unlock(&call->lock);
1205 case RXRPC_CSTATE_CLNT_RCV_REPLY:
1206 /* change state if just received last packet of reply block */
1208 if (call->app_last_rcv) {
1209 call->app_call_state = RXRPC_CSTATE_CLNT_GOT_REPLY;
1212 spin_unlock(&call->lock);
/* Reply complete: silence all of the call's timers. */
1215 del_timer_sync(&call->acks_timeout);
1216 del_timer_sync(&call->rcv_timeout);
1217 del_timer_sync(&call->ackr_dfr_timo);
1224 /* deal with data reception in an unexpected state */
1225 printk("Unexpected state [[[ %u ]]]\n", call->app_call_state);
1226 __rxrpc_call_abort(call, -EBADMSG);
1231 if (call->app_call_state == RXRPC_CSTATE_CLNT_RCV_REPLY &&
1235 /* otherwise just invoke the data function whenever we can satisfy its desire for more
1238 _proto("Rx Received Op Data: st=%u qty=%Zu mk=%Zu%s",
1239 call->app_call_state, call->app_ready_qty, call->app_mark,
1240 call->app_last_rcv ? " last-rcvd" : "");
1242 spin_lock(&call->lock);
1244 ret = __rxrpc_call_read_data(call);
1247 spin_unlock(&call->lock);
1248 call->app_attn_func(call);
1251 spin_unlock(&call->lock);
1254 spin_unlock(&call->lock);
1257 __rxrpc_call_abort(call, ret);
1265 } /* end rxrpc_call_receive_data_packet() */
1267 /*****************************************************************************/
1269 * received an ACK packet
/* Process an inbound ACK packet: validate the acknowledged range against
 * what we actually sent, perform RTT measurement against the prompting
 * ping or data packet, then act on the ACK reason (definitive ACK,
 * resend bookkeeping, ping response, etc.). */
1271 static void rxrpc_call_receive_ack_packet(struct rxrpc_call *call,
1272 struct rxrpc_message *msg)
1274 struct rxrpc_ackpacket ack;
1275 rxrpc_serial_t serial;
1279 _enter("%p{%u},%p{%u}", call, ntohl(call->call_id), msg, msg->seq);
1281 /* extract the basic ACK record */
1282 if (skb_copy_bits(msg->pkt, msg->offset, &ack, sizeof(ack)) < 0) {
1283 printk("Rx Received short ACK packet\n");
1286 msg->offset += sizeof(ack);
1288 serial = ack.serial;
1289 seq = ntohl(ack.firstPacket);
1291 _proto("Rx Received ACK %%%d { b=%hu m=%hu f=%u p=%u s=%u r=%s n=%u }",
1292 ntohl(msg->hdr.serial),
1293 ntohs(ack.bufferSpace),
1296 ntohl(ack.previousPacket),
1298 rxrpc_acks[ack.reason],
1302 /* check the other side isn't ACK'ing a sequence number I haven't sent
1304 if (ack.nAcks > 0 &&
1305 (seq > call->snd_seq_count ||
1306 seq + ack.nAcks - 1 > call->snd_seq_count)) {
1307 printk("Received ACK (#%u-#%u) for unsent packet\n",
1308 seq, seq + ack.nAcks - 1);
1309 rxrpc_call_abort(call, -EINVAL);
1314 /* deal with RTT calculation */
1316 struct rxrpc_message *rttmsg;
1318 /* find the prompting packet */
1319 spin_lock(&call->lock);
1320 if (call->snd_ping && call->snd_ping->hdr.serial == serial) {
1321 /* it was a ping packet */
1322 rttmsg = call->snd_ping;
1323 call->snd_ping = NULL;
1324 spin_unlock(&call->lock);
1327 rttmsg->rttdone = 1;
1328 rxrpc_peer_calculate_rtt(call->conn->peer,
1330 rxrpc_put_message(rttmsg);
1334 struct list_head *_p;
1336 /* it ought to be a data packet - look in the pending
/* Scan the unacked-data queue for the matching serial; a packet is
 * only timed once (rttdone guard) to avoid retransmit skew. */
1338 list_for_each(_p, &call->acks_pendq) {
1339 rttmsg = list_entry(_p, struct rxrpc_message,
1341 if (rttmsg->hdr.serial == serial) {
1342 if (rttmsg->rttdone)
1343 /* never do RTT twice without
1347 rttmsg->rttdone = 1;
1348 rxrpc_peer_calculate_rtt(
1349 call->conn->peer, rttmsg, msg);
1353 spin_unlock(&call->lock);
1357 switch (ack.reason) {
1358 /* deal with negative/positive acknowledgement of data
1360 case RXRPC_ACK_REQUESTED:
1361 case RXRPC_ACK_DELAY:
1362 case RXRPC_ACK_IDLE:
1363 rxrpc_call_definitively_ACK(call, seq - 1);
1365 case RXRPC_ACK_DUPLICATE:
1366 case RXRPC_ACK_OUT_OF_SEQUENCE:
1367 case RXRPC_ACK_EXCEEDS_WINDOW:
1368 call->snd_resend_cnt = 0;
1369 ret = rxrpc_call_record_ACK(call, msg, seq, ack.nAcks);
1371 rxrpc_call_abort(call, ret);
1374 /* respond to ping packets immediately */
1375 case RXRPC_ACK_PING:
1376 rxrpc_call_generate_ACK(call, &msg->hdr, &ack);
1379 /* only record RTT on ping response packets */
1380 case RXRPC_ACK_PING_RESPONSE:
1381 if (call->snd_ping) {
1382 struct rxrpc_message *rttmsg;
1384 /* only do RTT stuff if the response matches the
1387 spin_lock(&call->lock);
1388 if (call->snd_ping &&
1389 call->snd_ping->hdr.serial == ack.serial) {
1390 rttmsg = call->snd_ping;
1391 call->snd_ping = NULL;
1393 spin_unlock(&call->lock);
1396 rttmsg->rttdone = 1;
1397 rxrpc_peer_calculate_rtt(call->conn->peer,
1399 rxrpc_put_message(rttmsg);
1405 printk("Unsupported ACK reason %u\n", ack.reason);
1410 } /* end rxrpc_call_receive_ack_packet() */
1412 /*****************************************************************************/
1414  * record the definitive ACKs for all messages up to and including the one with the
/*
 * Hard-ACK every sent packet up to and including sequence number 'highest'.
 * - advances call->acks_dftv_seq one packet at a time, dequeueing the
 *   matching message from the front of acks_pendq and dropping its ref
 *   (state -> RXRPC_MSG_DONE)
 * - panics if the pending-ACK queue is empty or out of order, since that
 *   indicates internal queue corruption rather than a peer error
 * - if everything we ever sent is now hard-ACK'd, moves the call to
 *   COMPLETE, stops all three call timers and prods the app layer
 */
1417 static void rxrpc_call_definitively_ACK(struct rxrpc_call *call,
1418 rxrpc_seq_t highest)
1420 struct rxrpc_message *msg;
1423 _enter("%p{ads=%u},%u", call, call->acks_dftv_seq, highest);
1425 while (call->acks_dftv_seq < highest) {
1426 call->acks_dftv_seq++;
1428 _proto("Definitive ACK on packet #%u", call->acks_dftv_seq);
1430 /* discard those at front of queue until message with highest
1432 spin_lock(&call->lock);
1434 if (!list_empty(&call->acks_pendq)) {
1435 msg = list_entry(call->acks_pendq.next,
1436 struct rxrpc_message, link);
1437 list_del_init(&msg->link); /* dequeue */
/* only packets still in SENT state count towards the
 * provisionally-unacknowledged total */
1438 if (msg->state == RXRPC_MSG_SENT)
1439 call->acks_pend_cnt--;
1441 spin_unlock(&call->lock);
1443 /* insanity check */
1445 panic("%s(): acks_pendq unexpectedly empty\n",
1448 if (msg->seq != call->acks_dftv_seq)
1449 panic("%s(): Packet #%u expected at front of acks_pendq"
1451 __FUNCTION__, call->acks_dftv_seq, msg->seq);
1453 /* discard the message */
1454 msg->state = RXRPC_MSG_DONE;
1455 rxrpc_put_message(msg);
1458 /* if all sent packets are definitively ACK'd then prod any sleepers just in case */
1460 spin_lock(&call->lock);
1461 if (call->acks_dftv_seq == call->snd_seq_count) {
1462 if (call->app_call_state != RXRPC_CSTATE_COMPLETE) {
1463 call->app_call_state = RXRPC_CSTATE_COMPLETE;
1468 spin_unlock(&call->lock);
/* timers are stopped outside the spinlock: del_timer_sync() may
 * have to wait for a running timer handler */
1471 del_timer_sync(&call->acks_timeout);
1472 del_timer_sync(&call->rcv_timeout);
1473 del_timer_sync(&call->ackr_dfr_timo);
1474 call->app_attn_func(call);
1478 } /* end rxrpc_call_definitively_ACK() */
1480 /*****************************************************************************/
1482  * record the specified amount of ACKs/NAKs
/*
 * Record the per-packet ACK/NAK flag array carried by an ACK packet.
 * - 'seq' is the first sequence number covered, 'count' the number of
 *   single-byte ACK_TYPE_ACK/ACK_TYPE_NACK slots following in the skb
 * - slots covering packets already hard-ACK'd (seq <= acks_dftv_seq) are
 *   skipped; remaining slots are processed up to 16 at a time
 * - each covered packet in acks_pendq is flipped between RXRPC_MSG_SENT
 *   and RXRPC_MSG_ACKED, keeping acks_pend_cnt in step
 * - NAK'd/unacknowledged packets trigger rxrpc_call_resend(); when the
 *   pending count hits zero, the call may complete and waiters are woken
 * - returns 0 on success or -EINVAL on a malformed ACK packet; panics if
 *   a covered packet is absent from the pending-ACK queue (internal
 *   corruption)
 */
1484 static int rxrpc_call_record_ACK(struct rxrpc_call *call,
1485 struct rxrpc_message *msg,
1489 struct rxrpc_message *dmsg;
1490 struct list_head *_p;
1491 rxrpc_seq_t highest;
1494 char resend, now_complete;
1497 _enter("%p{apc=%u ads=%u},%p,%u,%Zu",
1498 call, call->acks_pend_cnt, call->acks_dftv_seq,
1501 /* handle re-ACK'ing of definitively ACK'd packets (may be out-of-order
1503 if (seq <= call->acks_dftv_seq) {
1504 unsigned delta = call->acks_dftv_seq - seq;
1506 if (count <= delta) {
1507 _leave(" = 0 [all definitively ACK'd]");
/* skip the slots for packets that are already hard-ACK'd */
1513 msg->offset += delta;
1516 highest = seq + count - 1;
1519 /* extract up to 16 ACK slots at a time */
1520 chunk = min(count, sizeof(acks));
/* pre-fill with 2 ('-' in _acktype) so untouched slots render as
 * "no information" in the trace below */
1523 memset(acks, 2, sizeof(acks));
1525 if (skb_copy_bits(msg->pkt, msg->offset, &acks, chunk) < 0) {
1526 printk("Rx Received short ACK packet\n");
1527 _leave(" = -EINVAL");
1530 msg->offset += chunk;
1532 /* check that the ACK set is valid */
1533 for (ix = 0; ix < chunk; ix++) {
1535 case RXRPC_ACK_TYPE_ACK:
1537 case RXRPC_ACK_TYPE_NACK:
1541 printk("Rx Received unsupported ACK state"
1543 _leave(" = -EINVAL");
1548 _proto("Rx ACK of packets #%u-#%u "
1549 "[%c%c%c%c%c%c%c%c%c%c%c%c%c%c%c%c] (pend=%u)",
1550 seq, (unsigned) (seq + chunk - 1),
1551 _acktype[acks[0x0]],
1552 _acktype[acks[0x1]],
1553 _acktype[acks[0x2]],
1554 _acktype[acks[0x3]],
1555 _acktype[acks[0x4]],
1556 _acktype[acks[0x5]],
1557 _acktype[acks[0x6]],
1558 _acktype[acks[0x7]],
1559 _acktype[acks[0x8]],
1560 _acktype[acks[0x9]],
1561 _acktype[acks[0xA]],
1562 _acktype[acks[0xB]],
1563 _acktype[acks[0xC]],
1564 _acktype[acks[0xD]],
1565 _acktype[acks[0xE]],
1566 _acktype[acks[0xF]],
1570 /* mark the packets in the ACK queue as being provisionally
1573 spin_lock(&call->lock);
1575 /* find the first packet ACK'd/NAK'd here */
1576 list_for_each(_p, &call->acks_pendq) {
1577 dmsg = list_entry(_p, struct rxrpc_message, link);
1578 if (dmsg->seq == seq)
1580 _debug("- %u: skipping #%u", ix, dmsg->seq);
1586 _debug("- %u: processing #%u (%c) apc=%u",
1587 ix, dmsg->seq, _acktype[acks[ix]],
1588 call->acks_pend_cnt);
1590 if (acks[ix] == RXRPC_ACK_TYPE_ACK) {
1591 if (dmsg->state == RXRPC_MSG_SENT)
1592 call->acks_pend_cnt--;
1593 dmsg->state = RXRPC_MSG_ACKED;
/* NAK: a previously-ACK'd packet reverts to SENT and is
 * counted as pending again, making it resend-eligible */
1596 if (dmsg->state == RXRPC_MSG_ACKED)
1597 call->acks_pend_cnt++;
1598 dmsg->state = RXRPC_MSG_SENT;
1603 _p = dmsg->link.next;
1604 dmsg = list_entry(_p, struct rxrpc_message, link);
1605 } while(ix < chunk &&
1606 _p != &call->acks_pendq &&
1612 spin_unlock(&call->lock);
1616 rxrpc_call_resend(call, highest);
1618 /* if all packets are provisionally ACK'd, then wake up anyone who's
1619  * waiting for that */
1621 spin_lock(&call->lock);
1622 if (call->acks_pend_cnt == 0) {
1623 if (call->app_call_state == RXRPC_CSTATE_SRVR_RCV_FINAL_ACK) {
1624 call->app_call_state = RXRPC_CSTATE_COMPLETE;
1629 spin_unlock(&call->lock);
1632 _debug("- wake up waiters");
1633 del_timer_sync(&call->acks_timeout);
1634 del_timer_sync(&call->rcv_timeout);
1635 del_timer_sync(&call->ackr_dfr_timo);
1636 call->app_attn_func(call);
1639 _leave(" = 0 (apc=%u)", call->acks_pend_cnt);
/* a covered packet missing from acks_pendq means our bookkeeping is
 * corrupt - unrecoverable */
1643 panic("%s(): acks_pendq in bad state (packet #%u absent)\n",
1646 } /* end rxrpc_call_record_ACK() */
1648 /*****************************************************************************/
1650  * transfer data from the ready packet queue to the asynchronous read buffer
1651  * - since this func is the only one going to look at packets queued on
1652  * app_readyq, we don't need a lock to modify or access them, only to modify
1653  * the queue pointers
1654  * - called with call->lock held
1655  * - the buffer must be in kernel space
1657  * 0 if buffer filled
1658  * -EAGAIN if buffer not filled and more data to come
1659  * -EBADMSG if last packet received and insufficient data left
1660  * -ECONNABORTED if the call has in an error state
/*
 * Internal workhorse: drains up to call->app_mark bytes from app_readyq
 * into call->app_read_buf (or just discards them when the buffer is NULL),
 * consuming and releasing packets as they empty, and advances the call
 * state machine once the final packet has been fully consumed.
 */
1662 static int __rxrpc_call_read_data(struct rxrpc_call *call)
1664 struct rxrpc_message *msg;
1668 _enter("%p{as=%d buf=%p qty=%Zu/%Zu}",
1670 call->app_async_read, call->app_read_buf,
1671 call->app_ready_qty, call->app_mark);
1673 /* check the state */
1674 switch (call->app_call_state) {
1675 case RXRPC_CSTATE_SRVR_RCV_ARGS:
1676 case RXRPC_CSTATE_CLNT_RCV_REPLY:
/* still mid-receive: seeing the last packet already is a state
 * machine inconsistency, not a peer error */
1677 if (call->app_last_rcv) {
1678 printk("%s(%p,%p,%Zd):"
1679 " Inconsistent call state (%s, last pkt)",
1681 call, call->app_read_buf, call->app_mark,
1682 rxrpc_call_states[call->app_call_state]);
1687 case RXRPC_CSTATE_SRVR_RCV_OPID:
1688 case RXRPC_CSTATE_SRVR_GOT_ARGS:
1689 case RXRPC_CSTATE_CLNT_GOT_REPLY:
1692 case RXRPC_CSTATE_SRVR_SND_REPLY:
1693 if (!call->app_last_rcv) {
1694 printk("%s(%p,%p,%Zd):"
1695 " Inconsistent call state (%s, not last pkt)",
1697 call, call->app_read_buf, call->app_mark,
1698 rxrpc_call_states[call->app_call_state]);
1701 _debug("Trying to read data from call in SND_REPLY state");
1704 case RXRPC_CSTATE_ERROR:
1705 _leave(" = -ECONNABORTED");
1706 return -ECONNABORTED;
1709 printk("reading in unexpected state [[[ %u ]]]\n",
1710 call->app_call_state);
1714 /* handle the case of not having an async buffer */
1715 if (!call->app_async_read) {
1716 if (call->app_mark == RXRPC_APP_MARK_EOF) {
1717 ret = call->app_last_rcv ? 0 : -EAGAIN;
1720 if (call->app_mark >= call->app_ready_qty) {
1721 call->app_mark = RXRPC_APP_MARK_EOF;
1725 ret = call->app_last_rcv ? -EBADMSG : -EAGAIN;
1729 _leave(" = %d [no buf]", ret);
/* copy out of the ready queue until the mark is satisfied or the
 * queue runs dry */
1733 while (!list_empty(&call->app_readyq) && call->app_mark > 0) {
1734 msg = list_entry(call->app_readyq.next,
1735 struct rxrpc_message, link);
1737 /* drag as much data as we need out of this packet */
1738 qty = min(call->app_mark, msg->dsize);
1740 _debug("reading %Zu from skb=%p off=%lu",
1741 qty, msg->pkt, msg->offset);
1743 if (call->app_read_buf)
1744 if (skb_copy_bits(msg->pkt, msg->offset,
1745 call->app_read_buf, qty) < 0)
1746 panic("%s: Failed to copy data from packet:"
1749 call, call->app_read_buf, qty);
1751 /* if that packet is now empty, discard it */
1752 call->app_ready_qty -= qty;
1755 if (msg->dsize == 0) {
1756 list_del_init(&msg->link);
1757 rxrpc_put_message(msg);
1763 call->app_mark -= qty;
1764 if (call->app_read_buf)
1765 call->app_read_buf += qty;
1768 if (call->app_mark == 0) {
/* request satisfied - clear the async read descriptor */
1769 call->app_async_read = 0;
1770 call->app_mark = RXRPC_APP_MARK_EOF;
1771 call->app_read_buf = NULL;
1773 /* adjust the state if used up all packets */
1774 if (list_empty(&call->app_readyq) && call->app_last_rcv) {
1775 switch (call->app_call_state) {
1776 case RXRPC_CSTATE_SRVR_RCV_OPID:
1777 call->app_call_state = RXRPC_CSTATE_SRVR_SND_REPLY;
1778 call->app_mark = RXRPC_APP_MARK_EOF;
1780 del_timer_sync(&call->rcv_timeout);
1782 case RXRPC_CSTATE_SRVR_GOT_ARGS:
1783 call->app_call_state = RXRPC_CSTATE_SRVR_SND_REPLY;
1785 del_timer_sync(&call->rcv_timeout);
1788 call->app_call_state = RXRPC_CSTATE_COMPLETE;
1790 del_timer_sync(&call->acks_timeout);
1791 del_timer_sync(&call->ackr_dfr_timo);
1792 del_timer_sync(&call->rcv_timeout);
/* queue exhausted with the mark still unsatisfied */
1801 if (call->app_last_rcv) {
1802 _debug("Insufficient data (%Zu/%Zu)",
1803 call->app_ready_qty, call->app_mark);
1804 call->app_async_read = 0;
1805 call->app_mark = RXRPC_APP_MARK_EOF;
1806 call->app_read_buf = NULL;
1808 _leave(" = -EBADMSG");
1812 _leave(" = -EAGAIN");
1814 } /* end __rxrpc_call_read_data() */
1816 /*****************************************************************************/
1818  * attempt to read the specified amount of data from the call's ready queue
1819  * into the buffer provided
1820  * - since this func is the only one going to look at packets queued on
1821  * app_readyq, we don't need a lock to modify or access them, only to modify
1822  * the queue pointers
1823  * - if the buffer pointer is NULL, then data is merely drained, not copied
1824  * - if flags&RXRPC_CALL_READ_BLOCK, then the function will wait until there is
1825  * enough data or an error will be generated
1826  * - note that the caller must have added the calling task to the call's wait
1828  * - if flags&RXRPC_CALL_READ_ALL, then an error will be generated if this
1829  * function doesn't read all available data
/*
 * Public read entry point: registers an async read descriptor
 * (app_mark/app_read_buf/app_async_read) under call->lock, attempts an
 * immediate fill via __rxrpc_call_read_data(), and either returns, aborts
 * the call on error, or blocks interruptibly for more data to arrive.
 * Returns -EBUSY if a read is already registered on this call.
 */
1831 int rxrpc_call_read_data(struct rxrpc_call *call,
1832 void *buffer, size_t size, int flags)
1836 _enter("%p{arq=%Zu},%p,%Zd,%x",
1837 call, call->app_ready_qty, buffer, size, flags);
1839 spin_lock(&call->lock);
/* only one outstanding read per call */
1841 if (unlikely(!!call->app_read_buf)) {
1842 spin_unlock(&call->lock);
1843 _leave(" = -EBUSY");
1847 call->app_mark = size;
1848 call->app_read_buf = buffer;
1849 call->app_async_read = 1;
1850 call->app_read_count++;
1852 /* read as much data as possible */
1853 ret = __rxrpc_call_read_data(call);
/* READ_ALL demands we consumed everything: leftover data or a
 * non-final packet is a protocol violation from our side's view */
1856 if (flags & RXRPC_CALL_READ_ALL &&
1857 (!call->app_last_rcv || call->app_ready_qty > 0)) {
1858 _leave(" = -EBADMSG");
1859 __rxrpc_call_abort(call, -EBADMSG);
1863 spin_unlock(&call->lock);
1864 call->app_attn_func(call);
1869 spin_unlock(&call->lock);
1870 _leave(" = %d [aborted]", ret);
/* __rxrpc_call_abort() is the locked variant - lock still held here */
1874 __rxrpc_call_abort(call, ret);
1875 _leave(" = %d", ret);
1879 spin_unlock(&call->lock);
1881 if (!(flags & RXRPC_CALL_READ_BLOCK)) {
1882 _leave(" = -EAGAIN");
1886 /* wait for the data to arrive */
1887 _debug("blocking for data arrival");
/* standard interruptible-wait pattern: re-check the condition after
 * setting TASK_INTERRUPTIBLE to avoid missing a wakeup */
1890 set_current_state(TASK_INTERRUPTIBLE);
1891 if (!call->app_async_read || signal_pending(current))
1895 set_current_state(TASK_RUNNING);
1897 if (signal_pending(current)) {
1898 _leave(" = -EINTR");
1902 if (call->app_call_state == RXRPC_CSTATE_ERROR) {
1903 _leave(" = -ECONNABORTED");
1904 return -ECONNABORTED;
1911 } /* end rxrpc_call_read_data() */
1913 /*****************************************************************************/
1915  * write data to a call
1916  * - the data may not be sent immediately if it doesn't fill a buffer
1917  * - if we can't queue all the data for buffering now, siov[] will have been
1918  * adjusted to take account of what has been sent
/*
 * Queue outbound data described by siov[]/sioc onto the call.
 * - data is staged into call->snd_nextmsg (allocated on demand via
 *   rxrpc_conn_newmsg) and flushed whenever the message reaches the
 *   connection MTU, rxhdr_flags are set, or RXRPC_LAST_PACKET is given
 * - dup_data selects between copying into kmalloc'd buffers (tracked in
 *   msg->dfree for later freeing) and attaching the caller's iovecs
 *   directly - in the latter case the caller's buffers must stay valid
 * - *size_sent accumulates the number of bytes accepted
 * - refuses to queue more data once the final packet has gone out, and
 *   returns the stored app_errno if the call is already in ERROR state
 */
1920 int rxrpc_call_write_data(struct rxrpc_call *call,
1922 struct iovec siov[],
1928 struct rxrpc_message *msg;
1930 size_t space, size, chunk, tmp;
1934 _enter("%p,%Zu,%p,%02x,%x,%d,%p",
1935 call, sioc, siov, rxhdr_flags, alloc_flags, dup_data,
1942 /* can't send more if we've sent last packet from this end */
1943 switch (call->app_call_state) {
1944 case RXRPC_CSTATE_SRVR_SND_REPLY:
1945 case RXRPC_CSTATE_CLNT_SND_ARGS:
1947 case RXRPC_CSTATE_ERROR:
1948 ret = call->app_errno;
1953 /* calculate how much data we've been given */
1955 for (; sioc > 0; sptr++, sioc--) {
1959 if (!sptr->iov_base)
1962 size += sptr->iov_len;
1965 _debug("- size=%Zu mtu=%Zu", size, call->conn->mtu_size);
1968 /* make sure there's a message under construction */
1969 if (!call->snd_nextmsg) {
1970 /* no - allocate a message with no data yet attached */
1971 ret = rxrpc_conn_newmsg(call->conn, call,
1972 RXRPC_PACKET_TYPE_DATA,
1973 0, NULL, alloc_flags,
1974 &call->snd_nextmsg);
1977 _debug("- allocated new message [ds=%Zu]",
1978 call->snd_nextmsg->dsize);
1981 msg = call->snd_nextmsg;
1982 msg->hdr.flags |= rxhdr_flags;
1984 /* deal with zero-length terminal packet */
1986 if (rxhdr_flags & RXRPC_LAST_PACKET) {
1987 ret = rxrpc_call_flush(call);
1994 /* work out how much space current packet has available */
1995 space = call->conn->mtu_size - msg->dsize;
1996 chunk = min(space, size);
1998 _debug("- [before] space=%Zu chunk=%Zu", space, chunk);
/* skip any zero-length iovecs up front */
2000 while (!siov->iov_len)
2003 /* if we are going to have to duplicate the data then coalesce
2006 /* don't allocate more that 1 page at a time */
2007 if (chunk > PAGE_SIZE)
2010 /* allocate a data buffer and attach to the message */
2011 buf = kmalloc(chunk, alloc_flags);
2012 if (unlikely(!buf)) {
2014 sizeof(struct rxrpc_header)) {
2015 /* discard an empty msg and wind back
2016  * the seq counter */
2017 rxrpc_put_message(msg);
2018 call->snd_nextmsg = NULL;
2019 call->snd_seq_count--;
2026 tmp = msg->dcount++;
/* record in dfree that this iovec owns a kmalloc'd buffer
 * and must be freed when the message is released */
2027 set_bit(tmp, &msg->dfree);
2028 msg->data[tmp].iov_base = buf;
2029 msg->data[tmp].iov_len = chunk;
2030 msg->dsize += chunk;
2031 *size_sent += chunk;
2034 /* load the buffer with data */
2036 tmp = min(chunk, siov->iov_len);
2037 memcpy(buf, siov->iov_base, tmp);
/* consume from the caller's iovec so it reflects what remains */
2039 siov->iov_base += tmp;
2040 siov->iov_len -= tmp;
2047 /* we want to attach the supplied buffers directly */
2049 msg->dcount < RXRPC_MSG_MAX_IOCS) {
2050 tmp = msg->dcount++;
2051 msg->data[tmp].iov_base = siov->iov_base;
2052 msg->data[tmp].iov_len = siov->iov_len;
2053 msg->dsize += siov->iov_len;
2054 *size_sent += siov->iov_len;
2055 size -= siov->iov_len;
2056 chunk -= siov->iov_len;
2061 _debug("- [loaded] chunk=%Zu size=%Zu", chunk, size);
2063 /* dispatch the message when full, final or requesting ACK */
2064 if (msg->dsize >= call->conn->mtu_size || rxhdr_flags) {
2065 ret = rxrpc_call_flush(call);
2074 _leave(" = %d (%Zd queued, %Zd rem)", ret, *size_sent, size);
2077 } /* end rxrpc_call_write_data() */
2079 /*****************************************************************************/
2081  * flush outstanding packets to the network
/*
 * Dispatch the message currently under construction (call->snd_nextmsg),
 * if any.
 * - fixes up the header flags: the last packet clears MORE_PACKETS and
 *   (outside client-argument phase) requests an ACK; any other packet
 *   sets MORE_PACKETS
 * - queues the message on acks_pendq, advances the call state when this
 *   is the final packet of a reply/argument phase, bumps acks_pend_cnt
 *   and arms the RTT-based ACK timeout, then transmits via
 *   rxrpc_conn_sendmsg()
 * - holds a temporary ref on the call so it cannot vanish mid-flush
 */
2083 int rxrpc_call_flush(struct rxrpc_call *call)
2085 struct rxrpc_message *msg;
2090 rxrpc_get_call(call);
2092 /* if there's a packet under construction, then dispatch it now */
2093 if (call->snd_nextmsg) {
2094 msg = call->snd_nextmsg;
2095 call->snd_nextmsg = NULL;
2097 if (msg->hdr.flags & RXRPC_LAST_PACKET) {
2098 msg->hdr.flags &= ~RXRPC_MORE_PACKETS;
2099 if (call->app_call_state != RXRPC_CSTATE_CLNT_SND_ARGS)
2100 msg->hdr.flags |= RXRPC_REQUEST_ACK;
2103 msg->hdr.flags |= RXRPC_MORE_PACKETS;
2106 _proto("Sending DATA message { ds=%Zu dc=%u df=%02lu }",
2107 msg->dsize, msg->dcount, msg->dfree);
2109 /* queue and adjust call state */
2110 spin_lock(&call->lock);
2111 list_add_tail(&msg->link, &call->acks_pendq);
2113 /* decide what to do depending on current state and if this is
2114  * the last packet */
2116 switch (call->app_call_state) {
2117 case RXRPC_CSTATE_SRVR_SND_REPLY:
2118 if (msg->hdr.flags & RXRPC_LAST_PACKET) {
2119 call->app_call_state =
2120 RXRPC_CSTATE_SRVR_RCV_FINAL_ACK;
2125 case RXRPC_CSTATE_CLNT_SND_ARGS:
2126 if (msg->hdr.flags & RXRPC_LAST_PACKET) {
2127 call->app_call_state =
2128 RXRPC_CSTATE_CLNT_RCV_REPLY;
2133 case RXRPC_CSTATE_ERROR:
2134 ret = call->app_errno;
2136 spin_unlock(&call->lock);
2140 call->acks_pend_cnt++;
/* (re)arm the ACK timeout scaled by the measured peer RTT */
2142 mod_timer(&call->acks_timeout,
2143 __rxrpc_rtt_based_timeout(call,
2144 rxrpc_call_acks_timeout));
2146 spin_unlock(&call->lock);
2148 ret = rxrpc_conn_sendmsg(call->conn, msg);
2150 call->pkt_snd_count++;
2154 rxrpc_put_call(call);
2156 _leave(" = %d", ret);
2159 } /* end rxrpc_call_flush() */
2161 /*****************************************************************************/
2163  * resend NAK'd or unacknowledged packets up to the highest one specified
/*
 * Retransmit every packet still in RXRPC_MSG_SENT state up to sequence
 * number 'highest'.
 * - aborts the call (-EIO if we have heard from the peer at all, else
 *   -ETIMEDOUT) once rxrpc_call_max_resend consecutive resend rounds
 *   have happened without progress
 * - walks acks_pendq under call->lock, dropping the lock around each
 *   rxrpc_conn_sendmsg() (the message is pinned with an extra ref while
 *   unlocked); send errors are deliberately ignored
 * - finally re-arms the RTT-based ACK timeout
 */
2165 static void rxrpc_call_resend(struct rxrpc_call *call, rxrpc_seq_t highest)
2167 struct rxrpc_message *msg;
2168 struct list_head *_p;
2169 rxrpc_seq_t seq = 0;
2171 _enter("%p,%u", call, highest);
2173 _proto("Rx Resend required");
2175 /* handle too many resends */
2176 if (call->snd_resend_cnt >= rxrpc_call_max_resend) {
2177 _debug("Aborting due to too many resends (rcv=%d)",
2178 call->pkt_rcv_count);
2179 rxrpc_call_abort(call,
2180 call->pkt_rcv_count > 0 ? -EIO : -ETIMEDOUT);
2185 spin_lock(&call->lock);
2186 call->snd_resend_cnt++;
2188 /* determine which the next packet we might need to ACK is */
2189 if (seq <= call->acks_dftv_seq)
2190 seq = call->acks_dftv_seq;
2196 /* look for the packet in the pending-ACK queue */
2197 list_for_each(_p, &call->acks_pendq) {
2198 msg = list_entry(_p, struct rxrpc_message, link);
2199 if (msg->seq == seq)
2204 " Inconsistent pending-ACK queue (ds=%u sc=%u sq=%u)\n",
2205 __FUNCTION__, call, highest,
2206 call->acks_dftv_seq, call->snd_seq_count, seq);
2209 if (msg->state != RXRPC_MSG_SENT)
2210 continue; /* only un-ACK'd packets */
/* pin the message so it survives while the lock is dropped for
 * the (potentially sleeping) transmission */
2212 rxrpc_get_message(msg);
2213 spin_unlock(&call->lock);
2215 /* send each message again (and ignore any errors we might
2217 _proto("Resending DATA message { ds=%Zu dc=%u df=%02lu }",
2218 msg->dsize, msg->dcount, msg->dfree);
2220 if (rxrpc_conn_sendmsg(call->conn, msg) == 0)
2221 call->pkt_snd_count++;
2223 rxrpc_put_message(msg);
2225 spin_lock(&call->lock);
2228 /* reset the timeout */
2229 mod_timer(&call->acks_timeout,
2230 __rxrpc_rtt_based_timeout(call, rxrpc_call_acks_timeout));
2232 spin_unlock(&call->lock);
2235 } /* end rxrpc_call_resend() */
2237 /*****************************************************************************/
2239  * handle an ICMP error being applied to a call
/*
 * Apply a network (ICMP) error to a call.
 * - 'local' selects LOCAL_ERROR vs REMOTE_ERROR app_err_state; 'errno'
 *   is recorded as app_errno for the app layer to read
 * - if the call is already in ERROR state, only re-notifies via
 *   app_error_func(); otherwise transitions it to ERROR, cancels any
 *   in-flight async read, maps the error through app_aemap_func(),
 *   stops the call timers and notifies the app layer
 */
2241 void rxrpc_call_handle_error(struct rxrpc_call *call, int local, int errno)
2243 _enter("%p{%u},%d", call, ntohl(call->call_id), errno);
2245 /* if this call is already aborted, then just wake up any waiters */
2246 if (call->app_call_state == RXRPC_CSTATE_ERROR) {
2247 call->app_error_func(call);
2250 /* tell the app layer what happened */
2251 spin_lock(&call->lock);
2252 call->app_call_state = RXRPC_CSTATE_ERROR;
2255 call->app_err_state = RXRPC_ESTATE_LOCAL_ERROR;
2257 call->app_err_state = RXRPC_ESTATE_REMOTE_ERROR;
2258 call->app_errno = errno;
/* tear down any in-progress async read */
2259 call->app_mark = RXRPC_APP_MARK_EOF;
2260 call->app_read_buf = NULL;
2261 call->app_async_read = 0;
2264 call->app_aemap_func(call);
/* NOTE(review): del_timer_sync() is called here while call->lock is
 * still held, unlike the other paths in this file which drop the lock
 * first - del_timer_sync() waits for running handlers, so if any timer
 * handler takes call->lock this can deadlock; confirm handler locking */
2266 del_timer_sync(&call->acks_timeout);
2267 del_timer_sync(&call->rcv_timeout);
2268 del_timer_sync(&call->ackr_dfr_timo);
2270 spin_unlock(&call->lock);
2272 call->app_error_func(call);
2276 } /* end rxrpc_call_handle_error() */