📄 core.cpp
字号:
pthread_mutex_init(&m_RecvDataLock, NULL); pthread_cond_init(&m_RecvDataCond, NULL); pthread_mutex_init(&m_SendLock, NULL); pthread_mutex_init(&m_RecvLock, NULL); pthread_mutex_init(&m_AckLock, NULL); pthread_mutex_init(&m_ConnectionLock, NULL); #else m_SendBlockLock = CreateMutex(NULL, false, NULL); m_SendBlockCond = CreateEvent(NULL, false, false, NULL); m_RecvDataLock = CreateMutex(NULL, false, NULL); m_RecvDataCond = CreateEvent(NULL, false, false, NULL); m_SendLock = CreateMutex(NULL, false, NULL); m_RecvLock = CreateMutex(NULL, false, NULL); m_AckLock = CreateMutex(NULL, false, NULL); m_ConnectionLock = CreateMutex(NULL, false, NULL); #endif}void CUDT::destroySynch(){ #ifndef WIN32 pthread_mutex_destroy(&m_SendBlockLock); pthread_cond_destroy(&m_SendBlockCond); pthread_mutex_destroy(&m_RecvDataLock); pthread_cond_destroy(&m_RecvDataCond); pthread_mutex_destroy(&m_SendLock); pthread_mutex_destroy(&m_RecvLock); pthread_mutex_destroy(&m_AckLock); pthread_mutex_destroy(&m_ConnectionLock); #else CloseHandle(m_SendBlockLock); CloseHandle(m_SendBlockCond); CloseHandle(m_RecvDataLock); CloseHandle(m_RecvDataCond); CloseHandle(m_SendLock); CloseHandle(m_RecvLock); CloseHandle(m_AckLock); CloseHandle(m_ConnectionLock); #endif}void CUDT::releaseSynch(){ #ifndef WIN32 // wake up user calls pthread_mutex_lock(&m_SendBlockLock); pthread_cond_signal(&m_SendBlockCond); pthread_mutex_unlock(&m_SendBlockLock); pthread_mutex_lock(&m_SendLock); pthread_mutex_unlock(&m_SendLock); pthread_mutex_lock(&m_RecvDataLock); pthread_cond_signal(&m_RecvDataCond); pthread_mutex_unlock(&m_RecvDataLock); pthread_mutex_lock(&m_RecvLock); pthread_mutex_unlock(&m_RecvLock); #else SetEvent(m_SendBlockCond); WaitForSingleObject(m_SendLock, INFINITE); ReleaseMutex(m_SendLock); SetEvent(m_RecvDataCond); WaitForSingleObject(m_RecvLock, INFINITE); ReleaseMutex(m_RecvLock); #endif}void CUDT::sendCtrl(const int& pkttype, void* lparam, void* rparam, const int& size){ CPacket ctrlpkt; switch (pkttype) { case 2: //010 - Acknowledgement { int32_t ack; // If there is no loss, the ACK is the current largest sequence number plus 1; // Otherwise it is the smallest sequence number in the receiver loss list. if (0 == m_pRcvLossList->getLossLength()) ack = CSeqNo::incseq(m_iRcvCurrSeqNo); else ack = m_pRcvLossList->getFirstLostSeq(); if (ack == m_iRcvLastAckAck) break; // send out a lite ACK // to save time on buffer processing and bandwidth/AS measurement, a lite ACK only feeds back an ACK number if (4 == size) { ctrlpkt.pack(2, NULL, &ack, size); ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); break; } uint64_t currtime; CTimer::rdtsc(currtime); // There are new received packets to acknowledge, update related information. if (CSeqNo::seqcmp(ack, m_iRcvLastAck) > 0) { int acksize = CSeqNo::seqoff(m_iRcvLastAck, ack); m_iRcvLastAck = ack; m_pRcvBuffer->ackData(acksize); // signal a waiting "recv" call if there is any data available #ifndef WIN32 pthread_mutex_lock(&m_RecvDataLock); if (m_bSynRecving) pthread_cond_signal(&m_RecvDataCond); pthread_mutex_unlock(&m_RecvDataLock); #else if (m_bSynRecving) SetEvent(m_RecvDataCond); #endif } else if (ack == m_iRcvLastAck) { if ((currtime - m_ullLastAckTime) < ((m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency)) break; } else break; // Send out the ACK only if has not been received by the sender before if (CSeqNo::seqcmp(m_iRcvLastAck, m_iRcvLastAckAck) > 0) { int32_t data[6]; m_iAckSeqNo = CAckNo::incack(m_iAckSeqNo); data[0] = m_iRcvLastAck; data[1] = m_iRTT; data[2] = m_iRTTVar; data[3] = m_pRcvBuffer->getAvailBufSize(); // a minimum flow window of 2 is used, even if buffer is full, to break potential deadlock if (data[3] < 2) data[3] = 2; if (currtime - m_ullLastAckTime > m_ullSYNInt) { data[4] = m_pRcvTimeWindow->getPktRcvSpeed(); data[5] = m_pRcvTimeWindow->getBandwidth(); ctrlpkt.pack(2, &m_iAckSeqNo, data, 24); CTimer::rdtsc(m_ullLastAckTime); } else { ctrlpkt.pack(2, &m_iAckSeqNo, data, 16); } ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); m_pACKWindow->store(m_iAckSeqNo, m_iRcvLastAck); ++ m_iSentACK; } break; } case 6: //110 - Acknowledgement of Acknowledgement ctrlpkt.pack(6, lparam); ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); break; case 3: //011 - Loss Report if (NULL != rparam) { if (1 == size) { // only 1 loss packet ctrlpkt.pack(3, NULL, (int32_t *)rparam + 1, 4); } else { // more than 1 loss packets ctrlpkt.pack(3, NULL, rparam, 8); } ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); ++ m_iSentNAK; } else if (m_pRcvLossList->getLossLength() > 0) { // this is periodically NAK report // read loss list from the local receiver loss list int32_t* data = new int32_t[m_iPayloadSize / 4]; int losslen; m_pRcvLossList->getLossArray(data, losslen, m_iPayloadSize / 4, m_iRTT + 4 * m_iRTTVar); if (0 < losslen) { ctrlpkt.pack(3, NULL, data, losslen * 4); ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); ++ m_iSentNAK; } delete [] data; } break; case 4: //100 - Congestion Warning ctrlpkt.pack(4); ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); CTimer::rdtsc(m_ullLastWarningTime); break; case 1: //001 - Keep-alive ctrlpkt.pack(1); ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); break; case 0: //000 - Handshake ctrlpkt.pack(0, NULL, rparam, sizeof(CHandShake)); ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); break; case 5: //101 - Shutdown ctrlpkt.pack(5); ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); break; case 7: //111 - Msg drop request ctrlpkt.pack(7, lparam, rparam, 8); ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); break; case 65535: //0x7FFF - Resevered for future use break; default: break; }}void CUDT::processCtrl(CPacket& ctrlpkt){ // Just heard from the peer, reset the expiration count. m_iEXPCount = 1; if ((CSeqNo::incseq(m_iSndCurrSeqNo) == m_iSndLastAck) || (2 == ctrlpkt.getType()) || (3 == ctrlpkt.getType())) { m_ullEXPInt = m_ullMinEXPInt; CTimer::rdtsc(m_ullNextEXPTime); m_ullNextEXPTime += m_ullEXPInt; } switch (ctrlpkt.getType()) { case 2: //010 - Acknowledgement { int32_t ack; // process a lite ACK if (4 == ctrlpkt.getLength()) { ack = *(int32_t *)ctrlpkt.m_pcData; if (CSeqNo::seqcmp(ack, const_cast<int32_t&>(m_iSndLastAck)) >= 0) { m_iFlowWindowSize -= CSeqNo::seqoff(const_cast<int32_t&>(m_iSndLastAck), ack); m_iSndLastAck = ack; } break; } // read ACK seq. no. ack = ctrlpkt.getAckSeqNo(); // send ACK acknowledgement // ACK2 can be much less than ACK uint64_t currtime = CTimer::getTime(); if ((currtime - m_ullSndLastAck2Time > (uint64_t)m_iSYNInterval) || (ack == m_iSndLastAck2)) { sendCtrl(6, &ack); m_iSndLastAck2 = ack; m_ullSndLastAck2Time = currtime; } // Got data ACK ack = *(int32_t *)ctrlpkt.m_pcData; // check the validation of the ack if (CSeqNo::seqcmp(ack, CSeqNo::incseq(m_iSndCurrSeqNo)) > 0) { //this should not happen: attack or bug m_bBroken = true; m_iBrokenCounter = 0; break; } if (CSeqNo::seqcmp(ack, const_cast<int32_t&>(m_iSndLastAck)) >= 0) { // Update Flow Window Size, must update before and together with m_iSndLastAck m_iFlowWindowSize = *((int32_t *)ctrlpkt.m_pcData + 3); m_iSndLastAck = ack; } // protect packet retransmission CGuard::enterCS(m_AckLock); int offset = CSeqNo::seqoff(m_iSndLastDataAck, ack); if (offset <= 0) { // discard it if it is a repeated ACK CGuard::leaveCS(m_AckLock); break; } // acknowledge the sending buffer m_pSndBuffer->ackData(offset); // update sending variables m_iSndLastDataAck = ack; m_pSndLossList->remove(CSeqNo::decseq(m_iSndLastDataAck)); CGuard::leaveCS(m_AckLock); #ifndef WIN32 pthread_mutex_lock(&m_SendBlockLock); if (m_bSynSending) pthread_cond_signal(&m_SendBlockCond); pthread_mutex_unlock(&m_SendBlockLock); #else if (m_bSynSending) SetEvent(m_SendBlockCond); #endif // insert this socket to snd list if it is not on the list yet m_pSndQueue->m_pSndUList->update(this, false); // Update RTT //m_iRTT = *((int32_t *)ctrlpkt.m_pcData + 1); //m_iRTTVar = *((int32_t *)ctrlpkt.m_pcData + 2); int rtt = *((int32_t *)ctrlpkt.m_pcData + 1); m_iRTTVar = (m_iRTTVar * 3 + abs(rtt - m_iRTT)) >> 2; m_iRTT = (m_iRTT * 7 + rtt) >> 3; m_pCC->setRTT(m_iRTT); m_ullMinEXPInt = (m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency + m_ullSYNInt; if (m_ullMinEXPInt < 100000 * m_ullCPUFrequency) m_ullMinEXPInt = 100000 * m_ullCPUFrequency; if (ctrlpkt.getLength() > 16) { // Update Estimated Bandwidth and packet delivery rate if (*((int32_t *)ctrlpkt.m_pcData + 4) > 0) m_iDeliveryRate = (m_iDeliveryRate * 7 + *((int32_t *)ctrlpkt.m_pcData + 4)) >> 3; if (*((int32_t *)ctrlpkt.m_pcData + 5) > 0) m_iBandwidth = (m_iBandwidth * 7 + *((int32_t *)ctrlpkt.m_pcData + 5)) >> 3; m_pCC->setRcvRate(m_iDeliveryRate); m_pCC->setBandwidth(m_iBandwidth); } m_pCC->onACK(ack); // update CC parameters m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency); m_dCongestionWindow = m_pCC->m_dCWndSize; ++ m_iRecvACK; break; } case 6: //110 - Acknowledgement of Acknowledgement { int32_t ack; int rtt = -1; // update RTT rtt = m_pACKWindow->acknowledge(ctrlpkt.getAckSeqNo(), ack); if (rtt <= 0) break; //if increasing delay detected... // sendCtrl(4); // RTT EWMA m_iRTTVar = (m_iRTTVar * 3 + abs(rtt - m_iRTT)) >> 2; m_iRTT = (m_iRTT * 7 + rtt) >> 3; m_pCC->setRTT(m_iRTT); m_ullMinEXPInt = (m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency + m_ullSYNInt; if (m_ullMinEXPInt < 100000 * m_ullCPUFrequency) m_ullMinEXPInt = 100000 * m_ullCPUFrequency; // update last ACK that has been received by the sender if (CSeqNo::seqcmp(ack, m_iRcvLastAckAck) > 0) m_iRcvLastAckAck = ack; break; } case 3: //011 - Loss Report { int32_t* losslist = (int32_t *)(ctrlpkt.m_pcData); m_pCC->onLoss(losslist, ctrlpkt.getLength()); // update CC parameters m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency); m_dCongestionWindow = m_pCC->m_dCWndSize; bool secure = true; // decode loss list message and insert loss into the sender loss list for (int i = 0, n = (int)(ctrlpkt.getLength() / 4); i < n; ++ i) { if (0 != (losslist[i] & 0x80000000)) { if ((CSeqNo::seqcmp(losslist[i] & 0x7FFFFFFF, losslist[i + 1]) > 0) || (CSeqNo::seqcmp(losslist[i + 1], m_iSndCurrSeqNo) > 0)) { // seq_a must not be greater than seq_b; seq_b must not be greater than the most recent sent seq secure = false; break; } if (CSeqNo::seqcmp(losslist[i] & 0x7FFFFFFF, const_cast<int32_t&>(m_iSndLastAck)) >= 0) m_iTraceSndLoss += m_pSndLossList->insert(losslist[i] & 0x7FFFFFFF, losslist[i + 1]); else if (CSeqNo::seqcmp(losslist[i + 1], const_cast<int32_t&>(m_iSndLastAck)) >= 0)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -