📄 pseudotcp.cc
字号:
packet(m_snd_nxt, 0, 0, 0); }#endif // PSEUDO_KEEPALIVE}boolPseudoTcp::NotifyPacket(const char * buffer, size_t len) { if (len > MAX_PACKET) { LOG_F(WARNING) << "packet too large"; return false; } return parse(reinterpret_cast<const uint8 *>(buffer), uint32(len));}boolPseudoTcp::GetNextClock(uint32 now, long& timeout) { return clock_check(now, timeout);}// // IPStream Implementation//intPseudoTcp::Recv(char * buffer, size_t len) { if (m_state != TCP_ESTABLISHED) { m_error = ENOTCONN; return SOCKET_ERROR; } if (m_rlen == 0) { m_bReadEnable = true; m_error = EWOULDBLOCK; return SOCKET_ERROR; } uint32 read = talk_base::_min(uint32(len), m_rlen); memcpy(buffer, m_rbuf, read); m_rlen -= read; // !?! until we create a circular buffer, we need to move all of the rest of the buffer up! memmove(m_rbuf, m_rbuf + read, sizeof(m_rbuf) - read/*m_rlen*/); if ((sizeof(m_rbuf) - m_rlen - m_rcv_wnd) >= talk_base::_min<uint32>(sizeof(m_rbuf) / 2, m_mss)) { bool bWasClosed = (m_rcv_wnd == 0); // !?! Not sure about this was closed business m_rcv_wnd = sizeof(m_rbuf) - m_rlen; if (bWasClosed) { attemptSend(sfImmediateAck); } } return read;}intPseudoTcp::Send(const char * buffer, size_t len) { if (m_state != TCP_ESTABLISHED) { m_error = ENOTCONN; return SOCKET_ERROR; } if (m_slen == sizeof(m_sbuf)) { m_bWriteEnable = true; m_error = EWOULDBLOCK; return SOCKET_ERROR; } int written = queue(buffer, uint32(len), false); attemptSend(); return written;}voidPseudoTcp::Close(bool force) { LOG_F(LS_VERBOSE) << "(" << (force ? "true" : "false") << ")"; m_shutdown = force ? SD_FORCEFUL : SD_GRACEFUL;}int PseudoTcp::GetError() { return m_error;}//// Internal Implementation//uint32PseudoTcp::queue(const char * data, uint32 len, bool bCtrl) { if (len > sizeof(m_sbuf) - m_slen) { ASSERT(!bCtrl); len = sizeof(m_sbuf) - m_slen; } // We can concatenate data if the last segment is the same type // (control v. regular data), and has not been transmitted yet if (!m_slist.empty() && (m_slist.back().bCtrl == bCtrl) && (m_slist.back().xmit == 0)) { m_slist.back().len += len; } else { SSegment sseg(m_snd_una + m_slen, len, bCtrl); m_slist.push_back(sseg); } memcpy(m_sbuf + m_slen, data, len); m_slen += len; //LOG(LS_INFO) << "PseudoTcp::queue - m_slen = " << m_slen; return len;}IPseudoTcpNotify::WriteResultPseudoTcp::packet(uint32 seq, uint8 flags, const char * data, uint32 len) { ASSERT(HEADER_SIZE + len <= MAX_PACKET); uint32 now = Now(); uint8 buffer[MAX_PACKET]; long_to_bytes(m_conv, buffer); long_to_bytes(seq, buffer + 4); long_to_bytes(m_rcv_nxt, buffer + 8); buffer[12] = 0; buffer[13] = flags; short_to_bytes(uint16(m_rcv_wnd), buffer + 14); // Timestamp computations long_to_bytes(now, buffer + 16); long_to_bytes(m_ts_recent, buffer + 20); m_ts_lastack = m_rcv_nxt; memcpy(buffer + HEADER_SIZE, data, len);#if _DEBUGMSG >= _DBG_VERBOSE LOG(LS_INFO) << "<-- <CONV=" << m_conv << "><FLG=" << static_cast<unsigned>(flags) << "><SEQ=" << seq << ":" << seq + len << "><ACK=" << m_rcv_nxt << "><WND=" << m_rcv_wnd << "><TS=" << (now % 10000) << "><TSR=" << (m_ts_recent % 10000) << "><LEN=" << len << ">";#endif // _DEBUGMSG IPseudoTcpNotify::WriteResult wres = m_notify->TcpWritePacket(this, reinterpret_cast<char *>(buffer), len + HEADER_SIZE); // Note: When data is NULL, this is an ACK packet. We don't read the return value for those, // and thus we won't retry. So go ahead and treat the packet as a success (basically simulate // as if it were dropped), which will prevent our timers from being messed up. if ((wres != IPseudoTcpNotify::WR_SUCCESS) && (NULL != data)) return wres; m_t_ack = 0; if (len > 0) { m_lastsend = now; } m_lasttraffic = now; m_bOutgoing = true; return IPseudoTcpNotify::WR_SUCCESS;}boolPseudoTcp::parse(const uint8 * buffer, uint32 size) { if (size < 12) return false; Segment seg; seg.conv = bytes_to_long(buffer); seg.seq = bytes_to_long(buffer + 4); seg.ack = bytes_to_long(buffer + 8); seg.flags = buffer[13]; seg.wnd = bytes_to_short(buffer + 14); seg.tsval = bytes_to_long(buffer + 16); seg.tsecr = bytes_to_long(buffer + 20); seg.data = reinterpret_cast<const char *>(buffer) + HEADER_SIZE; seg.len = size - HEADER_SIZE;#if _DEBUGMSG >= _DBG_VERBOSE LOG(LS_INFO) << "--> <CONV=" << seg.conv << "><FLG=" << static_cast<unsigned>(seg.flags) << "><SEQ=" << seg.seq << ":" << seg.seq + seg.len << "><ACK=" << seg.ack << "><WND=" << seg.wnd << "><TS=" << (seg.tsval % 10000) << "><TSR=" << (seg.tsecr % 10000) << "><LEN=" << seg.len << ">";#endif // _DEBUGMSG return process(seg);}boolPseudoTcp::clock_check(uint32 now, long& nTimeout) { if (m_shutdown == SD_FORCEFUL) return false; if ((m_shutdown == SD_GRACEFUL) && ((m_state != TCP_ESTABLISHED) || ((m_slen == 0) && (m_t_ack == 0)))) { return false; } if (m_state == TCP_CLOSED) { nTimeout = CLOSED_TIMEOUT; return true; } nTimeout = DEFAULT_TIMEOUT; if (m_t_ack) { nTimeout = talk_base::_min(nTimeout, talk_base::TimeDiff(m_t_ack + ACK_DELAY, now)); } if (m_rto_base) { nTimeout = talk_base::_min(nTimeout, talk_base::TimeDiff(m_rto_base + m_rx_rto, now)); } if (m_snd_wnd == 0) { nTimeout = talk_base::_min(nTimeout, talk_base::TimeDiff(m_lastsend + m_rx_rto, now)); }#if PSEUDO_KEEPALIVE if (m_state == TCP_ESTABLISHED) { nTimeout = talk_base::_min(nTimeout, talk_base::TimeDiff(m_lasttraffic + (m_bOutgoing ? IDLE_PING * 3/2 : IDLE_PING), now)); }#endif // PSEUDO_KEEPALIVE return true;}boolPseudoTcp::process(Segment& seg) { // If this is the wrong conversation, send a reset!?! (with the correct conversation?) if (seg.conv != m_conv) { //if ((seg.flags & FLAG_RST) == 0) { // packet(tcb, seg.ack, 0, FLAG_RST, 0, 0); //} LOG_F(LS_ERROR) << "wrong conversation"; return false; } uint32 now = Now(); m_lasttraffic = m_lastrecv = now; m_bOutgoing = false; if (m_state == TCP_CLOSED) { // !?! send reset? LOG_F(LS_ERROR) << "closed"; return false; } // Check if this is a reset segment if (seg.flags & FLAG_RST) { closedown(ECONNRESET); return false; } // Check for control data bool bConnect = false; if (seg.flags & FLAG_CTL) { if (seg.len == 0) { LOG_F(LS_ERROR) << "Missing control code"; return false; } else if (seg.data[0] == CTL_CONNECT) { bConnect = true; if (m_state == TCP_LISTEN) { m_state = TCP_SYN_RECEIVED; LOG(LS_INFO) << "State: TCP_SYN_RECEIVED"; //m_notify->associate(addr); char buffer[1]; buffer[0] = CTL_CONNECT; queue(buffer, 1, true); } else if (m_state == TCP_SYN_SENT) { m_state = TCP_ESTABLISHED; LOG(LS_INFO) << "State: TCP_ESTABLISHED"; adjustMTU(); if (m_notify) { m_notify->OnTcpOpen(this); } //notify(evOpen); } } else { LOG_F(LS_WARNING) << "Unknown control code: " << seg.data[0]; return false; } } // Update timestamp if ((seg.seq <= m_ts_lastack) && (m_ts_lastack < seg.seq + seg.len)) { m_ts_recent = seg.tsval; } // Check if this is a valuable ack if ((seg.ack > m_snd_una) && (seg.ack <= m_snd_nxt)) { // Calculate round-trip time if (seg.tsecr) { long rtt = talk_base::TimeDiff(now, seg.tsecr); if (rtt >= 0) { if (m_rx_srtt == 0) { m_rx_srtt = rtt; m_rx_rttvar = rtt / 2; } else { m_rx_rttvar = (3 * m_rx_rttvar + abs(long(rtt - m_rx_srtt))) / 4; m_rx_srtt = (7 * m_rx_srtt + rtt) / 8; } m_rx_rto = bound(MIN_RTO, m_rx_srtt + talk_base::_max(1LU, 4 * m_rx_rttvar), MAX_RTO);#if _DEBUGMSG >= _DBG_VERBOSE LOG(LS_INFO) << "rtt: " << rtt << " srtt: " << m_rx_srtt << " rto: " << m_rx_rto;#endif // _DEBUGMSG } else { ASSERT(false); } } m_snd_wnd = seg.wnd; uint32 nAcked = seg.ack - m_snd_una; m_snd_una = seg.ack; m_rto_base = (m_snd_una == m_snd_nxt) ? 0 : now; m_slen -= nAcked; memmove(m_sbuf, m_sbuf + nAcked, m_slen); //LOG(LS_INFO) << "PseudoTcp::process - m_slen = " << m_slen; for (uint32 nFree = nAcked; nFree > 0; ) { ASSERT(!m_slist.empty()); if (nFree < m_slist.front().len) { m_slist.front().len -= nFree; nFree = 0; } else { if (m_slist.front().len > m_largest) { m_largest = m_slist.front().len; } nFree -= m_slist.front().len; m_slist.pop_front(); } } if (m_dup_acks >= 3) { if (m_snd_una >= m_recover) { // NewReno uint32 nInFlight = m_snd_nxt - m_snd_una; m_cwnd = talk_base::_min(m_ssthresh, nInFlight + m_mss); // (Fast Retransmit) #if _DEBUGMSG >= _DBG_NORMAL LOG(LS_INFO) << "exit recovery";#endif // _DEBUGMSG m_dup_acks = 0; } else {#if _DEBUGMSG >= _DBG_NORMAL LOG(LS_INFO) << "recovery retransmit";#endif // _DEBUGMSG if (!transmit(m_slist.begin(), now)) { closedown(ECONNABORTED); return false; } m_cwnd += m_mss - talk_base::_min(nAcked, m_cwnd);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -