📄 ar-ack.c
字号:
/* Management of Tx window, Tx resend, ACKs and out-of-sequence reception * * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved. * Written by David Howells (dhowells@redhat.com) * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License * as published by the Free Software Foundation; either version * 2 of the License, or (at your option) any later version. */#include <linux/module.h>#include <linux/circ_buf.h>#include <linux/net.h>#include <linux/skbuff.h>#include <linux/udp.h>#include <net/sock.h>#include <net/af_rxrpc.h>#include "ar-internal.h"static unsigned rxrpc_ack_defer = 1;static const char *rxrpc_acks[] = { "---", "REQ", "DUP", "OOS", "WIN", "MEM", "PNG", "PNR", "DLY", "IDL", "-?-"};static const s8 rxrpc_ack_priority[] = { [0] = 0, [RXRPC_ACK_DELAY] = 1, [RXRPC_ACK_REQUESTED] = 2, [RXRPC_ACK_IDLE] = 3, [RXRPC_ACK_PING_RESPONSE] = 4, [RXRPC_ACK_DUPLICATE] = 5, [RXRPC_ACK_OUT_OF_SEQUENCE] = 6, [RXRPC_ACK_EXCEEDS_WINDOW] = 7, [RXRPC_ACK_NOSPACE] = 8,};/* * propose an ACK be sent */void __rxrpc_propose_ACK(struct rxrpc_call *call, uint8_t ack_reason, __be32 serial, bool immediate){ unsigned long expiry; s8 prior = rxrpc_ack_priority[ack_reason]; ASSERTCMP(prior, >, 0); _enter("{%d},%s,%%%x,%u", call->debug_id, rxrpc_acks[ack_reason], ntohl(serial), immediate); if (prior < rxrpc_ack_priority[call->ackr_reason]) { if (immediate) goto cancel_timer; return; } /* update DELAY, IDLE, REQUESTED and PING_RESPONSE ACK serial * numbers */ if (prior == rxrpc_ack_priority[call->ackr_reason]) { if (prior <= 4) call->ackr_serial = serial; if (immediate) goto cancel_timer; return; } call->ackr_reason = ack_reason; call->ackr_serial = serial; switch (ack_reason) { case RXRPC_ACK_DELAY: _debug("run delay timer"); call->ack_timer.expires = jiffies + rxrpc_ack_timeout * HZ; add_timer(&call->ack_timer); return; case RXRPC_ACK_IDLE: if (!immediate) { _debug("run defer timer"); expiry = 1; goto run_timer; } goto cancel_timer; case RXRPC_ACK_REQUESTED: if (!rxrpc_ack_defer) goto cancel_timer; if (!immediate || serial == cpu_to_be32(1)) { _debug("run defer timer"); expiry = rxrpc_ack_defer; goto run_timer; } default: _debug("immediate ACK"); goto cancel_timer; }run_timer: expiry += jiffies; if (!timer_pending(&call->ack_timer) || time_after(call->ack_timer.expires, expiry)) mod_timer(&call->ack_timer, expiry); return;cancel_timer: _debug("cancel timer %%%u", ntohl(serial)); try_to_del_timer_sync(&call->ack_timer); read_lock_bh(&call->state_lock); if (call->state <= RXRPC_CALL_COMPLETE && !test_and_set_bit(RXRPC_CALL_ACK, &call->events)) rxrpc_queue_call(call); read_unlock_bh(&call->state_lock);}/* * propose an ACK be sent, locking the call structure */void rxrpc_propose_ACK(struct rxrpc_call *call, uint8_t ack_reason, __be32 serial, bool immediate){ s8 prior = rxrpc_ack_priority[ack_reason]; if (prior > rxrpc_ack_priority[call->ackr_reason]) { spin_lock_bh(&call->lock); __rxrpc_propose_ACK(call, ack_reason, serial, immediate); spin_unlock_bh(&call->lock); }}/* * set the resend timer */static void rxrpc_set_resend(struct rxrpc_call *call, u8 resend, unsigned long resend_at){ read_lock_bh(&call->state_lock); if (call->state >= RXRPC_CALL_COMPLETE) resend = 0; if (resend & 1) { _debug("SET RESEND"); set_bit(RXRPC_CALL_RESEND, &call->events); } if (resend & 2) { _debug("MODIFY RESEND TIMER"); set_bit(RXRPC_CALL_RUN_RTIMER, &call->flags); mod_timer(&call->resend_timer, resend_at); } else { _debug("KILL RESEND TIMER"); del_timer_sync(&call->resend_timer); clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events); clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags); } read_unlock_bh(&call->state_lock);}/* * resend packets */static void rxrpc_resend(struct rxrpc_call *call){ struct rxrpc_skb_priv *sp; struct rxrpc_header *hdr; struct sk_buff *txb; unsigned long *p_txb, resend_at; int loop, stop; u8 resend; _enter("{%d,%d,%d,%d},", call->acks_hard, call->acks_unacked, atomic_read(&call->sequence), CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz)); stop = 0; resend = 0; resend_at = 0; for (loop = call->acks_tail; loop != call->acks_head || stop; loop = (loop + 1) & (call->acks_winsz - 1) ) { p_txb = call->acks_window + loop; smp_read_barrier_depends(); if (*p_txb & 1) continue; txb = (struct sk_buff *) *p_txb; sp = rxrpc_skb(txb); if (sp->need_resend) { sp->need_resend = 0; /* each Tx packet has a new serial number */ sp->hdr.serial = htonl(atomic_inc_return(&call->conn->serial)); hdr = (struct rxrpc_header *) txb->head; hdr->serial = sp->hdr.serial; _proto("Tx DATA %%%u { #%d }", ntohl(sp->hdr.serial), ntohl(sp->hdr.seq)); if (rxrpc_send_packet(call->conn->trans, txb) < 0) { stop = 0; sp->resend_at = jiffies + 3; } else { sp->resend_at = jiffies + rxrpc_resend_timeout * HZ; } } if (time_after_eq(jiffies + 1, sp->resend_at)) { sp->need_resend = 1; resend |= 1; } else if (resend & 2) { if (time_before(sp->resend_at, resend_at)) resend_at = sp->resend_at; } else { resend_at = sp->resend_at; resend |= 2; } } rxrpc_set_resend(call, resend, resend_at); _leave("");}/* * handle resend timer expiry */static void rxrpc_resend_timer(struct rxrpc_call *call){ struct rxrpc_skb_priv *sp; struct sk_buff *txb; unsigned long *p_txb, resend_at; int loop; u8 resend; _enter("%d,%d,%d", call->acks_tail, call->acks_unacked, call->acks_head); resend = 0; resend_at = 0; for (loop = call->acks_unacked; loop != call->acks_head; loop = (loop + 1) & (call->acks_winsz - 1) ) { p_txb = call->acks_window + loop; smp_read_barrier_depends(); txb = (struct sk_buff *) (*p_txb & ~1); sp = rxrpc_skb(txb); ASSERT(!(*p_txb & 1)); if (sp->need_resend) { ; } else if (time_after_eq(jiffies + 1, sp->resend_at)) { sp->need_resend = 1; resend |= 1; } else if (resend & 2) { if (time_before(sp->resend_at, resend_at)) resend_at = sp->resend_at; } else { resend_at = sp->resend_at; resend |= 2; } } rxrpc_set_resend(call, resend, resend_at); _leave("");}/* * process soft ACKs of our transmitted packets * - these indicate packets the peer has or has not received, but hasn't yet * given to the consumer, and so can still be discarded and re-requested */static int rxrpc_process_soft_ACKs(struct rxrpc_call *call, struct rxrpc_ackpacket *ack, struct sk_buff *skb){ struct rxrpc_skb_priv *sp; struct sk_buff *txb; unsigned long *p_txb, resend_at; int loop; u8 sacks[RXRPC_MAXACKS], resend; _enter("{%d,%d},{%d},", call->acks_hard, CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz), ack->nAcks); if (skb_copy_bits(skb, 0, sacks, ack->nAcks) < 0) goto protocol_error; resend = 0; resend_at = 0; for (loop = 0; loop < ack->nAcks; loop++) { p_txb = call->acks_window; p_txb += (call->acks_tail + loop) & (call->acks_winsz - 1); smp_read_barrier_depends(); txb = (struct sk_buff *) (*p_txb & ~1); sp = rxrpc_skb(txb); switch (sacks[loop]) { case RXRPC_ACK_TYPE_ACK: sp->need_resend = 0; *p_txb |= 1; break; case RXRPC_ACK_TYPE_NACK: sp->need_resend = 1; *p_txb &= ~1; resend = 1; break; default: _debug("Unsupported ACK type %d", sacks[loop]); goto protocol_error; } } smp_mb(); call->acks_unacked = (call->acks_tail + loop) & (call->acks_winsz - 1); /* anything not explicitly ACK'd is implicitly NACK'd, but may just not * have been received or processed yet by the far end */ for (loop = call->acks_unacked; loop != call->acks_head; loop = (loop + 1) & (call->acks_winsz - 1) ) { p_txb = call->acks_window + loop; smp_read_barrier_depends(); txb = (struct sk_buff *) (*p_txb & ~1); sp = rxrpc_skb(txb); if (*p_txb & 1) { /* packet must have been discarded */ sp->need_resend = 1; *p_txb &= ~1; resend |= 1; } else if (sp->need_resend) { ; } else if (time_after_eq(jiffies + 1, sp->resend_at)) { sp->need_resend = 1; resend |= 1; } else if (resend & 2) { if (time_before(sp->resend_at, resend_at)) resend_at = sp->resend_at; } else { resend_at = sp->resend_at; resend |= 2; } } rxrpc_set_resend(call, resend, resend_at); _leave(" = 0"); return 0;protocol_error: _leave(" = -EPROTO"); return -EPROTO;}/* * discard hard-ACK'd packets from the Tx window */static void rxrpc_rotate_tx_window(struct rxrpc_call *call, u32 hard){ struct rxrpc_skb_priv *sp; unsigned long _skb; int tail = call->acks_tail, old_tail; int win = CIRC_CNT(call->acks_head, tail, call->acks_winsz); _enter("{%u,%u},%u", call->acks_hard, win, hard); ASSERTCMP(hard - call->acks_hard, <=, win); while (call->acks_hard < hard) { smp_read_barrier_depends(); _skb = call->acks_window[tail] & ~1; sp = rxrpc_skb((struct sk_buff *) _skb); rxrpc_free_skb((struct sk_buff *) _skb); old_tail = tail; tail = (tail + 1) & (call->acks_winsz - 1); call->acks_tail = tail; if (call->acks_unacked == old_tail) call->acks_unacked = tail; call->acks_hard++; } wake_up(&call->tx_waitq);}/* * clear the Tx window in the event of a failure */static void rxrpc_clear_tx_window(struct rxrpc_call *call){ rxrpc_rotate_tx_window(call, atomic_read(&call->sequence));}/* * drain the out of sequence received packet queue into the packet Rx queue */static int rxrpc_drain_rx_oos_queue(struct rxrpc_call *call){ struct rxrpc_skb_priv *sp; struct sk_buff *skb; bool terminal; int ret; _enter("{%d,%d}", call->rx_data_post, call->rx_first_oos); spin_lock_bh(&call->lock); ret = -ECONNRESET; if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) goto socket_unavailable; skb = skb_dequeue(&call->rx_oos_queue); if (skb) { sp = rxrpc_skb(skb); _debug("drain OOS packet %d [%d]", ntohl(sp->hdr.seq), call->rx_first_oos); if (ntohl(sp->hdr.seq) != call->rx_first_oos) { skb_queue_head(&call->rx_oos_queue, skb); call->rx_first_oos = ntohl(rxrpc_skb(skb)->hdr.seq); _debug("requeue %p {%u}", skb, call->rx_first_oos); } else {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -