📄 ar-ack.c
字号:
memset(iov, 0, sizeof(iov)); iov[0].iov_base = &hdr; iov[0].iov_len = sizeof(hdr); /* deal with events of a final nature */ if (test_bit(RXRPC_CALL_RELEASE, &call->events)) { rxrpc_release_call(call); clear_bit(RXRPC_CALL_RELEASE, &call->events); } if (test_bit(RXRPC_CALL_RCVD_ERROR, &call->events)) { int error; clear_bit(RXRPC_CALL_CONN_ABORT, &call->events); clear_bit(RXRPC_CALL_REJECT_BUSY, &call->events); clear_bit(RXRPC_CALL_ABORT, &call->events); error = call->conn->trans->peer->net_error; _debug("post net error %d", error); if (rxrpc_post_message(call, RXRPC_SKB_MARK_NET_ERROR, error, true) < 0) goto no_mem; clear_bit(RXRPC_CALL_RCVD_ERROR, &call->events); goto kill_ACKs; } if (test_bit(RXRPC_CALL_CONN_ABORT, &call->events)) { ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE); clear_bit(RXRPC_CALL_REJECT_BUSY, &call->events); clear_bit(RXRPC_CALL_ABORT, &call->events); _debug("post conn abort"); if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR, call->conn->error, true) < 0) goto no_mem; clear_bit(RXRPC_CALL_CONN_ABORT, &call->events); goto kill_ACKs; } if (test_bit(RXRPC_CALL_REJECT_BUSY, &call->events)) { hdr.type = RXRPC_PACKET_TYPE_BUSY; genbit = RXRPC_CALL_REJECT_BUSY; goto send_message; } if (test_bit(RXRPC_CALL_ABORT, &call->events)) { ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE); if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR, ECONNABORTED, true) < 0) goto no_mem; hdr.type = RXRPC_PACKET_TYPE_ABORT; data = htonl(call->abort_code); iov[1].iov_base = &data; iov[1].iov_len = sizeof(data); genbit = RXRPC_CALL_ABORT; goto send_message; } if (test_bit(RXRPC_CALL_ACK_FINAL, &call->events)) { genbit = RXRPC_CALL_ACK_FINAL; ack.bufferSpace = htons(8); ack.maxSkew = 0; ack.serial = 0; ack.reason = RXRPC_ACK_IDLE; ack.nAcks = 0; call->ackr_reason = 0; spin_lock_bh(&call->lock); ack.serial = call->ackr_serial; ack.previousPacket = call->ackr_prev_seq; ack.firstPacket = htonl(call->rx_data_eaten + 1); spin_unlock_bh(&call->lock); pad = 0; iov[1].iov_base = &ack; iov[1].iov_len = sizeof(ack); iov[2].iov_base = &pad; iov[2].iov_len = 3; iov[3].iov_base = &ackinfo; iov[3].iov_len = sizeof(ackinfo); goto send_ACK; } if (call->events & ((1 << RXRPC_CALL_RCVD_BUSY) | (1 << RXRPC_CALL_RCVD_ABORT)) ) { u32 mark; if (test_bit(RXRPC_CALL_RCVD_ABORT, &call->events)) mark = RXRPC_SKB_MARK_REMOTE_ABORT; else mark = RXRPC_SKB_MARK_BUSY; _debug("post abort/busy"); rxrpc_clear_tx_window(call); if (rxrpc_post_message(call, mark, ECONNABORTED, true) < 0) goto no_mem; clear_bit(RXRPC_CALL_RCVD_BUSY, &call->events); clear_bit(RXRPC_CALL_RCVD_ABORT, &call->events); goto kill_ACKs; } if (test_and_clear_bit(RXRPC_CALL_RCVD_ACKALL, &call->events)) { _debug("do implicit ackall"); rxrpc_clear_tx_window(call); } if (test_bit(RXRPC_CALL_LIFE_TIMER, &call->events)) { write_lock_bh(&call->state_lock); if (call->state <= RXRPC_CALL_COMPLETE) { call->state = RXRPC_CALL_LOCALLY_ABORTED; call->abort_code = RX_CALL_TIMEOUT; set_bit(RXRPC_CALL_ABORT, &call->events); } write_unlock_bh(&call->state_lock); _debug("post timeout"); if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR, ETIME, true) < 0) goto no_mem; clear_bit(RXRPC_CALL_LIFE_TIMER, &call->events); goto kill_ACKs; } /* deal with assorted inbound messages */ if (!skb_queue_empty(&call->rx_queue)) { switch (rxrpc_process_rx_queue(call, &abort_code)) { case 0: case -EAGAIN: break; case -ENOMEM: goto no_mem; case -EKEYEXPIRED: case -EKEYREJECTED: case -EPROTO: rxrpc_abort_call(call, abort_code); goto kill_ACKs; } } /* handle resending */ if (test_and_clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events)) rxrpc_resend_timer(call); if (test_and_clear_bit(RXRPC_CALL_RESEND, &call->events)) rxrpc_resend(call); /* consider sending an ordinary ACK */ if (test_bit(RXRPC_CALL_ACK, &call->events)) { _debug("send ACK: window: %d - %d { %lx }", call->rx_data_eaten, call->ackr_win_top, call->ackr_window[0]); if (call->state > RXRPC_CALL_SERVER_ACK_REQUEST && call->ackr_reason != RXRPC_ACK_PING_RESPONSE) { /* ACK by sending reply DATA packet in this state */ clear_bit(RXRPC_CALL_ACK, &call->events); goto maybe_reschedule; } genbit = RXRPC_CALL_ACK; acks = kzalloc(call->ackr_win_top - call->rx_data_eaten, GFP_NOFS); if (!acks) goto no_mem; //hdr.flags = RXRPC_SLOW_START_OK; ack.bufferSpace = htons(8); ack.maxSkew = 0; ack.serial = 0; ack.reason = 0; spin_lock_bh(&call->lock); ack.reason = call->ackr_reason; ack.serial = call->ackr_serial; ack.previousPacket = call->ackr_prev_seq; ack.firstPacket = htonl(call->rx_data_eaten + 1); ack.nAcks = 0; for (loop = 0; loop < RXRPC_ACKR_WINDOW_ASZ; loop++) { nbit = loop * BITS_PER_LONG; for (bits = call->ackr_window[loop]; bits; bits >>= 1 ) { _debug("- l=%d n=%d b=%lx", loop, nbit, bits); if (bits & 1) { acks[nbit] = RXRPC_ACK_TYPE_ACK; ack.nAcks = nbit + 1; } nbit++; } } call->ackr_reason = 0; spin_unlock_bh(&call->lock); pad = 0; iov[1].iov_base = &ack; iov[1].iov_len = sizeof(ack); iov[2].iov_base = acks; iov[2].iov_len = ack.nAcks; iov[3].iov_base = &pad; iov[3].iov_len = 3; iov[4].iov_base = &ackinfo; iov[4].iov_len = sizeof(ackinfo); switch (ack.reason) { case RXRPC_ACK_REQUESTED: case RXRPC_ACK_DUPLICATE: case RXRPC_ACK_OUT_OF_SEQUENCE: case RXRPC_ACK_EXCEEDS_WINDOW: case RXRPC_ACK_NOSPACE: case RXRPC_ACK_PING: case RXRPC_ACK_PING_RESPONSE: goto send_ACK_with_skew; case RXRPC_ACK_DELAY: case RXRPC_ACK_IDLE: goto send_ACK; } } /* handle completion of security negotiations on an incoming * connection */ if (test_and_clear_bit(RXRPC_CALL_SECURED, &call->events)) { _debug("secured"); spin_lock_bh(&call->lock); if (call->state == RXRPC_CALL_SERVER_SECURING) { _debug("securing"); write_lock(&call->conn->lock); if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) && !test_bit(RXRPC_CALL_RELEASE, &call->events)) { _debug("not released"); call->state = RXRPC_CALL_SERVER_ACCEPTING; list_move_tail(&call->accept_link, &call->socket->acceptq); } write_unlock(&call->conn->lock); read_lock(&call->state_lock); if (call->state < RXRPC_CALL_COMPLETE) set_bit(RXRPC_CALL_POST_ACCEPT, &call->events); read_unlock(&call->state_lock); } spin_unlock_bh(&call->lock); if (!test_bit(RXRPC_CALL_POST_ACCEPT, &call->events)) goto maybe_reschedule; } /* post a notification of an acceptable connection to the app */ if (test_bit(RXRPC_CALL_POST_ACCEPT, &call->events)) { _debug("post accept"); if (rxrpc_post_message(call, RXRPC_SKB_MARK_NEW_CALL, 0, false) < 0) goto no_mem; clear_bit(RXRPC_CALL_POST_ACCEPT, &call->events); goto maybe_reschedule; } /* handle incoming call acceptance */ if (test_and_clear_bit(RXRPC_CALL_ACCEPTED, &call->events)) { _debug("accepted"); ASSERTCMP(call->rx_data_post, ==, 0); call->rx_data_post = 1; read_lock_bh(&call->state_lock); if (call->state < RXRPC_CALL_COMPLETE) set_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events); read_unlock_bh(&call->state_lock); } /* drain the out of sequence received packet queue into the packet Rx * queue */ if (test_and_clear_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events)) { while (call->rx_data_post == call->rx_first_oos) if (rxrpc_drain_rx_oos_queue(call) < 0) break; goto maybe_reschedule; } /* other events may have been raised since we started checking */ goto maybe_reschedule;send_ACK_with_skew: ack.maxSkew = htons(atomic_read(&call->conn->hi_serial) - ntohl(ack.serial));send_ACK: mtu = call->conn->trans->peer->if_mtu; mtu -= call->conn->trans->peer->hdrsize; ackinfo.maxMTU = htonl(mtu); ackinfo.rwind = htonl(32); /* permit the peer to send us jumbo packets if it wants to */ ackinfo.rxMTU = htonl(5692); ackinfo.jumbo_max = htonl(4); hdr.serial = htonl(atomic_inc_return(&call->conn->serial)); _proto("Tx ACK %%%u { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }", ntohl(hdr.serial), ntohs(ack.maxSkew), ntohl(ack.firstPacket), ntohl(ack.previousPacket), ntohl(ack.serial), rxrpc_acks[ack.reason], ack.nAcks); del_timer_sync(&call->ack_timer); if (ack.nAcks > 0) set_bit(RXRPC_CALL_TX_SOFT_ACK, &call->flags); goto send_message_2;send_message: _debug("send message"); hdr.serial = htonl(atomic_inc_return(&call->conn->serial)); _proto("Tx %s %%%u", rxrpc_pkts[hdr.type], ntohl(hdr.serial));send_message_2: len = iov[0].iov_len; ioc = 1; if (iov[4].iov_len) { ioc = 5; len += iov[4].iov_len; len += iov[3].iov_len; len += iov[2].iov_len; len += iov[1].iov_len; } else if (iov[3].iov_len) { ioc = 4; len += iov[3].iov_len; len += iov[2].iov_len; len += iov[1].iov_len; } else if (iov[2].iov_len) { ioc = 3; len += iov[2].iov_len; len += iov[1].iov_len; } else if (iov[1].iov_len) { ioc = 2; len += iov[1].iov_len; } ret = kernel_sendmsg(call->conn->trans->local->socket, &msg, iov, ioc, len); if (ret < 0) { _debug("sendmsg failed: %d", ret); read_lock_bh(&call->state_lock); if (call->state < RXRPC_CALL_DEAD) rxrpc_queue_call(call); read_unlock_bh(&call->state_lock); goto error; } switch (genbit) { case RXRPC_CALL_ABORT: clear_bit(genbit, &call->events); clear_bit(RXRPC_CALL_RCVD_ABORT, &call->events); goto kill_ACKs; case RXRPC_CALL_ACK_FINAL: write_lock_bh(&call->state_lock); if (call->state == RXRPC_CALL_CLIENT_FINAL_ACK) call->state = RXRPC_CALL_COMPLETE; write_unlock_bh(&call->state_lock); goto kill_ACKs; default: clear_bit(genbit, &call->events); switch (call->state) { case RXRPC_CALL_CLIENT_AWAIT_REPLY: case RXRPC_CALL_CLIENT_RECV_REPLY: case RXRPC_CALL_SERVER_RECV_REQUEST: case RXRPC_CALL_SERVER_ACK_REQUEST: _debug("start ACK timer"); rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, call->ackr_serial, false); default: break; } goto maybe_reschedule; }kill_ACKs: del_timer_sync(&call->ack_timer); if (test_and_clear_bit(RXRPC_CALL_ACK_FINAL, &call->events)) rxrpc_put_call(call); clear_bit(RXRPC_CALL_ACK, &call->events);maybe_reschedule: if (call->events || !skb_queue_empty(&call->rx_queue)) { read_lock_bh(&call->state_lock); if (call->state < RXRPC_CALL_DEAD) rxrpc_queue_call(call); read_unlock_bh(&call->state_lock); } /* don't leave aborted connections on the accept queue */ if (call->state >= RXRPC_CALL_COMPLETE && !list_empty(&call->accept_link)) { _debug("X unlinking once-pending call %p { e=%lx f=%lx c=%x }", call, call->events, call->flags, ntohl(call->conn->cid)); read_lock_bh(&call->state_lock); if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) && !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events)) rxrpc_queue_call(call); read_unlock_bh(&call->state_lock); }error: clear_bit(RXRPC_CALL_PROC_BUSY, &call->flags); kfree(acks); /* because we don't want two CPUs both processing the work item for one * call at the same time, we use a flag to note when it's busy; however * this means there's a race between clearing the flag and setting the * work pending bit and the work item being processed again */ if (call->events && !work_pending(&call->processor)) { _debug("jumpstart %x", ntohl(call->conn->cid)); rxrpc_queue_call(call); } _leave(""); return;no_mem: _debug("out of memory"); goto maybe_reschedule;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -