⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 core.cpp

📁 Last Update: Jan 22 2009 可靠UDP传输, 一套高效的基于windows平台的C++ 开发库
💻 CPP
📖 第 1 页 / 共 5 页
字号:
      pthread_mutex_init(&m_RecvDataLock, NULL);      pthread_cond_init(&m_RecvDataCond, NULL);      pthread_mutex_init(&m_SendLock, NULL);      pthread_mutex_init(&m_RecvLock, NULL);      pthread_mutex_init(&m_AckLock, NULL);      pthread_mutex_init(&m_ConnectionLock, NULL);   #else      m_SendBlockLock = CreateMutex(NULL, false, NULL);      m_SendBlockCond = CreateEvent(NULL, false, false, NULL);      m_RecvDataLock = CreateMutex(NULL, false, NULL);      m_RecvDataCond = CreateEvent(NULL, false, false, NULL);      m_SendLock = CreateMutex(NULL, false, NULL);      m_RecvLock = CreateMutex(NULL, false, NULL);      m_AckLock = CreateMutex(NULL, false, NULL);      m_ConnectionLock = CreateMutex(NULL, false, NULL);   #endif}void CUDT::destroySynch(){   #ifndef WIN32      pthread_mutex_destroy(&m_SendBlockLock);      pthread_cond_destroy(&m_SendBlockCond);      pthread_mutex_destroy(&m_RecvDataLock);      pthread_cond_destroy(&m_RecvDataCond);      pthread_mutex_destroy(&m_SendLock);      pthread_mutex_destroy(&m_RecvLock);      pthread_mutex_destroy(&m_AckLock);      pthread_mutex_destroy(&m_ConnectionLock);   #else      CloseHandle(m_SendBlockLock);      CloseHandle(m_SendBlockCond);      CloseHandle(m_RecvDataLock);      CloseHandle(m_RecvDataCond);      CloseHandle(m_SendLock);      CloseHandle(m_RecvLock);      CloseHandle(m_AckLock);      CloseHandle(m_ConnectionLock);   #endif}void CUDT::releaseSynch(){   #ifndef WIN32      // wake up user calls      pthread_mutex_lock(&m_SendBlockLock);      pthread_cond_signal(&m_SendBlockCond);      pthread_mutex_unlock(&m_SendBlockLock);      pthread_mutex_lock(&m_SendLock);      pthread_mutex_unlock(&m_SendLock);      pthread_mutex_lock(&m_RecvDataLock);      pthread_cond_signal(&m_RecvDataCond);      pthread_mutex_unlock(&m_RecvDataLock);      pthread_mutex_lock(&m_RecvLock);      pthread_mutex_unlock(&m_RecvLock);   #else      SetEvent(m_SendBlockCond);      WaitForSingleObject(m_SendLock, INFINITE);      ReleaseMutex(m_SendLock);      SetEvent(m_RecvDataCond);      WaitForSingleObject(m_RecvLock, INFINITE);      ReleaseMutex(m_RecvLock);   #endif}void CUDT::sendCtrl(const int& pkttype, void* lparam, void* rparam, const int& size){   CPacket ctrlpkt;   switch (pkttype)   {   case 2: //010 - Acknowledgement      {      int32_t ack;      // If there is no loss, the ACK is the current largest sequence number plus 1;      // Otherwise it is the smallest sequence number in the receiver loss list.      if (0 == m_pRcvLossList->getLossLength())         ack = CSeqNo::incseq(m_iRcvCurrSeqNo);      else         ack = m_pRcvLossList->getFirstLostSeq();      if (ack == m_iRcvLastAckAck)         break;      // send out a lite ACK      // to save time on buffer processing and bandwidth/AS measurement, a lite ACK only feeds back an ACK number      if (4 == size)      {         ctrlpkt.pack(2, NULL, &ack, size);         ctrlpkt.m_iID = m_PeerID;         m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);         break;      }      uint64_t currtime;      CTimer::rdtsc(currtime);      // There are new received packets to acknowledge, update related information.      if (CSeqNo::seqcmp(ack, m_iRcvLastAck) > 0)      {         int acksize = CSeqNo::seqoff(m_iRcvLastAck, ack);         m_iRcvLastAck = ack;         m_pRcvBuffer->ackData(acksize);         // signal a waiting "recv" call if there is any data available         #ifndef WIN32            pthread_mutex_lock(&m_RecvDataLock);            if (m_bSynRecving)               pthread_cond_signal(&m_RecvDataCond);            pthread_mutex_unlock(&m_RecvDataLock);         #else            if (m_bSynRecving)               SetEvent(m_RecvDataCond);         #endif      }      else if (ack == m_iRcvLastAck)      {         if ((currtime - m_ullLastAckTime) < ((m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency))            break;      }      else         break;      // Send out the ACK only if has not been received by the sender before      if (CSeqNo::seqcmp(m_iRcvLastAck, m_iRcvLastAckAck) > 0)      {         int32_t data[6];         m_iAckSeqNo = CAckNo::incack(m_iAckSeqNo);         data[0] = m_iRcvLastAck;         data[1] = m_iRTT;         data[2] = m_iRTTVar;         data[3] = m_pRcvBuffer->getAvailBufSize();         // a minimum flow window of 2 is used, even if buffer is full, to break potential deadlock         if (data[3] < 2)            data[3] = 2;         if (currtime - m_ullLastAckTime > m_ullSYNInt)         {            data[4] = m_pRcvTimeWindow->getPktRcvSpeed();            data[5] = m_pRcvTimeWindow->getBandwidth();            ctrlpkt.pack(2, &m_iAckSeqNo, data, 24);            CTimer::rdtsc(m_ullLastAckTime);         }         else         {            ctrlpkt.pack(2, &m_iAckSeqNo, data, 16);         }         ctrlpkt.m_iID = m_PeerID;         m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);         m_pACKWindow->store(m_iAckSeqNo, m_iRcvLastAck);         ++ m_iSentACK;      }      break;      }   case 6: //110 - Acknowledgement of Acknowledgement      ctrlpkt.pack(6, lparam);      ctrlpkt.m_iID = m_PeerID;      m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);      break;   case 3: //011 - Loss Report      if (NULL != rparam)      {         if (1 == size)         {            // only 1 loss packet            ctrlpkt.pack(3, NULL, (int32_t *)rparam + 1, 4);         }         else         {            // more than 1 loss packets            ctrlpkt.pack(3, NULL, rparam, 8);         }         ctrlpkt.m_iID = m_PeerID;         m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);         ++ m_iSentNAK;      }      else if (m_pRcvLossList->getLossLength() > 0)      {         // this is periodically NAK report         // read loss list from the local receiver loss list         int32_t* data = new int32_t[m_iPayloadSize / 4];         int losslen;         m_pRcvLossList->getLossArray(data, losslen, m_iPayloadSize / 4, m_iRTT + 4 * m_iRTTVar);         if (0 < losslen)         {            ctrlpkt.pack(3, NULL, data, losslen * 4);            ctrlpkt.m_iID = m_PeerID;            m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);            ++ m_iSentNAK;         }         delete [] data;      }      break;   case 4: //100 - Congestion Warning      ctrlpkt.pack(4);      ctrlpkt.m_iID = m_PeerID;      m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);      CTimer::rdtsc(m_ullLastWarningTime);      break;   case 1: //001 - Keep-alive      ctrlpkt.pack(1);      ctrlpkt.m_iID = m_PeerID;      m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);       break;   case 0: //000 - Handshake      ctrlpkt.pack(0, NULL, rparam, sizeof(CHandShake));      ctrlpkt.m_iID = m_PeerID;      m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);      break;   case 5: //101 - Shutdown      ctrlpkt.pack(5);      ctrlpkt.m_iID = m_PeerID;      m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);      break;   case 7: //111 - Msg drop request      ctrlpkt.pack(7, lparam, rparam, 8);      ctrlpkt.m_iID = m_PeerID;      m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);      break;   case 65535: //0x7FFF - Resevered for future use      break;   default:      break;   }}void CUDT::processCtrl(CPacket& ctrlpkt){   // Just heard from the peer, reset the expiration count.   m_iEXPCount = 1;   if ((CSeqNo::incseq(m_iSndCurrSeqNo) == m_iSndLastAck) || (2 == ctrlpkt.getType()) || (3 == ctrlpkt.getType()))   {      m_ullEXPInt = m_ullMinEXPInt;      CTimer::rdtsc(m_ullNextEXPTime);      m_ullNextEXPTime += m_ullEXPInt;   }   switch (ctrlpkt.getType())   {   case 2: //010 - Acknowledgement      {      int32_t ack;      // process a lite ACK      if (4 == ctrlpkt.getLength())      {         ack = *(int32_t *)ctrlpkt.m_pcData;         if (CSeqNo::seqcmp(ack, const_cast<int32_t&>(m_iSndLastAck)) >= 0)         {            m_iFlowWindowSize -= CSeqNo::seqoff(const_cast<int32_t&>(m_iSndLastAck), ack);            m_iSndLastAck = ack;         }         break;      }      // read ACK seq. no.      ack = ctrlpkt.getAckSeqNo();      // send ACK acknowledgement      // ACK2 can be much less than ACK      uint64_t currtime = CTimer::getTime();      if ((currtime - m_ullSndLastAck2Time > (uint64_t)m_iSYNInterval) || (ack == m_iSndLastAck2))      {         sendCtrl(6, &ack);         m_iSndLastAck2 = ack;         m_ullSndLastAck2Time = currtime;      }      // Got data ACK      ack = *(int32_t *)ctrlpkt.m_pcData;      // check the validation of the ack      if (CSeqNo::seqcmp(ack, CSeqNo::incseq(m_iSndCurrSeqNo)) > 0)      {         //this should not happen: attack or bug         m_bBroken = true;         m_iBrokenCounter = 0;         break;      }      if (CSeqNo::seqcmp(ack, const_cast<int32_t&>(m_iSndLastAck)) >= 0)      {         // Update Flow Window Size, must update before and together with m_iSndLastAck         m_iFlowWindowSize = *((int32_t *)ctrlpkt.m_pcData + 3);         m_iSndLastAck = ack;      }      // protect packet retransmission      CGuard::enterCS(m_AckLock);      int offset = CSeqNo::seqoff(m_iSndLastDataAck, ack);      if (offset <= 0)      {         // discard it if it is a repeated ACK         CGuard::leaveCS(m_AckLock);         break;      }      // acknowledge the sending buffer      m_pSndBuffer->ackData(offset);      // update sending variables      m_iSndLastDataAck = ack;      m_pSndLossList->remove(CSeqNo::decseq(m_iSndLastDataAck));      CGuard::leaveCS(m_AckLock);      #ifndef WIN32         pthread_mutex_lock(&m_SendBlockLock);         if (m_bSynSending)            pthread_cond_signal(&m_SendBlockCond);         pthread_mutex_unlock(&m_SendBlockLock);      #else         if (m_bSynSending)            SetEvent(m_SendBlockCond);      #endif      // insert this socket to snd list if it is not on the list yet      m_pSndQueue->m_pSndUList->update(this, false);      // Update RTT      //m_iRTT = *((int32_t *)ctrlpkt.m_pcData + 1);      //m_iRTTVar = *((int32_t *)ctrlpkt.m_pcData + 2);      int rtt = *((int32_t *)ctrlpkt.m_pcData + 1);      m_iRTTVar = (m_iRTTVar * 3 + abs(rtt - m_iRTT)) >> 2;      m_iRTT = (m_iRTT * 7 + rtt) >> 3;      m_pCC->setRTT(m_iRTT);      m_ullMinEXPInt = (m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency + m_ullSYNInt;      if (m_ullMinEXPInt < 100000 * m_ullCPUFrequency)          m_ullMinEXPInt = 100000 * m_ullCPUFrequency;      if (ctrlpkt.getLength() > 16)      {         // Update Estimated Bandwidth and packet delivery rate         if (*((int32_t *)ctrlpkt.m_pcData + 4) > 0)            m_iDeliveryRate = (m_iDeliveryRate * 7 + *((int32_t *)ctrlpkt.m_pcData + 4)) >> 3;         if (*((int32_t *)ctrlpkt.m_pcData + 5) > 0)            m_iBandwidth = (m_iBandwidth * 7 + *((int32_t *)ctrlpkt.m_pcData + 5)) >> 3;         m_pCC->setRcvRate(m_iDeliveryRate);         m_pCC->setBandwidth(m_iBandwidth);      }      m_pCC->onACK(ack);      // update CC parameters      m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);      m_dCongestionWindow = m_pCC->m_dCWndSize;      ++ m_iRecvACK;      break;      }   case 6: //110 - Acknowledgement of Acknowledgement      {      int32_t ack;      int rtt = -1;      // update RTT      rtt = m_pACKWindow->acknowledge(ctrlpkt.getAckSeqNo(), ack);      if (rtt <= 0)         break;      //if increasing delay detected...      //   sendCtrl(4);      // RTT EWMA      m_iRTTVar = (m_iRTTVar * 3 + abs(rtt - m_iRTT)) >> 2;      m_iRTT = (m_iRTT * 7 + rtt) >> 3;      m_pCC->setRTT(m_iRTT);      m_ullMinEXPInt = (m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency + m_ullSYNInt;      if (m_ullMinEXPInt < 100000 * m_ullCPUFrequency)          m_ullMinEXPInt = 100000 * m_ullCPUFrequency;      // update last ACK that has been received by the sender      if (CSeqNo::seqcmp(ack, m_iRcvLastAckAck) > 0)         m_iRcvLastAckAck = ack;      break;      }   case 3: //011 - Loss Report      {      int32_t* losslist = (int32_t *)(ctrlpkt.m_pcData);      m_pCC->onLoss(losslist, ctrlpkt.getLength());      // update CC parameters      m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);      m_dCongestionWindow = m_pCC->m_dCWndSize;      bool secure = true;      // decode loss list message and insert loss into the sender loss list      for (int i = 0, n = (int)(ctrlpkt.getLength() / 4); i < n; ++ i)      {         if (0 != (losslist[i] & 0x80000000))         {            if ((CSeqNo::seqcmp(losslist[i] & 0x7FFFFFFF, losslist[i + 1]) > 0) || (CSeqNo::seqcmp(losslist[i + 1], m_iSndCurrSeqNo) > 0))            {               // seq_a must not be greater than seq_b; seq_b must not be greater than the most recent sent seq               secure = false;               break;            }            if (CSeqNo::seqcmp(losslist[i] & 0x7FFFFFFF, const_cast<int32_t&>(m_iSndLastAck)) >= 0)               m_iTraceSndLoss += m_pSndLossList->insert(losslist[i] & 0x7FFFFFFF, losslist[i + 1]);            else if (CSeqNo::seqcmp(losslist[i + 1], const_cast<int32_t&>(m_iSndLastAck)) >= 0)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -