⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 core.cpp

📁 可靠UDP传输, 长距离传输或者无线传输比TCP效率要高很多
💻 CPP
📖 第 1 页 / 共 4 页
字号:
            locktime.tv_sec = exptime / 1000000;            locktime.tv_nsec = (exptime % 1000000) * 1000;            if (pthread_cond_timedwait(&m_RecvDataCond, &m_RecvDataLock, &locktime) == ETIMEDOUT)               timeout = true;            res = m_pRcvBuffer->readMsg(data, len);                    }         pthread_mutex_unlock(&m_RecvDataLock);      #else         if (m_iRcvTimeOut < 0)         {            while (!m_bBroken && m_bConnected && (0 == (res = m_pRcvBuffer->readMsg(data, len))))               WaitForSingleObject(m_RecvDataCond, INFINITE);         }         else         {            if (WaitForSingleObject(m_RecvDataCond, DWORD(m_iRcvTimeOut)) == WAIT_TIMEOUT)               timeout = true;            res = m_pRcvBuffer->readMsg(data, len);         }      #endif      if (m_bBroken)         throw CUDTException(2, 1, 0);      else if (!m_bConnected)         throw CUDTException(2, 2, 0);   } while ((0 == res) && !timeout);   return res;}int64_t CUDT::sendfile(ifstream& ifs, const int64_t& offset, const int64_t& size, const int& block){   if (UDT_DGRAM == m_iSockType)      throw CUDTException(5, 10, 0);   CGuard sendguard(m_SendLock);   if (m_bBroken)      throw CUDTException(2, 1, 0);   else if (!m_bConnected)      throw CUDTException(2, 2, 0);   if (size <= 0)      return 0;   int64_t tosend = size;   int unitsize;   // positioning...   try   {      ifs.seekg((streamoff)offset);   }   catch (...)   {      throw CUDTException(4, 1);   }   // sending block by block   while (tosend > 0)   {      unitsize = int((tosend >= block) ? block : tosend);      #ifndef WIN32         pthread_mutex_lock(&m_SendBlockLock);         while (!m_bBroken && m_bConnected && (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()))            pthread_cond_wait(&m_SendBlockCond, &m_SendBlockLock);         pthread_mutex_unlock(&m_SendBlockLock);      #else         while (!m_bBroken && m_bConnected && (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()))            WaitForSingleObject(m_SendBlockCond, INFINITE);      #endif      if (m_bBroken)         throw CUDTException(2, 1, 0);      else if (!m_bConnected)         throw CUDTException(2, 2, 0);      m_pSndBuffer->addBufferFromFile(ifs, unitsize);      // insert this socket to snd list if it is not on the list yet      m_pSndQueue->m_pSndUList->update(m_SocketID, this, false);      tosend -= unitsize;   }   return size;}int64_t CUDT::recvfile(ofstream& ofs, const int64_t& offset, const int64_t& size, const int& block){   if (UDT_DGRAM == m_iSockType)      throw CUDTException(5, 10, 0);   CGuard recvguard(m_RecvLock);   if (!m_bConnected)      throw CUDTException(2, 2, 0);   else if ((m_bBroken) && (0 == m_pRcvBuffer->getRcvDataSize()))      throw CUDTException(2, 1, 0);   if (size <= 0)      return 0;   int64_t torecv = size;   int unitsize = block;   int recvsize;   // positioning...   try   {      ofs.seekp((streamoff)offset);   }   catch (...)   {      throw CUDTException(4, 3);   }   // receiving... "recvfile" is always blocking   while (torecv > 0)   {      #ifndef WIN32         pthread_mutex_lock(&m_RecvDataLock);         while (!m_bBroken && m_bConnected && (0 == m_pRcvBuffer->getRcvDataSize()))            pthread_cond_wait(&m_RecvDataCond, &m_RecvDataLock);         pthread_mutex_unlock(&m_RecvDataLock);      #else         while (!m_bBroken && m_bConnected && (0 == m_pRcvBuffer->getRcvDataSize()))            WaitForSingleObject(m_RecvDataCond, INFINITE);      #endif      if (!m_bConnected)         throw CUDTException(2, 2, 0);      else if (m_bBroken && (0 == m_pRcvBuffer->getRcvDataSize()))         throw CUDTException(2, 1, 0);      unitsize = int((torecv >= block) ? block : torecv);      recvsize = m_pRcvBuffer->readBufferToFile(ofs, unitsize);      torecv -= recvsize;   }   return size - torecv;}void CUDT::sample(CPerfMon* perf, bool clear){   if (!m_bConnected)      throw CUDTException(2, 2, 0);   if (m_bBroken)      throw CUDTException(2, 1, 0);   uint64_t currtime = CTimer::getTime();   perf->msTimeStamp = (currtime - m_StartTime) / 1000;   m_llSentTotal += m_llTraceSent;   m_llRecvTotal += m_llTraceRecv;   m_iSndLossTotal += m_iTraceSndLoss;   m_iRcvLossTotal += m_iTraceRcvLoss;   m_iRetransTotal += m_iTraceRetrans;   m_iSentACKTotal += m_iSentACK;   m_iRecvACKTotal += m_iRecvACK;   m_iSentNAKTotal += m_iSentNAK;   m_iRecvNAKTotal += m_iRecvNAK;   perf->pktSentTotal = m_llSentTotal;   perf->pktRecvTotal = m_llRecvTotal;   perf->pktSndLossTotal = m_iSndLossTotal;   perf->pktRcvLossTotal = m_iRcvLossTotal;   perf->pktRetransTotal = m_iRetransTotal;   perf->pktSentACKTotal = m_iSentACKTotal;   perf->pktRecvACKTotal = m_iRecvACKTotal;   perf->pktSentNAKTotal = m_iSentNAKTotal;   perf->pktRecvNAKTotal = m_iRecvNAKTotal;   perf->pktSent = m_llTraceSent;   perf->pktRecv = m_llTraceRecv;   perf->pktSndLoss = m_iTraceSndLoss;   perf->pktRcvLoss = m_iTraceRcvLoss;   perf->pktRetrans = m_iTraceRetrans;   perf->pktSentACK = m_iSentACK;   perf->pktRecvACK = m_iRecvACK;   perf->pktSentNAK = m_iSentNAK;   perf->pktRecvNAK = m_iRecvNAK;   double interval = double(currtime - m_LastSampleTime);   perf->mbpsSendRate = double(m_llTraceSent) * m_iPayloadSize * 8.0 / interval;   perf->mbpsRecvRate = double(m_llTraceRecv) * m_iPayloadSize * 8.0 / interval;   perf->usPktSndPeriod = m_ullInterval / double(m_ullCPUFrequency);   perf->pktFlowWindow = m_iFlowWindowSize;   perf->pktCongestionWindow = (int)m_dCongestionWindow;   perf->pktFlightSize = CSeqNo::seqlen(const_cast<int32_t&>(m_iSndLastAck), m_iSndCurrSeqNo);   perf->msRTT = m_iRTT/1000.0;   perf->mbpsBandwidth = m_iBandwidth * m_iPayloadSize * 8.0 / 1000000.0;   #ifndef WIN32      if (0 == pthread_mutex_trylock(&m_ConnectionLock))   #else      if (WAIT_OBJECT_0 == WaitForSingleObject(m_ConnectionLock, 0))   #endif   {      perf->byteAvailSndBuf = (NULL == m_pSndBuffer) ? 0 : m_iSndBufSize - m_pSndBuffer->getCurrBufSize();      perf->byteAvailRcvBuf = (NULL == m_pRcvBuffer) ? 0 : m_pRcvBuffer->getAvailBufSize();      #ifndef WIN32         pthread_mutex_unlock(&m_ConnectionLock);      #else         ReleaseMutex(m_ConnectionLock);      #endif   }   else   {      perf->byteAvailSndBuf = 0;      perf->byteAvailRcvBuf = 0;   }   if (clear)   {      m_llTraceSent = m_llTraceRecv = m_iTraceSndLoss = m_iTraceSndLoss = m_iTraceRetrans = m_iSentACK = m_iRecvACK = m_iSentNAK = m_iRecvNAK = 0;      m_LastSampleTime = currtime;   }}void CUDT::initSynch(){   #ifndef WIN32      pthread_mutex_init(&m_SendBlockLock, NULL);      pthread_cond_init(&m_SendBlockCond, NULL);      pthread_mutex_init(&m_RecvDataLock, NULL);      pthread_cond_init(&m_RecvDataCond, NULL);      pthread_mutex_init(&m_SendLock, NULL);      pthread_mutex_init(&m_RecvLock, NULL);      pthread_mutex_init(&m_AckLock, NULL);      pthread_mutex_init(&m_ConnectionLock, NULL);   #else      m_SendBlockLock = CreateMutex(NULL, false, NULL);      m_SendBlockCond = CreateEvent(NULL, false, false, NULL);      m_RecvDataLock = CreateMutex(NULL, false, NULL);      m_RecvDataCond = CreateEvent(NULL, false, false, NULL);      m_SendLock = CreateMutex(NULL, false, NULL);      m_RecvLock = CreateMutex(NULL, false, NULL);      m_AckLock = CreateMutex(NULL, false, NULL);      m_ConnectionLock = CreateMutex(NULL, false, NULL);   #endif}void CUDT::destroySynch(){   #ifndef WIN32      pthread_mutex_destroy(&m_SendBlockLock);      pthread_cond_destroy(&m_SendBlockCond);      pthread_mutex_destroy(&m_RecvDataLock);      pthread_cond_destroy(&m_RecvDataCond);      pthread_mutex_destroy(&m_SendLock);      pthread_mutex_destroy(&m_RecvLock);      pthread_mutex_destroy(&m_AckLock);      pthread_mutex_destroy(&m_ConnectionLock);   #else      CloseHandle(m_SendBlockLock);      CloseHandle(m_SendBlockCond);      CloseHandle(m_RecvDataLock);      CloseHandle(m_RecvDataCond);      CloseHandle(m_SendLock);      CloseHandle(m_RecvLock);      CloseHandle(m_AckLock);      CloseHandle(m_ConnectionLock);   #endif}void CUDT::releaseSynch(){   #ifndef WIN32      // wake up user calls      pthread_mutex_lock(&m_SendBlockLock);      pthread_cond_signal(&m_SendBlockCond);      pthread_mutex_unlock(&m_SendBlockLock);      pthread_mutex_lock(&m_SendLock);      pthread_mutex_unlock(&m_SendLock);      pthread_mutex_lock(&m_RecvDataLock);      pthread_cond_signal(&m_RecvDataCond);      pthread_mutex_unlock(&m_RecvDataLock);      pthread_mutex_lock(&m_RecvLock);      pthread_mutex_unlock(&m_RecvLock);   #else      SetEvent(m_SendBlockCond);      WaitForSingleObject(m_SendLock, INFINITE);      ReleaseMutex(m_SendLock);      SetEvent(m_RecvDataCond);      WaitForSingleObject(m_RecvLock, INFINITE);      ReleaseMutex(m_RecvLock);   #endif}void CUDT::sendCtrl(const int& pkttype, void* lparam, void* rparam, const int& size){   CPacket ctrlpkt;   switch (pkttype)   {   case 2: //010 - Acknowledgement      {      int32_t ack;      // If there is no loss, the ACK is the current largest sequence number plus 1;      // Otherwise it is the smallest sequence number in the receiver loss list.      if (0 == m_pRcvLossList->getLossLength())         ack = CSeqNo::incseq(m_iRcvCurrSeqNo);      else         ack = m_pRcvLossList->getFirstLostSeq();      if (ack == m_iRcvLastAckAck)         break;      // send out a lite ACK      // to save time on buffer processing and bandwidth/AS measurement, a lite ACK only feeds back an ACK number      if (4 == size)      {         ctrlpkt.pack(2, NULL, &ack, size);         ctrlpkt.m_iID = m_PeerID;         m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);         break;      }      uint64_t currtime;      CTimer::rdtsc(currtime);      // There are new received packets to acknowledge, update related information.      if (CSeqNo::seqcmp(ack, m_iRcvLastAck) > 0)      {         int acksize = CSeqNo::seqoff(m_iRcvLastAck, ack);         m_iRcvLastAck = ack;         m_pRcvBuffer->ackData(acksize);         // signal a waiting "recv" call if there is any data available         #ifndef WIN32            pthread_mutex_lock(&m_RecvDataLock);            if (m_bSynRecving)               pthread_cond_signal(&m_RecvDataCond);            pthread_mutex_unlock(&m_RecvDataLock);         #else            if (m_bSynRecving)               SetEvent(m_RecvDataCond);         #endif      }      else if (ack == m_iRcvLastAck)      {         if ((currtime - m_ullLastAckTime) < ((m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency))            break;      }      else         break;      // Send out the ACK only if has not been received by the sender before      if (CSeqNo::seqcmp(m_iRcvLastAck, m_iRcvLastAckAck) > 0)      {         int32_t data[6];         m_iAckSeqNo = CAckNo::incack(m_iAckSeqNo);         data[0] = m_iRcvLastAck;         data[1] = m_iRTT;         data[2] = m_iRTTVar;         data[3] = m_pRcvBuffer->getAvailBufSize();         // a minimum flow window of 2 is used, even if buffer is full, to break potential deadlock         if (data[3] < 2)            data[3] = 2;         if (CTimer::getTime() - m_ullLastAckTime > (uint64_t)m_iSYNInterval)         {            data[4] = m_pRcvTimeWindow->getPktRcvSpeed();            data[5] = m_pRcvTimeWindow->getBandwidth();            ctrlpkt.pack(2, &m_iAckSeqNo, data, 24);         }         else         {            ctrlpkt.pack(2, &m_iAckSeqNo, data, 16);         }         ctrlpkt.m_iID = m_PeerID;         m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);         m_pACKWindow->store(m_iAckSeqNo, m_iRcvLastAck);         CTimer::rdtsc(m_ullLastAckTime);         ++ m_iSentACK;      }      break;      }   case 6: //110 - Acknowledgement of Acknowledgement      ctrlpkt.pack(6, lparam);      ctrlpkt.m_iID = m_PeerID;      m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);      break;   case 3: //011 - Loss Report      if (NULL != rparam)      {         if (1 == size)         {            // only 1 loss packet            ctrlpkt.pack(3, NULL, (int32_t *)rparam + 1, 4);         }         else         {            // more than 1 loss packets            ctrlpkt.pack(3, NULL, rparam, 8);         }         ctrlpkt.m_iID = m_PeerID;         m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);         ++ m_iSentNAK;      }      else if (m_pRcvLossList->getLossLength() > 0)      {         // this is periodically NAK report         // read loss list from the local receiver loss list         int32_t* data = new int32_t[m_iPayloadSize / 4];         int losslen;         m_pRcvLossList->getLossArray(data, losslen, m_iPayloadSize / 4, m_iRTT + 4 * m_iRTTVar);         if (0 < losslen)         {            ctrlpkt.pack(3, NULL, data, losslen * 4);            ctrlpkt.m_iID = m_PeerID;            m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);            ++ m_iSentNAK;         }         delete [] data;      }      break;   case 4: //100 - Congestion Warning      ctrlpkt.pack(4);      ctrlpkt.m_iID = m_PeerID;      m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);      CTimer::rdtsc(m_ullLastWarningTime);      break;   case 1: //001 - Keep-alive      ctrlpkt.pack(1);      ctrlpkt.m_iID = m_PeerID;      m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);       break;   case 0: //000 - Handshake      ctrlpkt.pack(0, NULL, rparam, sizeof(CHandShake));      ctrlpkt.m_iID = m_PeerID;      m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);      break;   case 5: //101 - Shutdown      ctrlpkt.pack(5);      ctrlpkt.m_iID = m_PeerID;      m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);      break;   case 7: //111 - Msg drop request      ctrlpkt.pack(7, lparam, rparam, 8);      ctrlpkt.m_iID = m_PeerID;      m_pSndQueue->sendto(m_pPeerAddr, ctrlpkt);      break;   case 65535: //0x7FFF - Resevered for future use      break;   default:      break;   }}void CUDT::processCtrl(CPacket& ctrlpkt){   // Just heard from the peer, reset the expiration count.   m_iEXPCount = 1;   if ((CSeqNo::incseq(m_iSndCurrSeqNo) == m_iSndLastAck) || (2 == ctrlpkt.getType()) || (3 == ctrlpkt.getType()))   {      m_ullEXPInt = (m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency + m_ullSYNInt;      if (m_ullEXPInt < 13 * m_ullSYNInt)         m_ullEXPInt = 13 * m_ullSYNInt;      CTimer::rdtsc(m_ullNextEXPTime);      m_ullNextEXPTime += m_ullEXPInt;   }   switch (ctrlpkt.getType())   {   case 2: //010 - Acknowledgement      {      int32_t ack;      // process a lite ACK      if (4 == ctrlpkt.getLength())      {         ack = *(int32_t *)ctrlpkt.m_pcData;         if (CSeqNo::seqcmp(ack, const_cast<int32_t&>(m_iSndLastAck)) >= 0)         {            m_iFlowWindowSize -= CSeqNo::seqoff(const_cast<int32_t&>(m_iSndLastAck), ack);            m_iSndLastAck = ack;         }         break;      }      // read ACK seq. no.      ack = ctrlpkt.getAckSeqNo();      // send ACK acknowledgement      sendCtrl(6, &ack);      // Got data ACK      ack = *(int32_t *)ctrlpkt.m_pcData;      if (CSeqNo::seqcmp(ack, const_cast<int32_t&>(m_iSndLastAck)) >= 0)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -