📄 pseudotcp.cc
字号:
} } else { m_dup_acks = 0; // Slow start, congestion avoidance if (m_cwnd < m_ssthresh) { m_cwnd += m_mss; } else { m_cwnd += talk_base::_max(1LU, m_mss * m_mss / m_cwnd); } } // !?! A bit hacky if ((m_state == TCP_SYN_RECEIVED) && !bConnect) { m_state = TCP_ESTABLISHED; LOG(LS_INFO) << "State: TCP_ESTABLISHED"; adjustMTU(); if (m_notify) { m_notify->OnTcpOpen(this); } //notify(evOpen); } // If we make room in the send queue, notify the user // The goal it to make sure we always have at least enough data to fill the // window. We'd like to notify the app when we are halfway to that point. const uint32 kIdealRefillSize = (sizeof(m_sbuf) + sizeof(m_rbuf)) / 2; if (m_bWriteEnable && (m_slen < kIdealRefillSize)) { m_bWriteEnable = false; if (m_notify) { m_notify->OnTcpWriteable(this); } //notify(evWrite); } } else if (seg.ack == m_snd_una) { // !?! Note, tcp says don't do this... but otherwise how does a closed window become open? m_snd_wnd = seg.wnd; // Check duplicate acks if (seg.len > 0) { // it's a dup ack, but with a data payload, so don't modify m_dup_acks } else if (m_snd_una != m_snd_nxt) { m_dup_acks += 1; if (m_dup_acks == 3) { // (Fast Retransmit)#if _DEBUGMSG >= _DBG_NORMAL LOG(LS_INFO) << "enter recovery"; LOG(LS_INFO) << "recovery retransmit";#endif // _DEBUGMSG if (!transmit(m_slist.begin(), now)) { closedown(ECONNABORTED); return false; } m_recover = m_snd_nxt; uint32 nInFlight = m_snd_nxt - m_snd_una; m_ssthresh = talk_base::_max(nInFlight / 2, 2 * m_mss); //LOG(LS_INFO) << "m_ssthresh: " << m_ssthresh << " nInFlight: " << nInFlight << " m_mss: " << m_mss; m_cwnd = m_ssthresh + 3 * m_mss; } else if (m_dup_acks > 3) { m_cwnd += m_mss; } } else { m_dup_acks = 0; } } // Conditions were acks must be sent: // 1) Segment is too old (they missed an ACK) (immediately) // 2) Segment is too new (we missed a segment) (immediately) // 3) Segment has data (so we need to ACK!) (delayed) // ... so the only time we don't need to ACK, is an empty segment that points to rcv_nxt! SendFlags sflags = sfNone; if (seg.seq != m_rcv_nxt) { sflags = sfImmediateAck; // (Fast Recovery) } else if (seg.len != 0) { sflags = sfDelayedAck; }#if _DEBUGMSG >= _DBG_NORMAL if (sflags == sfImmediateAck) { if (seg.seq > m_rcv_nxt) { LOG_F(LS_INFO) << "too new"; } else if (seg.seq + seg.len <= m_rcv_nxt) { LOG_F(LS_INFO) << "too old"; } }#endif // _DEBUGMSG // Adjust the incoming segment to fit our receive buffer if (seg.seq < m_rcv_nxt) { uint32 nAdjust = m_rcv_nxt - seg.seq; if (nAdjust < seg.len) { seg.seq += nAdjust; seg.data += nAdjust; seg.len -= nAdjust; } else { seg.len = 0; } } if ((seg.seq + seg.len - m_rcv_nxt) > (sizeof(m_rbuf) - m_rlen)) { uint32 nAdjust = seg.seq + seg.len - m_rcv_nxt - (sizeof(m_rbuf) - m_rlen); if (nAdjust < seg.len) { seg.len -= nAdjust; } else { seg.len = 0; } } bool bIgnoreData = (seg.flags & FLAG_CTL) || (m_shutdown != SD_NONE); bool bNewData = false; if (seg.len > 0) { if (bIgnoreData) { if (seg.seq == m_rcv_nxt) { m_rcv_nxt += seg.len; } } else { uint32 nOffset = seg.seq - m_rcv_nxt; memcpy(m_rbuf + m_rlen + nOffset, seg.data, seg.len); if (seg.seq == m_rcv_nxt) { m_rlen += seg.len; m_rcv_nxt += seg.len; m_rcv_wnd -= seg.len; bNewData = true; RList::iterator it = m_rlist.begin(); while ((it != m_rlist.end()) && (it->seq <= m_rcv_nxt)) { if (it->seq + it->len > m_rcv_nxt) { sflags = sfImmediateAck; // (Fast Recovery) uint32 nAdjust = (it->seq + it->len) - m_rcv_nxt;#if _DEBUGMSG >= _DBG_NORMAL LOG(LS_INFO) << "Recovered " << nAdjust << " bytes (" << m_rcv_nxt << " -> " << m_rcv_nxt + nAdjust << ")";#endif // _DEBUGMSG m_rlen += nAdjust; m_rcv_nxt += nAdjust; m_rcv_wnd -= nAdjust; } it = m_rlist.erase(it); } } else {#if _DEBUGMSG >= _DBG_NORMAL LOG(LS_INFO) << "Saving " << seg.len << " bytes (" << seg.seq << " -> " << seg.seq + seg.len << ")";#endif // _DEBUGMSG RSegment rseg; rseg.seq = seg.seq; rseg.len = seg.len; RList::iterator it = m_rlist.begin(); while ((it != m_rlist.end()) && (it->seq < rseg.seq)) { ++it; } m_rlist.insert(it, rseg); } } } attemptSend(sflags); // If we have new data, notify the user if (bNewData && m_bReadEnable) { m_bReadEnable = false; if (m_notify) { m_notify->OnTcpReadable(this); } //notify(evRead); } return true;}boolPseudoTcp::transmit(const SList::iterator& seg, uint32 now) { if (seg->xmit >= ((m_state == TCP_ESTABLISHED) ? 15 : 30)) { LOG_F(LS_VERBOSE) << "too many retransmits"; return false; } uint32 nTransmit = talk_base::_min(seg->len, m_mss); while (true) { uint32 seq = seg->seq; uint8 flags = (seg->bCtrl ? FLAG_CTL : 0); const char * buffer = m_sbuf + (seg->seq - m_snd_una); IPseudoTcpNotify::WriteResult wres = this->packet(seq, flags, buffer, nTransmit); if (wres == IPseudoTcpNotify::WR_SUCCESS) break; if (wres == IPseudoTcpNotify::WR_FAIL) { LOG_F(LS_VERBOSE) << "packet failed"; return false; } ASSERT(wres == IPseudoTcpNotify::WR_TOO_LARGE); while (true) { if (PACKET_MAXIMUMS[m_msslevel + 1] == 0) { LOG_F(LS_VERBOSE) << "MTU too small"; return false; } // !?! We need to break up all outstanding and pending packets and then retransmit!?! m_mss = PACKET_MAXIMUMS[++m_msslevel] - PACKET_OVERHEAD; m_cwnd = 2 * m_mss; // I added this... haven't researched actual formula if (m_mss < nTransmit) { nTransmit = m_mss; break; } }#if _DEBUGMSG >= _DBG_NORMAL LOG(LS_INFO) << "Adjusting mss to " << m_mss << " bytes";#endif // _DEBUGMSG } if (nTransmit < seg->len) { LOG_F(LS_VERBOSE) << "mss reduced to " << m_mss; SSegment subseg(seg->seq + nTransmit, seg->len - nTransmit, seg->bCtrl); //subseg.tstamp = seg->tstamp; subseg.xmit = seg->xmit; seg->len = nTransmit; SList::iterator next = seg; m_slist.insert(++next, subseg); } if (seg->xmit == 0) { m_snd_nxt += seg->len; } seg->xmit += 1; //seg->tstamp = now; if (m_rto_base == 0) { m_rto_base = now; } return true;}voidPseudoTcp::attemptSend(SendFlags sflags) { uint32 now = Now(); if (talk_base::TimeDiff(now, m_lastsend) > static_cast<long>(m_rx_rto)) { m_cwnd = m_mss; }#if _DEBUGMSG bool bFirst = true; UNUSED(bFirst);#endif // _DEBUGMSG while (true) { uint32 cwnd = m_cwnd; if ((m_dup_acks == 1) || (m_dup_acks == 2)) { // Limited Transmit cwnd += m_dup_acks * m_mss; } uint32 nWindow = talk_base::_min(m_snd_wnd, cwnd); uint32 nInFlight = m_snd_nxt - m_snd_una; uint32 nUseable = (nInFlight < nWindow) ? (nWindow - nInFlight) : 0; uint32 nAvailable = talk_base::_min(m_slen - nInFlight, m_mss); if (nAvailable > nUseable) { if (nUseable * 4 < nWindow) { // RFC 813 - avoid SWS nAvailable = 0; } else { nAvailable = nUseable; } }#if _DEBUGMSG >= _DBG_VERBOSE if (bFirst) { bFirst = false; LOG(LS_INFO) << "[cwnd: " << m_cwnd << " nWindow: " << nWindow << " nInFlight: " << nInFlight << " nAvailable: " << nAvailable << " nQueued: " << m_slen - nInFlight << " nEmpty: " << sizeof(m_sbuf) - m_slen << " ssthresh: " << m_ssthresh << "]"; }#endif // _DEBUGMSG if (nAvailable == 0) { if (sflags == sfNone) return; // If this is an immediate ack, or the second delayed ack if ((sflags == sfImmediateAck) || m_t_ack) { packet(m_snd_nxt, 0, 0, 0); } else { m_t_ack = Now(); } return; } // Nagle algorithm if ((m_snd_nxt > m_snd_una) && (nAvailable < m_mss)) { return; } // Find the next segment to transmit SList::iterator it = m_slist.begin(); while (it->xmit > 0) { ++it; ASSERT(it != m_slist.end()); } SList::iterator seg = it; // If the segment is too large, break it into two if (seg->len > nAvailable) { SSegment subseg(seg->seq + nAvailable, seg->len - nAvailable, seg->bCtrl); seg->len = nAvailable; m_slist.insert(++it, subseg); } if (!transmit(seg, now)) { LOG_F(LS_VERBOSE) << "transmit failed"; // TODO: consider closing socket return; } sflags = sfNone; }}voidPseudoTcp::closedown(uint32 err) { m_slen = 0; LOG(LS_INFO) << "State: TCP_CLOSED"; m_state = TCP_CLOSED; if (m_notify) { m_notify->OnTcpClosed(this, err); } //notify(evClose, err);}voidPseudoTcp::adjustMTU() { // Determine our current mss level, so that we can adjust appropriately later for (m_msslevel = 0; PACKET_MAXIMUMS[m_msslevel + 1] > 0; ++m_msslevel) { if (static_cast<uint16>(PACKET_MAXIMUMS[m_msslevel]) <= m_mtu_advise) { break; } } m_mss = m_mtu_advise - PACKET_OVERHEAD; // !?! Should we reset m_largest here?#if _DEBUGMSG >= _DBG_NORMAL LOG(LS_INFO) << "Adjusting mss to " << m_mss << " bytes";#endif // _DEBUGMSG // Enforce minimums on ssthresh and cwnd m_ssthresh = talk_base::_max(m_ssthresh, 2 * m_mss); m_cwnd = talk_base::_max(m_cwnd, m_mss);}} // namespace cricket
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -