/* File: ar-ack.c */
/* Font size: (control label from the web code viewer, not part of the source) */
/*
 * NOTE(review): this chunk opens part-way through a function whose signature
 * and earlier statements are not visible here.  The statements down to the
 * function's closing brace are reproduced unchanged; only what they visibly
 * do is described.  The nesting depth is inferred from the two closing braces
 * that follow.
 */
			/* hand the packet to the receive queue as ordinary data */
			skb->mark = RXRPC_SKB_MARK_DATA;
			/* terminal only if the packet carries RXRPC_LAST_PACKET
			 * and does not carry RXRPC_CLIENT_INITIATED */
			terminal = ((sp->hdr.flags & RXRPC_LAST_PACKET) &&
				!(sp->hdr.flags & RXRPC_CLIENT_INITIATED));
			ret = rxrpc_queue_rcv_skb(call, skb, true, terminal);
			/* a queueing failure is treated as a fatal bug */
			BUG_ON(ret < 0);
			_debug("drain #%u", call->rx_data_post);
			call->rx_data_post++;

			/* find out what the next packet is: record the sequence
			 * number at the head of the out-of-sequence queue, or 0
			 * if that queue is now empty */
			skb = skb_peek(&call->rx_oos_queue);
			if (skb)
				call->rx_first_oos =
					ntohl(rxrpc_skb(skb)->hdr.seq);
			else
				call->rx_first_oos = 0;
			_debug("peek %p {%u}", skb, call->rx_first_oos);
		}
	}

	ret = 0;
socket_unavailable:
	/* call->lock is released here; presumably it was taken earlier in the
	 * part of the function that is not visible */
	spin_unlock_bh(&call->lock);
	_leave(" = %d", ret);
	return ret;
}

/*
 * insert an out of sequence packet into the buffer
 *
 * The packet is linked into call->rx_oos_queue so that the queue stays sorted
 * by ascending sequence number (hdr.seq converted to host order).  The packet
 * is placed before the first queued packet with a strictly greater sequence
 * number, so a packet whose number equals one already queued ends up after
 * it.
 *
 * The skb is given rxrpc_packet_destructor as its destructor and is attached
 * to the call (sp->call must be NULL on entry); rxrpc_get_call() is called on
 * the call for that attachment.
 *
 * call->rx_first_oos is lowered to the new sequence number when appropriate
 * (0 is treated as "nothing recorded").  If the call has not reached
 * RXRPC_CALL_COMPLETE and call->rx_data_post equals call->rx_first_oos, the
 * RXRPC_CALL_DRAIN_RX_OOS event bit is set.
 */
static void rxrpc_insert_oos_packet(struct rxrpc_call *call,
				    struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp, *psp;
	struct sk_buff *p;
	u32 seq;

	sp = rxrpc_skb(skb);
	seq = ntohl(sp->hdr.seq);
	_enter(",,{%u}", seq);

	/* attach the packet to the call before it becomes visible on the
	 * queue */
	skb->destructor = rxrpc_packet_destructor;
	ASSERTCMP(sp->call, ==, NULL);
	sp->call = call;
	rxrpc_get_call(call);

	/* insert into the buffer in sequence order */
	spin_lock_bh(&call->lock);

	skb_queue_walk(&call->rx_oos_queue, p) {
		psp = rxrpc_skb(p);
		if (ntohl(psp->hdr.seq) > seq) {
			_debug("insert oos #%u before #%u",
			       seq, ntohl(psp->hdr.seq));
			skb_insert(p, skb, &call->rx_oos_queue);
			goto inserted;
		}
	}

	/* nothing queued has a greater sequence number: goes at the end */
	_debug("append oos #%u", seq);
	skb_queue_tail(&call->rx_oos_queue, skb);
inserted:

	/* we might now have a new front to the queue */
	if (call->rx_first_oos == 0 || seq < call->rx_first_oos)
		call->rx_first_oos = seq;

	/* the state is sampled under state_lock (read side), nested inside
	 * call->lock */
	read_lock(&call->state_lock);
	if (call->state < RXRPC_CALL_COMPLETE &&
	    call->rx_data_post == call->rx_first_oos) {
		_debug("drain rx oos now");
		set_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events);
	}
	read_unlock(&call->state_lock);

	spin_unlock_bh(&call->lock);
	/* NOTE(review): rx_first_oos is read here after call->lock has been
	 * dropped; it is only passed to _leave() */
	_leave(" [stored #%u]", call->rx_first_oos);
}

/*
 * clear the Tx window on final ACK reception
 *
 * The window array is detached from the call (call->acks_window is set to
 * NULL) and then every slot between call->acks_tail and call->acks_head is
 * released with rxrpc_free_skb(), advancing call->acks_tail as it goes.
 * Finally the array itself is kfree()'d.
 *
 * The window size is sampled once, on entry, from call->acks_winsz.
 */
static void rxrpc_zap_tx_window(struct rxrpc_call *call)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *skb;
	unsigned long _skb, *acks_window;
	uint8_t winsz = call->acks_winsz;
	int tail;

	acks_window = call->acks_window;
	call->acks_window = NULL;

	while (CIRC_CNT(call->acks_head, call->acks_tail, winsz) > 0) {
		tail = call->acks_tail;
		smp_read_barrier_depends();
		/* each slot holds an sk_buff pointer with bit 0 masked off
		 * before use; NOTE(review): bit 0 is presumably a per-packet
		 * flag - its meaning is not visible in this chunk */
		_skb = acks_window[tail] & ~1;
		/* NOTE(review): the barriers order the slot read against the
		 * tail update; the code they pair with is not visible here */
		smp_mb();
		/* the index wraps by masking with (winsz - 1), which
		 * presumably relies on winsz being a power of two - confirm */
		call->acks_tail = (call->acks_tail + 1) & (winsz - 1);

		skb = (struct sk_buff *) _skb;
		sp = rxrpc_skb(skb);
		_debug("+++ clear Tx %u", ntohl(sp->hdr.seq));
		rxrpc_free_skb(skb);
	}

	kfree(acks_window);
}

/*
 * process the extra information that may be appended to an ACK packet
 *
 * @call:   call the ACK was received on
 * @skb:    the ACK packet; the visible caller has already pulled the
 *          rxrpc_ackpacket header off the front
 * @latest: serial number of the ACK packet, used for tracing only
 * @nAcks:  number of soft-ACK entries advertised by the ACK header
 *
 * A struct rxrpc_ackinfo is copied out of the packet at offset nAcks + 3.
 * NOTE(review): that offset presumably skips the nAcks soft-ACK bytes plus
 * three bytes of padding - confirm against the wire format.  If the packet is
 * too short to supply it, the function returns without changing anything.
 *
 * The smaller of the advertised rxMTU and maxMTU (host order) is compared
 * with the peer's current maxdata; if it is smaller, peer->maxdata is lowered
 * to it and peer->mtu is set to that value plus peer->hdrsize.  The peer's
 * values are never raised here.
 */
static void rxrpc_extract_ackinfo(struct rxrpc_call *call, struct sk_buff *skb,
				  unsigned latest, int nAcks)
{
	struct rxrpc_ackinfo ackinfo;
	struct rxrpc_peer *peer;
	unsigned mtu;

	if (skb_copy_bits(skb, nAcks + 3, &ackinfo, sizeof(ackinfo)) < 0) {
		_leave(" [no ackinfo]");
		return;
	}

	_proto("Rx ACK %%%u Info { rx=%u max=%u rwin=%u jm=%u }",
	       latest,
	       ntohl(ackinfo.rxMTU), ntohl(ackinfo.maxMTU),
	       ntohl(ackinfo.rwind), ntohl(ackinfo.jumbo_max));

	mtu = min(ntohl(ackinfo.rxMTU), ntohl(ackinfo.maxMTU));

	peer = call->conn->trans->peer;
	/* NOTE(review): the comparison is made before peer->lock is taken;
	 * only the update itself is done under the lock */
	if (mtu < peer->maxdata) {
		spin_lock_bh(&peer->lock);
		peer->maxdata = mtu;
		peer->mtu = mtu + peer->hdrsize;
		spin_unlock_bh(&peer->lock);
		_net("Net MTU %u (maxdata %u)", peer->mtu, peer->maxdata);
	}
}

/*
 * process packets in the reception queue
 *
 * @call:        call whose rx_queue is to be processed
 * @_abort_code: passed through to rxrpc_verify_packet(); presumably filled in
 *               when verification fails - confirm against that function
 *
 * Packets are dequeued from call->rx_queue one at a time and handled
 * according to their type until either the queue is empty or a packet is
 * rejected.
 *
 * Returns -EAGAIN once the queue has been emptied, or -EPROTO as soon as a
 * packet is rejected (the offending packet is freed first; packets still
 * queued are left on the queue).  There is no other return value.
 */
static int rxrpc_process_rx_queue(struct rxrpc_call *call,
				  u32 *_abort_code)
{
	struct rxrpc_ackpacket ack;
	struct rxrpc_skb_priv *sp;
	struct sk_buff *skb;
	bool post_ACK;
	int latest;
	u32 hard, tx;

	_enter("");

process_further:
	skb = skb_dequeue(&call->rx_queue);
	if (!skb)
		return -EAGAIN;

	_net("deferred skb %p", skb);

	sp = rxrpc_skb(skb);

	/* NOTE(review): sp->hdr.type is used unchecked as an index into
	 * rxrpc_pkts - confirm the type is validated before packets are
	 * queued here */
	_debug("process %s [st %d]", rxrpc_pkts[sp->hdr.type], call->state);

	post_ACK = false;

	/* NOTE(review): this switch has no default case; a packet type not
	 * listed below leaves the switch and runs the all_acked code */
	switch (sp->hdr.type) {
		/* data packets that wind up here have been received out of
		 * order, need security processing or are jumbo packets */
	case RXRPC_PACKET_TYPE_DATA:
		_proto("OOSQ DATA %%%u { #%u }",
		       ntohl(sp->hdr.serial), ntohl(sp->hdr.seq));

		/* secured packets must be verified and possibly decrypted */
		if (rxrpc_verify_packet(call, skb, _abort_code) < 0)
			goto protocol_error;

		/* the packet is handed to the out-of-sequence queue, so it is
		 * not freed here */
		rxrpc_insert_oos_packet(call, skb);
		goto process_further;

		/* partial ACK to process */
	case RXRPC_PACKET_TYPE_ACK:
		if (skb_copy_bits(skb, 0, &ack, sizeof(ack)) < 0) {
			_debug("extraction failure");
			goto protocol_error;
		}
		/* strip the ACK header so that later offsets into the packet
		 * are relative to what follows it */
		if (!skb_pull(skb, sizeof(ack)))
			BUG();

		latest = ntohl(sp->hdr.serial);
		hard = ntohl(ack.firstPacket);
		tx = atomic_read(&call->sequence);

		/* NOTE(review): ack.reason comes from the packet and is used
		 * unchecked as an index into rxrpc_acks - confirm the table
		 * covers every value a peer can send */
		_proto("Rx ACK %%%u { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
		       latest,
		       ntohs(ack.maxSkew),
		       hard,
		       ntohl(ack.previousPacket),
		       ntohl(ack.serial),
		       rxrpc_acks[ack.reason],
		       ack.nAcks);

		rxrpc_extract_ackinfo(call, skb, latest, ack.nAcks);

		/* a ping is answered before the ACK is checked for staleness,
		 * so duplicate pings are answered as well */
		if (ack.reason == RXRPC_ACK_PING) {
			_proto("Rx ACK %%%u PING Request", latest);
			rxrpc_propose_ACK(call, RXRPC_ACK_PING_RESPONSE,
					  sp->hdr.serial, true);
		}

		/* discard any out-of-order or duplicate ACKs; the test is on
		 * the signed difference of the two serial numbers.
		 * NOTE(review): presumably so that serial wrap-around is
		 * tolerated - confirm the type of call->acks_latest */
		if (latest - call->acks_latest <= 0) {
			_debug("discard ACK %d <= %d",
			       latest, call->acks_latest);
			goto discard;
		}
		call->acks_latest = latest;

		/* the ACK is only acted upon in these four call states */
		if (call->state != RXRPC_CALL_CLIENT_SEND_REQUEST &&
		    call->state != RXRPC_CALL_CLIENT_AWAIT_REPLY &&
		    call->state != RXRPC_CALL_SERVER_SEND_REPLY &&
		    call->state != RXRPC_CALL_SERVER_AWAIT_ACK)
			goto discard;

		_debug("Tx=%d H=%u S=%d", tx, call->acks_hard, call->state);

		if (hard > 0) {
			/* firstPacket - 1 beyond the value read from
			 * call->sequence is rejected as a protocol error */
			if (hard - 1 > tx) {
				_debug("hard-ACK'd packet %d not transmitted"
				       " (%d top)",
				       hard - 1, tx);
				goto protocol_error;
			}

			/* in either awaiting state, firstPacket past that
			 * value is handled as a complete ACK */
			if ((call->state == RXRPC_CALL_CLIENT_AWAIT_REPLY ||
			     call->state == RXRPC_CALL_SERVER_AWAIT_ACK) &&
			    hard > tx)
				goto all_acked;

			smp_rmb();
			rxrpc_rotate_tx_window(call, hard - 1);
		}

		if (ack.nAcks > 0) {
			/* hard, tx are u32, so with hard == 0 the sum wraps
			 * round to ack.nAcks - 1 */
			if (hard - 1 + ack.nAcks > tx) {
				_debug("soft-ACK'd packet %d+%d not"
				       " transmitted (%d top)",
				       hard - 1, ack.nAcks, tx);
				goto protocol_error;
			}

			if (rxrpc_process_soft_ACKs(call, &ack, skb) < 0)
				goto protocol_error;
		}
		goto discard;

		/* complete ACK to process */
	case RXRPC_PACKET_TYPE_ACKALL:
		goto all_acked;

		/* abort and busy are handled elsewhere */
	case RXRPC_PACKET_TYPE_BUSY:
	case RXRPC_PACKET_TYPE_ABORT:
		BUG();

		/* connection level events - also handled elsewhere */
	case RXRPC_PACKET_TYPE_CHALLENGE:
	case RXRPC_PACKET_TYPE_RESPONSE:
	case RXRPC_PACKET_TYPE_DEBUG:
		BUG();
	}

	/* if we've had a hard ACK that covers all the packets we've sent, then
	 * that ends that phase of the operation; the state change is made
	 * under the write side of state_lock */
all_acked:
	write_lock_bh(&call->state_lock);
	_debug("ack all %d", call->state);

	switch (call->state) {
	case RXRPC_CALL_CLIENT_AWAIT_REPLY:
		call->state = RXRPC_CALL_CLIENT_RECV_REPLY;
		break;
	case RXRPC_CALL_SERVER_AWAIT_ACK:
		_debug("srv complete");
		call->state = RXRPC_CALL_COMPLETE;
		/* the packet itself is passed on further down */
		post_ACK = true;
		break;
	case RXRPC_CALL_CLIENT_SEND_REQUEST:
	case RXRPC_CALL_SERVER_RECV_REQUEST:
		goto protocol_error_unlock; /* can't occur yet */
	default:
		write_unlock_bh(&call->state_lock);
		goto discard;	/* assume packet left over from earlier phase */
	}

	write_unlock_bh(&call->state_lock);

	/* if all the packets we sent are hard-ACK'd, then we can discard
	 * whatever we've got left: stop the resend timer, clear the
	 * associated flag and event bits, and release the Tx window if there
	 * is one */
	_debug("clear Tx %d",
	       CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz));

	del_timer_sync(&call->resend_timer);
	clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
	clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events);

	if (call->acks_window)
		rxrpc_zap_tx_window(call);

	if (post_ACK) {
		/* post the final ACK message for userspace to pick up; the
		 * received packet is re-marked and queued rather than freed.
		 * NOTE(review): presumably the receive queue owns the skb
		 * from here on - confirm in rxrpc_queue_rcv_skb() */
		_debug("post ACK");
		skb->mark = RXRPC_SKB_MARK_FINAL_ACK;
		sp->call = call;
		rxrpc_get_call(call);
		spin_lock_bh(&call->lock);
		if (rxrpc_queue_rcv_skb(call, skb, true, true) < 0)
			BUG();
		spin_unlock_bh(&call->lock);
		goto process_further;
	}

	/* finished with this packet: free it and look for another */
discard:
	rxrpc_free_skb(skb);
	goto process_further;

	/* entered with state_lock still write-locked */
protocol_error_unlock:
	write_unlock_bh(&call->state_lock);
protocol_error:
	rxrpc_free_skb(skb);
	_leave(" = -EPROTO");
	return -EPROTO;
}

/*
 * post a message to the socket Rx queue for recvmsg() to pick up
 *
 * @call:  call the message concerns
 * @mark:  value stored in the mark field of the skb that is queued
 * @error: value stored in the error field of the skb's rxrpc private data
 * @fatal: if true, the call's resend and ACK timers are deleted and the
 *         RXRPC_CALL_RUN_RTIMER flag is cleared before anything else; the
 *         value is also passed as the last argument of rxrpc_queue_rcv_skb()
 *
 * Nothing is queued (and 0 is returned) when @mark is not
 * RXRPC_SKB_MARK_NEW_CALL and the call does not have RXRPC_CALL_HAS_USERID
 * set, or when RXRPC_CALL_TERMINAL_MSG is already set on the call.  Otherwise
 * a zero-length skb is allocated with GFP_NOFS, attached to the call and
 * queued under call->lock; failure to queue it is treated as a bug.
 *
 * Returns 0, or -ENOMEM if the skb could not be allocated.
 */
static int rxrpc_post_message(struct rxrpc_call *call, u32 mark, u32 error,
			      bool fatal)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *skb;
	int ret;

	_enter("{%d,%lx},%u,%u,%d",
call->debug_id, call->flags, mark, error, fatal); /* remove timers and things for fatal messages */ if (fatal) { del_timer_sync(&call->resend_timer); del_timer_sync(&call->ack_timer); clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags); } if (mark != RXRPC_SKB_MARK_NEW_CALL && !test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) { _leave("[no userid]"); return 0; } if (!test_bit(RXRPC_CALL_TERMINAL_MSG, &call->flags)) { skb = alloc_skb(0, GFP_NOFS); if (!skb) return -ENOMEM; rxrpc_new_skb(skb); skb->mark = mark; sp = rxrpc_skb(skb); memset(sp, 0, sizeof(*sp)); sp->error = error; sp->call = call; rxrpc_get_call(call); spin_lock_bh(&call->lock); ret = rxrpc_queue_rcv_skb(call, skb, true, fatal); spin_unlock_bh(&call->lock); if (ret < 0) BUG(); } return 0;}/* * handle background processing of incoming call packets and ACK / abort * generation */void rxrpc_process_call(struct work_struct *work){ struct rxrpc_call *call = container_of(work, struct rxrpc_call, processor); struct rxrpc_ackpacket ack; struct rxrpc_ackinfo ackinfo; struct rxrpc_header hdr; struct msghdr msg; struct kvec iov[5]; unsigned long bits; __be32 data, pad; size_t len; int genbit, loop, nbit, ioc, ret, mtu; u32 abort_code = RX_PROTOCOL_ERROR; u8 *acks = NULL; //printk("\n--------------------\n"); _enter("{%d,%s,%lx} [%lu]", call->debug_id, rxrpc_call_states[call->state], call->events, (jiffies - call->creation_jif) / (HZ / 10)); if (test_and_set_bit(RXRPC_CALL_PROC_BUSY, &call->flags)) { _debug("XXXXXXXXXXXXX RUNNING ON MULTIPLE CPUS XXXXXXXXXXXXX"); return; } /* there's a good chance we're going to have to send a message, so set * one up in advance */ msg.msg_name = &call->conn->trans->peer->srx.transport.sin; msg.msg_namelen = sizeof(call->conn->trans->peer->srx.transport.sin); msg.msg_control = NULL; msg.msg_controllen = 0; msg.msg_flags = 0; hdr.epoch = call->conn->epoch; hdr.cid = call->cid; hdr.callNumber = call->call_id; hdr.seq = 0; hdr.type = RXRPC_PACKET_TYPE_ACK; hdr.flags = 
call->conn->out_clientflag; hdr.userStatus = 0; hdr.securityIndex = call->conn->security_ix; hdr._rsvd = 0; hdr.serviceId = call->conn->service_id;
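/*
 * Keyboard shortcuts of the web code viewer this listing was copied from
 * (page text, not part of ar-ack.c):
 *
 *   Copy code            Ctrl + C
 *   Search code          Ctrl + F
 *   Full-screen mode     F11
 *   Toggle theme         Ctrl + Shift + D
 *   Show shortcuts       ?
 *   Increase font size   Ctrl + =
 *   Decrease font size   Ctrl + -
 */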