⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 core.cpp

📁 可靠UDP传输, 长距离传输或者无线传输比TCP效率要高很多
💻 CPP
📖 第 1 页 / 共 4 页
字号:
      {         // Update Flow Window Size, must update before and together with m_iSndLastAck         m_iFlowWindowSize = *((int32_t *)ctrlpkt.m_pcData + 3);         m_iSndLastAck = ack;      }      // protect packet retransmission      #ifndef WIN32         pthread_mutex_lock(&m_AckLock);      #else         WaitForSingleObject(m_AckLock, INFINITE);      #endif      int offset = CSeqNo::seqoff(m_iSndLastDataAck, ack);      if (offset <= 0)      {         // discard it if it is a repeated ACK         #ifndef WIN32            pthread_mutex_unlock(&m_AckLock);         #else            ReleaseMutex(m_AckLock);         #endif         break;      }      // acknowledge the sending buffer      m_pSndBuffer->ackData(offset);      // update sending variables      m_iSndLastDataAck = ack;      m_pSndLossList->remove(CSeqNo::decseq(m_iSndLastDataAck));      // insert this socket to snd list if it is not on the list yet      m_pSndQueue->m_pSndUList->update(m_SocketID, this, false);      #ifndef WIN32         pthread_mutex_unlock(&m_AckLock);         pthread_mutex_lock(&m_SendBlockLock);         if (m_bSynSending)            pthread_cond_signal(&m_SendBlockCond);         pthread_mutex_unlock(&m_SendBlockLock);      #else         ReleaseMutex(m_AckLock);         if (m_bSynSending)            SetEvent(m_SendBlockCond);      #endif      // Update RTT      //m_iRTT = *((int32_t *)ctrlpkt.m_pcData + 1);      //m_iRTTVar = *((int32_t *)ctrlpkt.m_pcData + 2);      int rtt = *((int32_t *)ctrlpkt.m_pcData + 1);      m_iRTTVar = (m_iRTTVar * 3 + abs(rtt - m_iRTT)) >> 2;      m_iRTT = (m_iRTT * 7 + rtt) >> 3;      m_pCC->setRTT(m_iRTT);      if (ctrlpkt.getLength() > 16)      {         // Update Estimated Bandwidth and packet delivery rate         if (*((int32_t *)ctrlpkt.m_pcData + 4) > 0)            m_iDeliveryRate = (m_iDeliveryRate * 7 + *((int32_t *)ctrlpkt.m_pcData + 4)) >> 3;         if (*((int32_t *)ctrlpkt.m_pcData + 5) > 0)            m_iBandwidth = (m_iBandwidth * 7 + *((int32_t *)ctrlpkt.m_pcData + 5)) >> 3;         m_pCC->setRcvRate(m_iDeliveryRate);         m_pCC->setBandwidth(m_iBandwidth);      }      m_pCC->onACK(ack);      // update CC parameters      m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);      m_dCongestionWindow = m_pCC->m_dCWndSize;      ++ m_iRecvACK;      break;      }   case 6: //110 - Acknowledgement of Acknowledgement      {      int32_t ack;      int rtt = -1;      // update RTT      rtt = m_pACKWindow->acknowledge(ctrlpkt.getAckSeqNo(), ack);      if (rtt <= 0)         break;      //if increasing delay detected...      //   sendCtrl(4);      // RTT EWMA      m_iRTTVar = (m_iRTTVar * 3 + abs(rtt - m_iRTT)) >> 2;      m_iRTT = (m_iRTT * 7 + rtt) >> 3;      // update last ACK that has been received by the sender      if (CSeqNo::seqcmp(ack, m_iRcvLastAckAck) > 0)         m_iRcvLastAckAck = ack;      break;      }   case 3: //011 - Loss Report      {      int32_t* losslist = (int32_t *)(ctrlpkt.m_pcData);      m_pCC->onLoss(losslist, ctrlpkt.getLength());      // update CC parameters      m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);      m_dCongestionWindow = m_pCC->m_dCWndSize;      // decode loss list message and insert loss into the sender loss list      for (int i = 0, n = (int)(ctrlpkt.getLength() / 4); i < n; ++ i)      {         if (0 != (losslist[i] & 0x80000000))         {            if (CSeqNo::seqcmp(losslist[i] & 0x7FFFFFFF, const_cast<int32_t&>(m_iSndLastAck)) >= 0)               m_iTraceSndLoss += m_pSndLossList->insert(losslist[i] & 0x7FFFFFFF, losslist[i + 1]);            else if (CSeqNo::seqcmp(losslist[i + 1], const_cast<int32_t&>(m_iSndLastAck)) >= 0)               m_iTraceSndLoss += m_pSndLossList->insert(const_cast<int32_t&>(m_iSndLastAck), losslist[i + 1]);            ++ i;         }         else if (CSeqNo::seqcmp(losslist[i], const_cast<int32_t&>(m_iSndLastAck)) >= 0)         {            m_iTraceSndLoss += m_pSndLossList->insert(losslist[i], losslist[i]);         }      }      // Wake up the waiting sender (avoiding deadlock on an infinite sleeping)      m_pSndLossList->insert(const_cast<int32_t&>(m_iSndLastAck), const_cast<int32_t&>(m_iSndLastAck));	        // the lost packet (retransmission) should be sent out immediately      m_pSndQueue->m_pSndUList->update(m_SocketID, this);      ++ m_iRecvNAK;      break;      }   case 4: //100 - Delay Warning      // One way packet delay is increasing, so decrease the sending rate      m_ullInterval = (uint64_t)ceil(m_ullInterval * 1.125);      m_iLastDecSeq = m_iSndCurrSeqNo;      break;   case 1: //001 - Keep-alive      // The only purpose of keep-alive packet is to tell that the peer is still alive      // nothing needs to be done.      break;   case 0: //000 - Handshake      if ((((CHandShake*)(ctrlpkt.m_pcData))->m_iReqType > 0) || (m_bRendezvous && (((CHandShake*)(ctrlpkt.m_pcData))->m_iReqType != -2)))      {         // The peer side has not received the handshake message, so it keeps querying         // resend the handshake packet         CHandShake initdata;         initdata.m_iISN = m_iISN;         initdata.m_iMSS = m_iMSS;         initdata.m_iFlightFlagSize = m_iFlightFlagSize;         initdata.m_iReqType = (!m_bRendezvous) ? -1 : -2;         initdata.m_iID = m_SocketID;         sendCtrl(0, NULL, (char *)&initdata, sizeof(CHandShake));      }      break;   case 5: //101 - Shutdown      m_bShutdown = true;      m_bClosing = true;      m_bBroken = true;      // Signal the sender and recver if they are waiting for data.      releaseSynch();      CTimer::triggerEvent();      break;   case 7: //111 - Msg drop request      m_pRcvBuffer->dropMsg(ctrlpkt.getMsgSeq());      m_pRcvLossList->remove(*(int32_t*)ctrlpkt.m_pcData, *(int32_t*)(ctrlpkt.m_pcData + 4));      break;   case 65535: //0x7FFF - reserved and user defined messages      m_pCC->processCustomMsg(&ctrlpkt);      // update CC parameters      m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);      m_dCongestionWindow = m_pCC->m_dCWndSize;      break;   default:      break;   }}int CUDT::packData(CPacket& packet, uint64_t& ts){   if (m_bClosing || m_bBroken)   {      ts = 0;      return 0;   }   int payload = 0;   bool probe = false;   CTimer::rdtsc(ts);   uint64_t entertime;   CTimer::rdtsc(entertime);   if ((0 != m_ullTargetTime) && (entertime > m_ullTargetTime))      m_ullTimeDiff += entertime - m_ullTargetTime;   // Loss retransmission always has higher priority.   if ((packet.m_iSeqNo = m_pSndLossList->getLostSeq()) >= 0)   {      // protect m_iSndLastDataAck from updating by ACK processing      CGuard ackguard(m_AckLock);      int offset = CSeqNo::seqoff(m_iSndLastDataAck, packet.m_iSeqNo);      if (offset < 0)         return 0;      int msglen;      payload = m_pSndBuffer->readData(&(packet.m_pcData), offset, packet.m_iMsgNo, msglen);      if (-1 == payload)      {         int32_t seqpair[2];         seqpair[0] = packet.m_iSeqNo;         seqpair[1] = CSeqNo::incseq(seqpair[0], msglen);         sendCtrl(7, &packet.m_iMsgNo, seqpair, 8);         // only one msg drop request is necessary         m_pSndLossList->remove(seqpair[1]);         return 0;      }      else if (0 == payload)         return 0;      ++ m_iTraceRetrans;   }   else   {      // If no loss, pack a new packet.      // check congestion/flow window limit      int cwnd = (m_iFlowWindowSize < (int)m_dCongestionWindow) ? m_iFlowWindowSize : (int)m_dCongestionWindow;      if (cwnd >= CSeqNo::seqlen(const_cast<int32_t&>(m_iSndLastAck), CSeqNo::incseq(m_iSndCurrSeqNo)))      {         if (0 != (payload = m_pSndBuffer->readData(&(packet.m_pcData), packet.m_iMsgNo)))         {            m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo);            m_pCC->setSndCurrSeqNo(m_iSndCurrSeqNo);            packet.m_iSeqNo = m_iSndCurrSeqNo;            // every 16 (0xF) packets, a packet pair is sent            if (0 == (packet.m_iSeqNo & 0xF))               probe = true;         }         else         {            m_ullTargetTime = 0;            m_ullTimeDiff = 0;            ts = 0;            return 0;         }      }      else      {         m_ullTargetTime = 0;         m_ullTimeDiff = 0;         ts = 0;         return 0;      }   }   packet.m_iTimeStamp = int(CTimer::getTime() - m_StartTime);   m_pSndTimeWindow->onPktSent(packet.m_iTimeStamp);   packet.m_iID = m_PeerID;   m_pCC->onPktSent(&packet);   ++ m_llTraceSent;   if (probe)   {      // sends out probing packet pair      CTimer::rdtsc(ts);      probe = false;   }   else   {      #ifndef NO_BUSY_WAITING         ts = entertime + m_ullInterval;      #else         if (m_ullTimeDiff >= m_ullInterval)         {            ts = entertime;            m_ullTimeDiff -= m_ullInterval;         }         else         {            ts = entertime + m_ullInterval - m_ullTimeDiff;            m_ullTimeDiff = 0;         }      #endif   }   m_ullTargetTime = ts;   packet.m_iID = m_PeerID;   packet.setLength(payload);   return payload;}int CUDT::processData(CUnit* unit){   if (m_bClosing || m_bBroken)      return -1;   CPacket& packet = unit->m_Packet;   // Just heard from the peer, reset the expiration count.   m_iEXPCount = 1;   m_ullEXPInt = (m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency + m_ullSYNInt;   if (m_ullEXPInt < 13 * m_ullSYNInt)      m_ullEXPInt = 13 * m_ullSYNInt;   if (CSeqNo::incseq(m_iSndCurrSeqNo) == m_iSndLastAck)   {      CTimer::rdtsc(m_ullNextEXPTime);      if (!m_pCC->m_bUserDefinedRTO)         m_ullNextEXPTime += m_ullEXPInt;      else         m_ullNextEXPTime += m_pCC->m_iRTO * m_ullCPUFrequency;   }   m_pCC->onPktReceived(&packet);   m_iPktCount ++;   // update time information   m_pRcvTimeWindow->onPktArrival();   // check if it is probing packet pair   if (0 == (packet.m_iSeqNo & 0xF))      m_pRcvTimeWindow->probe1Arrival();   else if (1 == (packet.m_iSeqNo & 0xF))      m_pRcvTimeWindow->probe2Arrival();   ++ m_llTraceRecv;   int32_t offset = CSeqNo::seqoff(m_iRcvLastAck, packet.m_iSeqNo);   if ((offset < 0) || (offset >= m_pRcvBuffer->getAvailBufSize()))      return -1;   if (m_pRcvBuffer->addData(unit, offset) < 0)      return -1;   // Loss detection.   if (CSeqNo::seqcmp(packet.m_iSeqNo, CSeqNo::incseq(m_iRcvCurrSeqNo)) > 0)   {      // If loss found, insert them to the receiver loss list      m_pRcvLossList->insert(CSeqNo::incseq(m_iRcvCurrSeqNo), CSeqNo::decseq(packet.m_iSeqNo));      // pack loss list for NAK      int32_t lossdata[2];      lossdata[0] = CSeqNo::incseq(m_iRcvCurrSeqNo) | 0x80000000;      lossdata[1] = CSeqNo::decseq(packet.m_iSeqNo);      // Generate loss report immediately.      sendCtrl(3, NULL, lossdata, (CSeqNo::incseq(m_iRcvCurrSeqNo) == CSeqNo::decseq(packet.m_iSeqNo)) ? 1 : 2);      m_iTraceRcvLoss += CSeqNo::seqlen(m_iRcvCurrSeqNo, packet.m_iSeqNo) - 2;   }   // This is not a regular fixed size packet...   //an irregular sized packet usually indicates the end of a message, so send an ACK immediately   if (packet.getLength() != m_iPayloadSize)      CTimer::rdtsc(m_ullNextACKTime);   // Update the current largest sequence number that has been received.   // Or it is a retransmitted packet, remove it from receiver loss list.   if (CSeqNo::seqcmp(packet.m_iSeqNo, m_iRcvCurrSeqNo) > 0)      m_iRcvCurrSeqNo = packet.m_iSeqNo;   else      m_pRcvLossList->remove(packet.m_iSeqNo);   return 0;}int CUDT::listen(sockaddr* addr, CPacket& packet){   CGuard cg(m_ConnectionLock);   if (m_bClosing)      return 1002;   CHandShake* hs = (CHandShake *)packet.m_pcData;   int32_t id = hs->m_iID;   // When a peer side connects in...   if ((1 == packet.getFlag()) && (0 == packet.getType()))   {      if ((hs->m_iVersion != m_iVersion) || (hs->m_iType != m_iSockType) || (-1 == s_UDTUnited.newConnection(m_SocketID, addr, hs)))      {         // couldn't create a new connection, reject the request         hs->m_iReqType = 1002;      }      packet.m_iID = id;      m_pSndQueue->sendto(addr, packet);   }   return hs->m_iReqType;}void CUDT::checkTimers(){   if (m_bClosing || m_bBroken)      return;   // update CC parameters   m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);   m_dCongestionWindow = m_pCC->m_dCWndSize;   if (m_ullInterval < (uint64_t)(m_ullCPUFrequency * m_pSndTimeWindow->getMinPktSndInt() * 0.9))      m_ullInterval = (uint64_t)(m_ullCPUFrequency * m_pSndTimeWindow->getMinPktSndInt() * 0.9);   uint64_t currtime;   CTimer::rdtsc(currtime);   int32_t loss = m_pRcvLossList->getFirstLostSeq();   if ((currtime > m_ullNextACKTime) || ((m_pCC->m_iACKInterval > 0) && (m_pCC->m_iACKInterval <= m_iPktCount)))   {      // ACK timer expired or ACK interval reached      sendCtrl(2);      CTimer::rdtsc(currtime);      if (m_pCC->m_iACKPeriod > 0)         m_ullNextACKTime = currtime + m_pCC->m_iACKPeriod * m_ullCPUFrequency;      else         m_ullNextACKTime = currtime + m_ullACKInt;      m_iPktCount = 0;      m_iLightACKCount = 1;   }   else if (m_iSelfClockInterval * m_iLightACKCount <= m_iPktCount)   {      //send a "light" ACK      sendCtrl(2, NULL, NULL, 4);      ++ m_iLightACKCount;   }   if ((loss >= 0) && (currtime > m_ullNextNAKTime))   {      // NAK timer expired, and there is loss to be reported.      sendCtrl(3);      CTimer::rdtsc(currtime);      m_ullNextNAKTime = currtime + m_ullNAKInt;   }   if (currtime > m_ullNextEXPTime)   {      // Haven't receive any information from the peer, is it dead?!      // timeout: at least 16 expirations and must be greater than 3 seconds and be less than 30 seconds      if (((m_iEXPCount > 16) &&           (m_iEXPCount * ((m_iEXPCount - 1) * (m_iRTT + 4 * m_iRTTVar) / 2 + m_iSYNInterval) > 3000000))          || (m_iEXPCount * ((m_iEXPCount - 1) * (m_iRTT + 4 * m_iRTTVar) / 2 + m_iSYNInterval) > 30000000))      {         //         // Connection is broken.          // UDT does not signal any information about this instead of to stop quietly.         // Apllication will detect this when it calls any UDT methods next time.         //         m_bClosing = true;         m_bBroken = true;         // update snd U list to remove this socket         m_pSndQueue->m_pSndUList->update(m_SocketID, this, true);         releaseSynch();         CTimer::triggerEvent();         return;      }      // sender: Insert all the packets sent after last received acknowledgement into the sender loss list.      // recver: Send out a keep-alive packet      if (CSeqNo::incseq(m_iSndCurrSeqNo) != m_iSndLastAck)      {         int32_t csn = m_iSndCurrSeqNo;         m_pSndLossList->insert(const_cast<int32_t&>(m_iSndLastAck), csn);      }      else         sendCtrl(1);      if (m_pSndBuffer->getCurrBufSize() > 0)      {         // immediately restart transmission         m_pSndQueue->m_pSndUList->update(m_SocketID, this);      }      ++ m_iEXPCount;      m_ullEXPInt = (m_iEXPCount * (m_iRTT + 4 * m_iRTTVar) + m_iSYNInterval) * m_ullCPUFrequency;      if (m_ullEXPInt < 13 * m_ullSYNInt)         m_ullEXPInt = 13 * m_ullSYNInt;      CTimer::rdtsc(m_ullNextEXPTime);      m_ullNextEXPTime += m_ullEXPInt;      m_pCC->onTimeout();      // update CC parameters      m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);      m_dCongestionWindow = m_pCC->m_dCWndSize;   }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -