📄 core.cpp
字号:
locktime.tv_sec = exptime / 1000000; locktime.tv_nsec = (exptime % 1000000) * 1000; if (pthread_cond_timedwait(&m_RecvDataCond, &m_RecvDataLock, &locktime) == ETIMEDOUT) timeout = true; res = m_pRcvBuffer->readMsg(data, len); } pthread_mutex_unlock(&m_RecvDataLock); #else if (m_iRcvTimeOut < 0) { while (!m_bBroken && m_bConnected && (0 == (res = m_pRcvBuffer->readMsg(data, len)))) WaitForSingleObject(m_RecvDataCond, INFINITE); } else { if (WaitForSingleObject(m_RecvDataCond, DWORD(m_iRcvTimeOut)) == WAIT_TIMEOUT) timeout = true; res = m_pRcvBuffer->readMsg(data, len); } #endif if (m_bBroken) throw CUDTException(2, 1, 0); else if (!m_bConnected) throw CUDTException(2, 2, 0); } while ((0 == res) && !timeout); return res;}int64_t CUDT::sendfile(ifstream& ifs, const int64_t& offset, const int64_t& size, const int& block){ if (UDT_DGRAM == m_iSockType) throw CUDTException(5, 10, 0); CGuard sendguard(m_SendLock); if (m_bBroken) throw CUDTException(2, 1, 0); else if (!m_bConnected) throw CUDTException(2, 2, 0); if (size <= 0) return 0; int64_t tosend = size; int unitsize; // positioning... try { ifs.seekg((streamoff)offset); } catch (...) { throw CUDTException(4, 1); } // sending block by block while (tosend > 0) { unitsize = int((tosend >= block) ? block : tosend); #ifndef WIN32 pthread_mutex_lock(&m_SendBlockLock); while (!m_bBroken && m_bConnected && (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize())) pthread_cond_wait(&m_SendBlockCond, &m_SendBlockLock); pthread_mutex_unlock(&m_SendBlockLock); #else while (!m_bBroken && m_bConnected && (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize())) WaitForSingleObject(m_SendBlockCond, INFINITE); #endif if (m_bBroken) throw CUDTException(2, 1, 0); else if (!m_bConnected) throw CUDTException(2, 2, 0); m_pSndBuffer->addBufferFromFile(ifs, unitsize); // insert this socket to snd list if it is not on the list yet m_pSndQueue->m_pSndUList->update(m_SocketID, this, false); tosend -= unitsize; } return size;}int64_t CUDT::recvfile(ofstream& ofs, const int64_t& offset, const int64_t& size, const int& block){ if (UDT_DGRAM == m_iSockType) throw CUDTException(5, 10, 0); CGuard recvguard(m_RecvLock); if (!m_bConnected) throw CUDTException(2, 2, 0); else if ((m_bBroken) && (0 == m_pRcvBuffer->getRcvDataSize())) throw CUDTException(2, 1, 0); if (size <= 0) return 0; int64_t torecv = size; int unitsize = block; int recvsize; // positioning... try { ofs.seekp((streamoff)offset); } catch (...) { throw CUDTException(4, 3); } // receiving... "recvfile" is always blocking while (torecv > 0) { #ifndef WIN32 pthread_mutex_lock(&m_RecvDataLock); while (!m_bBroken && m_bConnected && (0 == m_pRcvBuffer->getRcvDataSize())) pthread_cond_wait(&m_RecvDataCond, &m_RecvDataLock); pthread_mutex_unlock(&m_RecvDataLock); #else while (!m_bBroken && m_bConnected && (0 == m_pRcvBuffer->getRcvDataSize())) WaitForSingleObject(m_RecvDataCond, INFINITE); #endif if (!m_bConnected) throw CUDTException(2, 2, 0); else if (m_bBroken && (0 == m_pRcvBuffer->getRcvDataSize())) throw CUDTException(2, 1, 0); unitsize = int((torecv >= block) ? block : torecv); recvsize = m_pRcvBuffer->readBufferToFile(ofs, unitsize); torecv -= recvsize; } return size - torecv;}void CUDT::sample(CPerfMon* perf, bool clear){ if (!m_bConnected) throw CUDTException(2, 2, 0); if (m_bBroken) throw CUDTException(2, 1, 0); uint64_t currtime = CTimer::getTime(); perf->msTimeStamp = (currtime - m_StartTime) / 1000; m_llSentTotal += m_llTraceSent; m_llRecvTotal += m_llTraceRecv; m_iSndLossTotal += m_iTraceSndLoss; m_iRcvLossTotal += m_iTraceRcvLoss; m_iRetransTotal += m_iTraceRetrans; m_iSentACKTotal += m_iSentACK; m_iRecvACKTotal += m_iRecvACK; m_iSentNAKTotal += m_iSentNAK; m_iRecvNAKTotal += m_iRecvNAK; perf->pktSentTotal = m_llSentTotal; perf->pktRecvTotal = m_llRecvTotal; perf->pktSndLossTotal = m_iSndLossTotal; perf->pktRcvLossTotal = m_iRcvLossTotal; perf->pktRetransTotal = m_iRetransTotal; perf->pktSentACKTotal = m_iSentACKTotal; perf->pktRecvACKTotal = m_iRecvACKTotal; perf->pktSentNAKTotal = m_iSentNAKTotal; perf->pktRecvNAKTotal = m_iRecvNAKTotal; perf->pktSent = m_llTraceSent; perf->pktRecv = m_llTraceRecv; perf->pktSndLoss = m_iTraceSndLoss; perf->pktRcvLoss = m_iTraceRcvLoss; perf->pktRetrans = m_iTraceRetrans; perf->pktSentACK = m_iSentACK; perf->pktRecvACK = m_iRecvACK; perf->pktSentNAK = m_iSentNAK; perf->pktRecvNAK = m_iRecvNAK; double interval = double(currtime - m_LastSampleTime); perf->mbpsSendRate = double(m_llTraceSent) * m_iPayloadSize * 8.0 / interval; perf->mbpsRecvRate = double(m_llTraceRecv) * m_iPayloadSize * 8.0 / interval; perf->usPktSndPeriod = m_ullInterval / double(m_ullCPUFrequency); perf->pktFlowWindow = m_iFlowWindowSize; perf->pktCongestionWindow = (int)m_dCongestionWindow; perf->pktFlightSize = CSeqNo::seqlen(const_cast<int32_t&>(m_iSndLastAck), m_iSndCurrSeqNo); perf->msRTT = m_iRTT/1000.0; perf->mbpsBandwidth = m_iBandwidth * m_iPayloadSize * 8.0 / 1000000.0; #ifndef WIN32 if (0 == pthread_mutex_trylock(&m_ConnectionLock)) #else if (WAIT_OBJECT_0 == WaitForSingleObject(m_ConnectionLock, 0)) #endif { perf->byteAvailSndBuf = (NULL == m_pSndBuffer) ? 0 : m_iSndBufSize - m_pSndBuffer->getCurrBufSize(); perf->byteAvailRcvBuf = (NULL == m_pRcvBuffer) ? 0 : m_pRcvBuffer->getAvailBufSize(); #ifndef WIN32 pthread_mutex_unlock(&m_ConnectionLock); #else ReleaseMutex(m_ConnectionLock); #endif } else { perf->byteAvailSndBuf = 0; perf->byteAvailRcvBuf = 0; } if (clear) { m_llTraceSent = m_llTraceRecv = m_iTraceSndLoss = m_iTraceSndLoss = m_iTraceRetrans = m_iSentACK = m_iRecvACK = m_iSentNAK = m_iRecvNAK = 0; m_LastSampleTime = currtime; }}void CUDT::initSynch(){ #ifndef WIN32 pthread_mutex_init(&m_SendBlockLock, NULL); pthread_cond_init(&m_SendBlockCond, NULL); pthread_mutex_init(&m_RecvDataLock, NULL); pthread_cond_init(&m_RecvDataCond, NULL); pthread_mutex_init(&m_SendLock, NULL); pthread_mutex_init(&m_RecvLock, NULL); pthread_mutex_init(&m_AckLock, NULL); pthread_mutex_init(&m_ConnectionLock, NULL); #else m_SendBlockLock = CreateMutex(NULL, false, NULL); m_SendBlockCond = CreateEvent(NULL, false, false, NULL); m_RecvDataLock = CreateMutex(NULL, false, NULL); m_RecvDataCond = CreateEvent(NULL, false, false, NULL); m_SendLock = CreateMutex(NULL, false, NULL); m_RecvLock = CreateMutex(NULL, false, NULL); m_AckLock = CreateMutex(NULL, false, NULL); m_ConnectionLock = CreateMutex(NULL, false, NULL); #endif}void CUDT::destroySynch(){ #ifndef WIN32 pthread_mutex_destroy(&m_SendBlockLock); pthread_cond_destroy(&m_SendBlockCond); pthread_mutex_destroy(&m_RecvDataLock); pthread_cond_destroy(&m_RecvDataCond); pthread_mutex_destroy(&m_SendLock); pthread_mutex_destroy(&m_RecvLock); pthread_mutex_destroy(&m_AckLock); pthread_mutex_destroy(&m_ConnectionLock); #else CloseHandle(m_SendBlockLock); CloseHandle(m_SendBlockCond); CloseHandle(m_RecvDataLock); CloseHandle(m_RecvDataCond); CloseHandle(m_SendLock); CloseHandle(m_RecvLock); CloseHandle(m_AckLock); CloseHandle(m_ConnectionLock); #endif}void CUDT::releaseSynch(){ #ifndef WIN32 // wake up user calls pthread_mutex_lock(&m_SendBlockLock); pthread_cond_signal(&m_SendBlockCond); pthread_mutex_unlock(&m_SendBlockLock); pthread_mutex_lock(&m_SendLock); pthread_mutex_unlock(&m_SendLock); pthread_mutex_lock(&m_RecvDataLock); pthread_cond_signal(&m_RecvDataCond); pthread_mutex_unlock(&m_RecvDataLock); pthread_mutex_lock(&m_RecvLock); pthread_mutex_unlock(&m_RecvLock); #else SetEvent(m_SendBlockCond); WaitForSingleObject(m_SendLock, INFINITE); ReleaseMutex(m_SendLock); SetEvent(m_RecvDataCond); WaitForSingleObject(m_RecvLock, INFINITE); ReleaseMutex(m_RecvLock); #endif}void CUDT::sendCtrl(const int& pkttype, void* lparam, void* rparam, const int& size){ CPacket ctrlpkt; switch (pkttype) { case 2: //010 - Acknowledgement { int32_t ack; // If there is no loss, the ACK is the current largest sequence number plus 1; // Otherwise it is the smallest sequence number in the receiver loss list. if (0 == m_pRcvLossList->getLossLength()) ack = CSeqNo::incseq(m_iRcvCurrSeqNo); else ack = m_pRcvLossList->getFirstLostSeq(); if (ack == m_iRcvLastAckAck) break; // send out a lite ACK // to save time on buffer processing and bandwidth/AS measurement, a lite ACK only feeds back an ACK number if (4 == size) { ctrlpkt.pack(2, NULL, &ack, size); ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); break; } uint64_t currtime; CTimer::rdtsc(currtime); // There are new received packets to acknowledge, update related information. if (CSeqNo::seqcmp(ack, m_iRcvLastAck) > 0) { int acksize = CSeqNo::seqoff(m_iRcvLastAck, ack); m_iRcvLastAck = ack; m_pRcvBuffer->ackData(acksize); // signal a waiting "recv" call if there is any data available #ifndef WIN32 pthread_mutex_lock(&m_RecvDataLock); if (m_bSynRecving) pthread_cond_signal(&m_RecvDataCond); pthread_mutex_unlock(&m_RecvDataLock); #else if (m_bSynRecving) SetEvent(m_RecvDataCond); #endif } else if (ack == m_iRcvLastAck) { if ((currtime - m_ullLastAckTime) < ((m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency)) break; } else break; // Send out the ACK only if has not been received by the sender before if (CSeqNo::seqcmp(m_iRcvLastAck, m_iRcvLastAckAck) > 0) { int32_t data[6]; m_iAckSeqNo = CAckNo::incack(m_iAckSeqNo); data[0] = m_iRcvLastAck; data[1] = m_iRTT; data[2] = m_iRTTVar; data[3] = m_pRcvBuffer->getAvailBufSize(); // a minimum flow window of 2 is used, even if buffer is full, to break potential deadlock if (data[3] < 2) data[3] = 2; if (CTimer::getTime() - m_ullLastAckTime > (uint64_t)m_iSYNInterval) { data[4] = m_pRcvTimeWindow->getPktRcvSpeed(); data[5] = m_pRcvTimeWindow->getBandwidth(); ctrlpkt.pack(2, &m_iAckSeqNo, data, 24); } else { ctrlpkt.pack(2, &m_iAckSeqNo, data, 16); } ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); m_pACKWindow->store(m_iAckSeqNo, m_iRcvLastAck); CTimer::rdtsc(m_ullLastAckTime); ++ m_iSentACK; } break; } case 6: //110 - Acknowledgement of Acknowledgement ctrlpkt.pack(6, lparam); ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); break; case 3: //011 - Loss Report if (NULL != rparam) { if (1 == size) { // only 1 loss packet ctrlpkt.pack(3, NULL, (int32_t *)rparam + 1, 4); } else { // more than 1 loss packets ctrlpkt.pack(3, NULL, rparam, 8); } ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); ++ m_iSentNAK; } else if (m_pRcvLossList->getLossLength() > 0) { // this is periodically NAK report // read loss list from the local receiver loss list int32_t* data = new int32_t[m_iPayloadSize / 4]; int losslen; m_pRcvLossList->getLossArray(data, losslen, m_iPayloadSize / 4, m_iRTT + 4 * m_iRTTVar); if (0 < losslen) { ctrlpkt.pack(3, NULL, data, losslen * 4); ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); ++ m_iSentNAK; } delete [] data; } break; case 4: //100 - Congestion Warning ctrlpkt.pack(4); ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); CTimer::rdtsc(m_ullLastWarningTime); break; case 1: //001 - Keep-alive ctrlpkt.pack(1); ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); break; case 0: //000 - Handshake ctrlpkt.pack(0, NULL, rparam, sizeof(CHandShake)); ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); break; case 5: //101 - Shutdown ctrlpkt.pack(5); ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); break; case 7: //111 - Msg drop request ctrlpkt.pack(7, lparam, rparam, 8); ctrlpkt.m_iID = m_PeerID; m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt); break; case 65535: //0x7FFF - Resevered for future use break; default: break; }}void CUDT::processCtrl(CPacket& ctrlpkt){ // Just heard from the peer, reset the expiration count. m_iEXPCount = 1; if ((CSeqNo::incseq(m_iSndCurrSeqNo) == m_iSndLastAck) || (2 == ctrlpkt.getType()) || (3 == ctrlpkt.getType())) { m_ullEXPInt = (m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency + m_ullSYNInt; if (m_ullEXPInt < 13 * m_ullSYNInt) m_ullEXPInt = 13 * m_ullSYNInt; CTimer::rdtsc(m_ullNextEXPTime); m_ullNextEXPTime += m_ullEXPInt; } switch (ctrlpkt.getType()) { case 2: //010 - Acknowledgement { int32_t ack; // process a lite ACK if (4 == ctrlpkt.getLength()) { ack = *(int32_t *)ctrlpkt.m_pcData; if (CSeqNo::seqcmp(ack, const_cast<int32_t&>(m_iSndLastAck)) >= 0) { m_iFlowWindowSize -= CSeqNo::seqoff(const_cast<int32_t&>(m_iSndLastAck), ack); m_iSndLastAck = ack; } break; } // read ACK seq. no. ack = ctrlpkt.getAckSeqNo(); // send ACK acknowledgement sendCtrl(6, &ack); // Got data ACK ack = *(int32_t *)ctrlpkt.m_pcData; if (CSeqNo::seqcmp(ack, const_cast<int32_t&>(m_iSndLastAck)) >= 0)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -