📄 core.cpp
字号:
m_iTraceSndLoss += m_pSndLossList->insert(const_cast<int32_t&>(m_iSndLastAck), losslist[i + 1]); ++ i; } else if (CSeqNo::seqcmp(losslist[i], const_cast<int32_t&>(m_iSndLastAck)) >= 0) { if (CSeqNo::seqcmp(losslist[i], m_iSndCurrSeqNo) > 0) { //seq_a must not be greater than the most recent sent seq secure = false; break; } m_iTraceSndLoss += m_pSndLossList->insert(losslist[i], losslist[i]); } } if (!secure) { //this should not happen: attack or bug m_bBroken = true; m_iBrokenCounter = 0; break; } // Wake up the waiting sender (avoiding deadlock on an infinite sleeping) m_pSndLossList->insert(const_cast<int32_t&>(m_iSndLastAck), const_cast<int32_t&>(m_iSndLastAck)); // the lost packet (retransmission) should be sent out immediately m_pSndQueue->m_pSndUList->update(this); ++ m_iRecvNAK; break; } case 4: //100 - Delay Warning // One way packet delay is increasing, so decrease the sending rate m_ullInterval = (uint64_t)ceil(m_ullInterval * 1.125); m_iLastDecSeq = m_iSndCurrSeqNo; break; case 1: //001 - Keep-alive // The only purpose of keep-alive packet is to tell that the peer is still alive // nothing needs to be done. break; case 0: //000 - Handshake if ((((CHandShake*)(ctrlpkt.m_pcData))->m_iReqType > 0) || (m_bRendezvous && (((CHandShake*)(ctrlpkt.m_pcData))->m_iReqType != -2))) { // The peer side has not received the handshake message, so it keeps querying // resend the handshake packet CHandShake initdata; initdata.m_iISN = m_iISN; initdata.m_iMSS = m_iMSS; initdata.m_iFlightFlagSize = m_iFlightFlagSize; initdata.m_iReqType = (!m_bRendezvous) ? -1 : -2; initdata.m_iID = m_SocketID; sendCtrl(0, NULL, (char *)&initdata, sizeof(CHandShake)); } break; case 5: //101 - Shutdown m_bShutdown = true; m_bClosing = true; m_bBroken = true; m_iBrokenCounter = 60; // Signal the sender and recver if they are waiting for data. releaseSynch(); CTimer::triggerEvent(); break; case 7: //111 - Msg drop request m_pRcvBuffer->dropMsg(ctrlpkt.getMsgSeq()); m_pRcvLossList->remove(*(int32_t*)ctrlpkt.m_pcData, *(int32_t*)(ctrlpkt.m_pcData + 4)); break; case 65535: //0x7FFF - reserved and user defined messages m_pCC->processCustomMsg(&ctrlpkt); // update CC parameters m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency); m_dCongestionWindow = m_pCC->m_dCWndSize; break; default: break; }}int CUDT::packData(CPacket& packet, uint64_t& ts){ int payload = 0; bool probe = false; uint64_t entertime; CTimer::rdtsc(entertime); if ((0 != m_ullTargetTime) && (entertime > m_ullTargetTime)) m_ullTimeDiff += entertime - m_ullTargetTime; // Loss retransmission always has higher priority. if ((packet.m_iSeqNo = m_pSndLossList->getLostSeq()) >= 0) { // protect m_iSndLastDataAck from updating by ACK processing CGuard ackguard(m_AckLock); int offset = CSeqNo::seqoff(m_iSndLastDataAck, packet.m_iSeqNo); if (offset < 0) return 0; int msglen; payload = m_pSndBuffer->readData(&(packet.m_pcData), offset, packet.m_iMsgNo, msglen); if (-1 == payload) { int32_t seqpair[2]; seqpair[0] = packet.m_iSeqNo; seqpair[1] = CSeqNo::incseq(seqpair[0], msglen); sendCtrl(7, &packet.m_iMsgNo, seqpair, 8); // only one msg drop request is necessary m_pSndLossList->remove(seqpair[1]); return 0; } else if (0 == payload) return 0; ++ m_iTraceRetrans; } else { // If no loss, pack a new packet. // check congestion/flow window limit int cwnd = (m_iFlowWindowSize < (int)m_dCongestionWindow) ? m_iFlowWindowSize : (int)m_dCongestionWindow; if (cwnd >= CSeqNo::seqlen(const_cast<int32_t&>(m_iSndLastAck), CSeqNo::incseq(m_iSndCurrSeqNo))) { if (0 != (payload = m_pSndBuffer->readData(&(packet.m_pcData), packet.m_iMsgNo))) { m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo); m_pCC->setSndCurrSeqNo(m_iSndCurrSeqNo); packet.m_iSeqNo = m_iSndCurrSeqNo; // every 16 (0xF) packets, a packet pair is sent if (0 == (packet.m_iSeqNo & 0xF)) probe = true; } else { m_ullTargetTime = 0; m_ullTimeDiff = 0; ts = 0; return 0; } } else { m_ullTargetTime = 0; m_ullTimeDiff = 0; ts = 0; return 0; } } packet.m_iTimeStamp = int(CTimer::getTime() - m_StartTime); m_pSndTimeWindow->onPktSent(packet.m_iTimeStamp); packet.m_iID = m_PeerID; m_pCC->onPktSent(&packet); ++ m_llTraceSent; if (probe) { // sends out probing packet pair ts = entertime; probe = false; } else { #ifndef NO_BUSY_WAITING ts = entertime + m_ullInterval; #else if (m_ullTimeDiff >= m_ullInterval) { ts = entertime; m_ullTimeDiff -= m_ullInterval; } else { ts = entertime + m_ullInterval - m_ullTimeDiff; m_ullTimeDiff = 0; } #endif } m_ullTargetTime = ts; packet.m_iID = m_PeerID; packet.setLength(payload); return payload;}int CUDT::processData(CUnit* unit){ CPacket& packet = unit->m_Packet; // Just heard from the peer, reset the expiration count. m_iEXPCount = 1; m_ullEXPInt = m_ullMinEXPInt; if (CSeqNo::incseq(m_iSndCurrSeqNo) == m_iSndLastAck) { CTimer::rdtsc(m_ullNextEXPTime); if (!m_pCC->m_bUserDefinedRTO) m_ullNextEXPTime += m_ullEXPInt; else m_ullNextEXPTime += m_pCC->m_iRTO * m_ullCPUFrequency; } m_pCC->onPktReceived(&packet); ++ m_iPktCount; // update time information m_pRcvTimeWindow->onPktArrival(); // check if it is probing packet pair if (0 == (packet.m_iSeqNo & 0xF)) m_pRcvTimeWindow->probe1Arrival(); else if (1 == (packet.m_iSeqNo & 0xF)) m_pRcvTimeWindow->probe2Arrival(); ++ m_llTraceRecv; int32_t offset = CSeqNo::seqoff(m_iRcvLastAck, packet.m_iSeqNo); if ((offset < 0) || (offset >= m_pRcvBuffer->getAvailBufSize())) return -1; if (m_pRcvBuffer->addData(unit, offset) < 0) return -1; // Loss detection. if (CSeqNo::seqcmp(packet.m_iSeqNo, CSeqNo::incseq(m_iRcvCurrSeqNo)) > 0) { // If loss found, insert them to the receiver loss list m_pRcvLossList->insert(CSeqNo::incseq(m_iRcvCurrSeqNo), CSeqNo::decseq(packet.m_iSeqNo)); // pack loss list for NAK int32_t lossdata[2]; lossdata[0] = CSeqNo::incseq(m_iRcvCurrSeqNo) | 0x80000000; lossdata[1] = CSeqNo::decseq(packet.m_iSeqNo); // Generate loss report immediately. sendCtrl(3, NULL, lossdata, (CSeqNo::incseq(m_iRcvCurrSeqNo) == CSeqNo::decseq(packet.m_iSeqNo)) ? 1 : 2); m_iTraceRcvLoss += CSeqNo::seqlen(m_iRcvCurrSeqNo, packet.m_iSeqNo) - 2; } // This is not a regular fixed size packet... //an irregular sized packet usually indicates the end of a message, so send an ACK immediately if (packet.getLength() != m_iPayloadSize) CTimer::rdtsc(m_ullNextACKTime); // Update the current largest sequence number that has been received. // Or it is a retransmitted packet, remove it from receiver loss list. if (CSeqNo::seqcmp(packet.m_iSeqNo, m_iRcvCurrSeqNo) > 0) m_iRcvCurrSeqNo = packet.m_iSeqNo; else m_pRcvLossList->remove(packet.m_iSeqNo); return 0;}int CUDT::listen(sockaddr* addr, CPacket& packet){ CGuard cg(m_ConnectionLock); if (m_bClosing) return 1002; CHandShake* hs = (CHandShake *)packet.m_pcData; // SYN cookie char clienthost[NI_MAXHOST]; char clientport[NI_MAXSERV]; getnameinfo(addr, (AF_INET == m_iVersion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6), clienthost, sizeof(clienthost), clientport, sizeof(clientport), NI_NUMERICHOST|NI_NUMERICSERV); int64_t timestamp = (CTimer::getTime() - m_StartTime) / 60000000; // secret changes every one minute char cookiestr[1024]; sprintf(cookiestr, "%s:%s:%lld", clienthost, clientport, (long long int)timestamp); unsigned char cookie[16]; CMD5::compute(cookiestr, cookie); if (1 == hs->m_iReqType) { hs->m_iCookie = *(int*)cookie; packet.m_iID = hs->m_iID; m_pSndQueue->sendto(addr, packet); return 0; } else { if (hs->m_iCookie != *(int*)cookie) { timestamp --; sprintf(cookiestr, "%s:%s:%lld", clienthost, clientport, (long long int)timestamp); CMD5::compute(cookiestr, cookie); if (hs->m_iCookie != *(int*)cookie) return -1; } } int32_t id = hs->m_iID; // When a peer side connects in... if ((1 == packet.getFlag()) && (0 == packet.getType())) { if ((hs->m_iVersion != m_iVersion) || (hs->m_iType != m_iSockType) || (-1 == s_UDTUnited.newConnection(m_SocketID, addr, hs))) { // couldn't create a new connection, reject the request hs->m_iReqType = 1002; } packet.m_iID = id; m_pSndQueue->sendto(addr, packet); } return hs->m_iReqType;}void CUDT::checkTimers(){ // update CC parameters m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency); m_dCongestionWindow = m_pCC->m_dCWndSize; uint64_t minint = (uint64_t)(m_ullCPUFrequency * m_pSndTimeWindow->getMinPktSndInt() * 0.9); if (m_ullInterval < minint) m_ullInterval = minint; uint64_t currtime; CTimer::rdtsc(currtime); int32_t loss = m_pRcvLossList->getFirstLostSeq(); if ((currtime > m_ullNextACKTime) || ((m_pCC->m_iACKInterval > 0) && (m_pCC->m_iACKInterval <= m_iPktCount))) { // ACK timer expired or ACK interval reached sendCtrl(2); CTimer::rdtsc(currtime); if (m_pCC->m_iACKPeriod > 0) m_ullNextACKTime = currtime + m_pCC->m_iACKPeriod * m_ullCPUFrequency; else m_ullNextACKTime = currtime + m_ullACKInt; m_iPktCount = 0; m_iLightACKCount = 1; } else if (m_iSelfClockInterval * m_iLightACKCount <= m_iPktCount) { //send a "light" ACK sendCtrl(2, NULL, NULL, 4); ++ m_iLightACKCount; } if ((loss >= 0) && (currtime > m_ullNextNAKTime)) { // NAK timer expired, and there is loss to be reported. sendCtrl(3); CTimer::rdtsc(currtime); m_ullNextNAKTime = currtime + m_ullNAKInt; } if (currtime > m_ullNextEXPTime) { // Haven't receive any information from the peer, is it dead?! // timeout: at least 16 expirations and must be greater than 3 seconds and be less than 30 seconds if (((m_iEXPCount > 16) && (m_iEXPCount * ((m_iEXPCount - 1) * (m_iRTT + 4 * m_iRTTVar) / 2 + m_iSYNInterval) > 3000000)) || (m_iEXPCount > 30) || (m_iEXPCount * ((m_iEXPCount - 1) * (m_iRTT + 4 * m_iRTTVar) / 2 + m_iSYNInterval) > 30000000)) { // // Connection is broken. // UDT does not signal any information about this instead of to stop quietly. // Apllication will detect this when it calls any UDT methods next time. // m_bClosing = true; m_bBroken = true; m_iBrokenCounter = 30; // update snd U list to remove this socket m_pSndQueue->m_pSndUList->update(this); releaseSynch(); CTimer::triggerEvent(); return; } // sender: Insert all the packets sent after last received acknowledgement into the sender loss list. // recver: Send out a keep-alive packet if (CSeqNo::incseq(m_iSndCurrSeqNo) != m_iSndLastAck) { int32_t csn = m_iSndCurrSeqNo; m_pSndLossList->insert(const_cast<int32_t&>(m_iSndLastAck), csn); } else sendCtrl(1); if (m_pSndBuffer->getCurrBufSize() > 0) { // immediately restart transmission m_pSndQueue->m_pSndUList->update(this); } ++ m_iEXPCount; m_ullEXPInt = (m_iEXPCount * (m_iRTT + 4 * m_iRTTVar) + m_iSYNInterval) * m_ullCPUFrequency; if (m_ullEXPInt < m_iEXPCount * 100000 * m_ullCPUFrequency) m_ullEXPInt = m_iEXPCount * 100000 * m_ullCPUFrequency; CTimer::rdtsc(m_ullNextEXPTime); m_ullNextEXPTime += m_ullEXPInt; m_pCC->onTimeout(); // update CC parameters m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency); m_dCongestionWindow = m_pCC->m_dCWndSize; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -