📄 core.cpp.svn-base
字号:
perf->pktRetransTotal = m_iRetransTotal; perf->pktSentACKTotal = m_iSentACKTotal; perf->pktRecvACKTotal = m_iRecvACKTotal; perf->pktSentNAKTotal = m_iSentNAKTotal; perf->pktRecvNAKTotal = m_iRecvNAKTotal; perf->pktSent = m_llTraceSent; perf->pktRecv = m_llTraceRecv; perf->pktSndLoss = m_iTraceSndLoss; perf->pktRcvLoss = m_iTraceRcvLoss; perf->pktRetrans = m_iTraceRetrans; perf->pktSentACK = m_iSentACK; perf->pktRecvACK = m_iRecvACK; perf->pktSentNAK = m_iSentNAK; perf->pktRecvNAK = m_iRecvNAK; double interval = double(currtime - m_LastSampleTime); perf->mbpsSendRate = double(m_llTraceSent) * m_iPayloadSize * 8.0 / interval; perf->mbpsRecvRate = double(m_llTraceRecv) * m_iPayloadSize * 8.0 / interval; perf->usPktSndPeriod = m_ullInterval / double(m_ullCPUFrequency); perf->pktFlowWindow = m_iFlowWindowSize; perf->pktCongestionWindow = (int)m_dCongestionWindow; perf->pktFlightSize = CSeqNo::seqlen(const_cast<int32_t&>(m_iSndLastAck), m_iSndCurrSeqNo); perf->msRTT = m_iRTT/1000.0; perf->mbpsBandwidth = m_iBandwidth * m_iPayloadSize * 8.0 / 1000000.0; #ifndef WIN32 if (0 == pthread_mutex_trylock(&m_ConnectionLock)) #else if (WAIT_OBJECT_0 == WaitForSingleObject(m_ConnectionLock, 0)) #endif { perf->byteAvailSndBuf = (NULL == m_pSndBuffer) ? 0 : m_iSndBufSize - m_pSndBuffer->getCurrBufSize(); perf->byteAvailRcvBuf = (NULL == m_pRcvBuffer) ? 0 : m_pRcvBuffer->getAvailBufSize(); #ifndef WIN32 pthread_mutex_unlock(&m_ConnectionLock); #else ReleaseMutex(m_ConnectionLock); #endif } else { perf->byteAvailSndBuf = 0; perf->byteAvailRcvBuf = 0; } if (clear) { m_llTraceSent = m_llTraceRecv = m_iTraceSndLoss = m_iTraceSndLoss = m_iTraceRetrans = m_iSentACK = m_iRecvACK = m_iSentNAK = m_iRecvNAK = 0; m_LastSampleTime = currtime; } return CUDTException();}void CUDT::initSynch(){ #ifndef WIN32 pthread_mutex_init(&m_SendBlockLock, NULL); pthread_cond_init(&m_SendBlockCond, NULL); pthread_mutex_init(&m_RecvDataLock, NULL); pthread_cond_init(&m_RecvDataCond, NULL); pthread_mutex_init(&m_SendLock, NULL); pthread_mutex_init(&m_RecvLock, NULL); pthread_mutex_init(&m_AckLock, NULL); pthread_mutex_init(&m_ConnectionLock, NULL); #else m_SendBlockLock = CreateMutex(NULL, false, NULL); m_SendBlockCond = CreateEvent(NULL, false, false, NULL); m_RecvDataLock = CreateMutex(NULL, false, NULL); m_RecvDataCond = CreateEvent(NULL, false, false, NULL); m_SendLock = CreateMutex(NULL, false, NULL); m_RecvLock = CreateMutex(NULL, false, NULL); m_AckLock = CreateMutex(NULL, false, NULL); m_ConnectionLock = CreateMutex(NULL, false, NULL); #endif}void CUDT::destroySynch(){ #ifndef WIN32 pthread_mutex_destroy(&m_SendBlockLock); pthread_cond_destroy(&m_SendBlockCond); pthread_mutex_destroy(&m_RecvDataLock); pthread_cond_destroy(&m_RecvDataCond); pthread_mutex_destroy(&m_SendLock); pthread_mutex_destroy(&m_RecvLock); pthread_mutex_destroy(&m_AckLock); pthread_mutex_destroy(&m_ConnectionLock); #else CloseHandle(m_SendBlockLock); CloseHandle(m_SendBlockCond); CloseHandle(m_RecvDataLock); CloseHandle(m_RecvDataCond); CloseHandle(m_SendLock); CloseHandle(m_RecvLock); CloseHandle(m_AckLock); CloseHandle(m_ConnectionLock); #endif}void CUDT::releaseSynch(){ #ifndef WIN32 // wake up user calls pthread_mutex_lock(&m_SendBlockLock); pthread_cond_signal(&m_SendBlockCond); pthread_mutex_unlock(&m_SendBlockLock); pthread_mutex_lock(&m_SendLock); pthread_mutex_unlock(&m_SendLock); pthread_mutex_lock(&m_RecvDataLock); pthread_cond_signal(&m_RecvDataCond); pthread_mutex_unlock(&m_RecvDataLock); pthread_mutex_lock(&m_RecvLock); pthread_mutex_unlock(&m_RecvLock); #else SetEvent(m_SendBlockCond); WaitForSingleObject(m_SendLock, INFINITE); ReleaseMutex(m_SendLock); SetEvent(m_RecvDataCond); WaitForSingleObject(m_RecvLock, INFINITE); ReleaseMutex(m_RecvLock); #endif}void CUDT::sendCtrl(const int pkttype, void* lparam, void* rparam, const int size){ CPacket ctrlpkt; switch (pkttype) { case 2: //010 - Acknowledgement { int32_t ack; // If there is no loss, the ACK is the current largest sequence number plus 1; // Otherwise it is the smallest sequence number in the receiver loss list. if (0 == m_pRcvLossList->getLossLength()) ack = CSeqNo::incseq(m_iRcvCurrSeqNo); else ack = m_pRcvLossList->getFirstLostSeq(); if (ack == m_iRcvLastAckAck) break; // send out a lite ACK // to save time on buffer processing and bandwidth/AS measurement, a lite ACK only feeds back an ACK number if (4 == size) { ctrlpkt.pack(2, NULL, &ack, size); ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); break; } uint64_t currtime; CTimer::rdtsc(currtime); // There are new received packets to acknowledge, update related information. if (CSeqNo::seqcmp(ack, m_iRcvLastAck) > 0) { int acksize = CSeqNo::seqoff(m_iRcvLastAck, ack); m_iRcvLastAck = ack; m_pRcvBuffer->ackData(acksize); // signal a waiting "recv" call if there is any data available #ifndef WIN32 pthread_mutex_lock(&m_RecvDataLock); if (m_bSynRecving) pthread_cond_signal(&m_RecvDataCond); pthread_mutex_unlock(&m_RecvDataLock); #else if (m_bSynRecving) SetEvent(m_RecvDataCond); #endif } else if (ack == m_iRcvLastAck) { if ((currtime - m_ullLastAckTime) < ((m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency)) break; } else break; // Send out the ACK only if has not been received by the sender before if (CSeqNo::seqcmp(m_iRcvLastAck, m_iRcvLastAckAck) > 0) { int32_t data[6]; m_iAckSeqNo = CAckNo::incack(m_iAckSeqNo); data[0] = m_iRcvLastAck; data[1] = m_iRTT; data[2] = m_iRTTVar; data[3] = m_pRcvBuffer->getAvailBufSize(); // a minimum flow window of 2 is used, even if buffer is full, to break potential deadlock if (data[3] < 2) data[3] = 2; if (currtime - m_ullLastAckTime > m_ullSYNInt) { data[4] = m_pRcvTimeWindow->getPktRcvSpeed(); data[5] = m_pRcvTimeWindow->getBandwidth(); ctrlpkt.pack(2, &m_iAckSeqNo, data, 24); CTimer::rdtsc(m_ullLastAckTime); } else { ctrlpkt.pack(2, &m_iAckSeqNo, data, 16); } ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); m_pACKWindow->store(m_iAckSeqNo, m_iRcvLastAck); ++ m_iSentACK; } break; } case 6: //110 - Acknowledgement of Acknowledgement ctrlpkt.pack(6, lparam); ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); break; case 3: //011 - Loss Report if (NULL != rparam) { if (1 == size) { // only 1 loss packet ctrlpkt.pack(3, NULL, (int32_t *)rparam + 1, 4); } else { // more than 1 loss packets ctrlpkt.pack(3, NULL, rparam, 8); } ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); ++ m_iSentNAK; } else if (m_pRcvLossList->getLossLength() > 0) { // this is periodically NAK report // read loss list from the local receiver loss list int32_t* data = new (nothrow) int32_t[m_iPayloadSize / 4]; if (data) // !nash! if can't alloc data NAK report will be skipped { int losslen; m_pRcvLossList->getLossArray(data, losslen, m_iPayloadSize / 4, m_iRTT + 4 * m_iRTTVar); if (0 < losslen) { ctrlpkt.pack(3, NULL, data, losslen * 4); ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); ++ m_iSentNAK; } delete [] data; } } break; case 4: //100 - Congestion Warning ctrlpkt.pack(4); ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); CTimer::rdtsc(m_ullLastWarningTime); break; case 1: //001 - Keep-alive ctrlpkt.pack(1); ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); break; case 0: //000 - Handshake ctrlpkt.pack(0, NULL, rparam, sizeof(CHandShake)); ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); break; case 5: //101 - Shutdown ctrlpkt.pack(5); ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); break; case 7: //111 - Msg drop request ctrlpkt.pack(7, lparam, rparam, 8); ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); break; case 65535: //0x7FFF - Resevered for future use break; default: break; }}void CUDT::processCtrl(CPacket& ctrlpkt){ // Just heard from the peer, reset the expiration count. m_iEXPCount = 1; if ((CSeqNo::incseq(m_iSndCurrSeqNo) == m_iSndLastAck) || (2 == ctrlpkt.getType()) || (3 == ctrlpkt.getType())) { m_ullEXPInt = (m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency + m_ullSYNInt; if (m_ullEXPInt < 300000 * m_ullCPUFrequency) m_ullEXPInt = 300000 * m_ullCPUFrequency; CTimer::rdtsc(m_ullNextEXPTime); m_ullNextEXPTime += m_ullEXPInt; } switch (ctrlpkt.getType()) { case 2: //010 - Acknowledgement { int32_t ack; // process a lite ACK if (4 == ctrlpkt.getLength()) { ack = *(int32_t *)ctrlpkt.m_pcData; if (CSeqNo::seqcmp(ack, const_cast<int32_t&>(m_iSndLastAck)) >= 0) { m_iFlowWindowSize -= CSeqNo::seqoff(const_cast<int32_t&>(m_iSndLastAck), ack); m_iSndLastAck = ack; } break; } // read ACK seq. no. ack = ctrlpkt.getAckSeqNo(); // send ACK acknowledgement sendCtrl(6, &ack); // Got data ACK ack = *(int32_t *)ctrlpkt.m_pcData; // check the validation of the ack if (CSeqNo::seqcmp(ack, CSeqNo::incseq(m_iSndCurrSeqNo)) > 0) { //this should not happen: attack or bug m_bBroken = true; break; } // check the validation of the ack if (CSeqNo::seqcmp(ack, CSeqNo::incseq(m_iSndCurrSeqNo)) > 0) { //this should not happen: attack or bug m_bBroken = true; break; } if (CSeqNo::seqcmp(ack, const_cast<int32_t&>(m_iSndLastAck)) >= 0) { // Update Flow Window Size, must update before and together with m_iSndLastAck m_iFlowWindowSize = *((int32_t *)ctrlpkt.m_pcData + 3); m_iSndLastAck = ack; } // protect packet retransmission #ifndef WIN32 pthread_mutex_lock(&m_AckLock); #else WaitForSingleObject(m_AckLock, INFINITE); #endif int offset = CSeqNo::seqoff(m_iSndLastDataAck, ack); if (offset <= 0) { // discard it if it is a repeated ACK #ifndef WIN32 pthread_mutex_unlock(&m_AckLock); #else ReleaseMutex(m_AckLock); #endif break; } // acknowledge the sending buffer m_pSndBuffer->ackData(offset); // update sending variables m_iSndLastDataAck = ack; m_pSndLossList->remove(CSeqNo::decseq(m_iSndLastDataAck)); // insert this socket to snd list if it is not on the list yet m_pSndQueue->m_pSndUList->update(this, false); #ifndef WIN32 pthread_mutex_unlock(&m_AckLock); pthread_mutex_lock(&m_SendBlockLock); if (m_bSynSending) pthread_cond_signal(&m_SendBlockCond); pthread_mutex_unlock(&m_SendBlockLock); #else ReleaseMutex(m_AckLock); if (m_bSynSending) SetEvent(m_SendBlockCond); #endif // Update RTT //m_iRTT = *((int32_t *)ctrlpkt.m_pcData + 1); //m_iRTTVar = *((int32_t *)ctrlpkt.m_pcData + 2); int rtt = *((int32_t *)ctrlpkt.m_pcData + 1); m_iRTTVar = (m_iRTTVar * 3 + abs(rtt - m_iRTT)) >> 2; m_iRTT = (m_iRTT * 7 + rtt) >> 3; m_pCC->setRTT(m_iRTT); if (ctrlpkt.getLength() > 16) { // Update Estimated Bandwidth and packet delivery rate int32_t rate = *((int32_t *)ctrlpkt.m_pcData + 4); if (rate > 0) m_iDeliveryRate = (m_iDeliveryRate * 7 + rate) >> 3; int32_t bandwidth = *((int32_t *)ctrlpkt.m_pcData + 5); if (bandwidth > 0) m_iBandwidth = (m_iBandwidth * 7 + bandwidth) >> 3; m_pCC->setRcvRate(m_iDeliveryRate); m_pCC->setBandwidth(m_iBandwidth); } m_pCC->onACK(ack); // update CC parameters m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency); m_dCongestionWindow = m_pCC->m_dCWndSize;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -