📄 core.cpp
字号:
if (NULL == m_pRNode) m_pRNode = new CRNode; m_pRNode->m_pUDT = this; m_pRNode->m_llTimeStamp = 1; m_pRNode->m_pPrev = m_pRNode->m_pNext = NULL; m_pRNode->m_bOnList = false; m_iRTT = 10 * m_iSYNInterval; m_iRTTVar = m_iRTT >> 1; m_ullCPUFrequency = CTimer::getCPUFrequency(); // set up the timers m_ullSYNInt = m_iSYNInterval * m_ullCPUFrequency; m_ullACKInt = m_ullSYNInt; m_ullNAKInt = (m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency; m_ullEXPInt = (m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency + m_ullSYNInt; CTimer::rdtsc(m_ullNextACKTime); m_ullNextACKTime += m_ullSYNInt; CTimer::rdtsc(m_ullNextNAKTime); m_ullNextNAKTime += m_ullNAKInt; CTimer::rdtsc(m_ullNextEXPTime); m_ullNextEXPTime += m_ullEXPInt; m_iPktCount = 0; m_iLightACKCount = 1; m_ullTargetTime = 0; m_ullTimeDiff = 0; // Now UDT is opened. m_bOpened = true;}void CUDT::listen(){ CGuard cg(m_ConnectionLock); if (!m_bOpened) throw CUDTException(5, 0, 0); if (m_bConnected) throw CUDTException(5, 2, 0); // listen can be called more than once if (m_bListening) return; // if there is already another socket listening on the same port if (m_pRcvQueue->setListener(this) < 0) throw CUDTException(5, 11, 0); m_bListening = true;}void CUDT::connect(const sockaddr* serv_addr){ CGuard cg(m_ConnectionLock); if (!m_bOpened) throw CUDTException(5, 0, 0); if (m_bListening) throw CUDTException(5, 2, 0); if (m_bConnected) throw CUDTException(5, 2, 0); // register this socket in the rendezvous queue m_pRcvQueue->m_pRendezvousQueue->insert(m_SocketID, m_iIPversion, serv_addr); CPacket request; char* reqdata = new char [m_iPayloadSize]; CHandShake* req = (CHandShake *)reqdata; CPacket response; char* resdata = new char [m_iPayloadSize]; CHandShake* res = (CHandShake *)resdata; // This is my current configurations. req->m_iVersion = m_iVersion; req->m_iType = m_iSockType; req->m_iMSS = m_iMSS; req->m_iFlightFlagSize = (m_iRcvBufSize < m_iFlightFlagSize)? m_iRcvBufSize : m_iFlightFlagSize; req->m_iReqType = (!m_bRendezvous) ? 1 : 0; req->m_iID = m_SocketID; // Random Initial Sequence Number srand((unsigned int)CTimer::getTime()); m_iISN = req->m_iISN = (int32_t)(double(rand()) * CSeqNo::m_iMaxSeqNo / (RAND_MAX + 1.0)); m_iLastDecSeq = req->m_iISN - 1; m_iSndLastAck = req->m_iISN; m_iSndLastDataAck = req->m_iISN; m_iSndCurrSeqNo = req->m_iISN - 1; m_iSndLastAck2 = req->m_iISN; m_ullSndLastAck2Time = CTimer::getTime(); // Inform the server my configurations. request.pack(0, NULL, reqdata, sizeof(CHandShake)); // ID = 0, connection request request.m_iID = 0; // Wait for the negotiated configurations from the peer side. response.pack(0, NULL, resdata, sizeof(CHandShake)); uint64_t timeo = 3000000; if (m_bRendezvous) timeo *= 10; uint64_t entertime = CTimer::getTime(); CUDTException e(0, 0); char* tmp = NULL; while (!m_bClosing) { m_pSndQueue->sendto(serv_addr, request); response.setLength(m_iPayloadSize); if (m_pRcvQueue->recvfrom(m_SocketID, response) > 0) { if (m_bRendezvous && (0 == response.getFlag()) && (NULL != tmp)) { // a data packet comes, which means the peer side is already connected // in this situation, a previously recorded response (tmp) will be used memcpy(resdata, tmp, sizeof(CHandShake)); delete [] tmp; break; } if ((1 != response.getFlag()) || (0 != response.getType())) response.setLength(-1); if (m_bRendezvous) { // regular connect should NOT communicate with rendezvous connect // rendezvous connect require 3-way handshake if (1 == res->m_iReqType) response.setLength(-1); else if ((0 == res->m_iReqType) || (0 == req->m_iReqType)) { tmp = new char [m_iPayloadSize]; memcpy(tmp, resdata, sizeof(CHandShake)); req->m_iReqType = -1; request.m_iID = res->m_iID; response.setLength(-1); } } else { // set cookie if (1 == res->m_iReqType) { req->m_iReqType = -1; req->m_iCookie = res->m_iCookie; response.setLength(-1); } } } if (response.getLength() > 0) break; if (CTimer::getTime() - entertime > timeo) { // timeout e = CUDTException(1, 1, 0); break; } } delete [] reqdata; if (e.getErrorCode() == 0) { if (m_bClosing) // if the socket is closed before connection... e = CUDTException(1); else if (1002 == res->m_iReqType) // connection request rejected e = CUDTException(1, 2, 0); else if ((!m_bRendezvous) && (m_iISN != res->m_iISN)) // secuity check e = CUDTException(1, 4, 0); } if (e.getErrorCode() != 0) { // connection failure, clean up and throw exception delete [] resdata; if (m_bRendezvous) m_pRcvQueue->m_pRendezvousQueue->remove(m_SocketID); throw e; } // Got it. Re-configure according to the negotiated values. m_iMSS = res->m_iMSS; m_iFlowWindowSize = res->m_iFlightFlagSize; m_iPktSize = m_iMSS - 28; m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize; m_iPeerISN = res->m_iISN; m_iRcvLastAck = res->m_iISN; m_iRcvLastAckAck = res->m_iISN; m_iRcvCurrSeqNo = res->m_iISN - 1; m_PeerID = res->m_iID; delete [] resdata; // Prepare all data structures try { m_pSndBuffer = new CSndBuffer(32, m_iPayloadSize); m_pRcvBuffer = new CRcvBuffer(m_iRcvBufSize, &(m_pRcvQueue->m_UnitQueue)); // after introducing lite ACK, the sndlosslist may not be cleared in time, so it requires twice space. m_pSndLossList = new CSndLossList(m_iFlowWindowSize * 2); m_pRcvLossList = new CRcvLossList(m_iFlightFlagSize); m_pACKWindow = new CACKWindow(4096); m_pRcvTimeWindow = new CPktTimeWindow(16, 64); m_pSndTimeWindow = new CPktTimeWindow(); } catch (...) { throw CUDTException(3, 2, 0); } m_pCC = m_pCCFactory->create(); m_pCC->m_UDT = m_SocketID; m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency); m_dCongestionWindow = m_pCC->m_dCWndSize; m_pController->join(this, serv_addr, m_iIPversion, m_iRTT, m_iBandwidth); m_pCC->setMSS(m_iMSS); m_pCC->setMaxCWndSize((int&)m_iFlowWindowSize); m_pCC->setSndCurrSeqNo(m_iSndCurrSeqNo); m_pCC->setRcvRate(m_iDeliveryRate); m_pCC->setRTT(m_iRTT); m_pCC->setBandwidth(m_iBandwidth); m_pCC->setUserParam((char*)&(m_llMaxBW), 8); m_pCC->init(); m_pPeerAddr = (AF_INET == m_iIPversion) ? (sockaddr*)new sockaddr_in : (sockaddr*)new sockaddr_in6; memcpy(m_pPeerAddr, serv_addr, (AF_INET == m_iIPversion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6)); // And, I am connected too. m_bConnected = true; // register this socket for receiving data packets m_pRcvQueue->setNewEntry(this); // remove from rendezvous queue m_pRcvQueue->m_pRendezvousQueue->remove(m_SocketID);}void CUDT::connect(const sockaddr* peer, CHandShake* hs){ // Type 0 (handshake) control packet CPacket initpkt; CHandShake ci; memcpy(&ci, hs, sizeof(CHandShake)); initpkt.pack(0, NULL, &ci, sizeof(CHandShake)); // Uses the smaller MSS between the peers if (ci.m_iMSS > m_iMSS) ci.m_iMSS = m_iMSS; else m_iMSS = ci.m_iMSS; // exchange info for maximum flow window size m_iFlowWindowSize = ci.m_iFlightFlagSize; ci.m_iFlightFlagSize = (m_iRcvBufSize < m_iFlightFlagSize)? m_iRcvBufSize : m_iFlightFlagSize; m_iPeerISN = ci.m_iISN; m_iRcvLastAck = ci.m_iISN; m_iRcvLastAckAck = ci.m_iISN; m_iRcvCurrSeqNo = ci.m_iISN - 1; m_PeerID = ci.m_iID; ci.m_iID = m_SocketID; // use peer's ISN and send it back for security check m_iISN = ci.m_iISN; m_iLastDecSeq = m_iISN - 1; m_iSndLastAck = m_iISN; m_iSndLastDataAck = m_iISN; m_iSndCurrSeqNo = m_iISN - 1; m_iSndLastAck2 = m_iISN; m_ullSndLastAck2Time = CTimer::getTime(); // this is a reponse handshake ci.m_iReqType = -1; // Save the negotiated configurations. memcpy(hs, &ci, sizeof(CHandShake)); m_iPktSize = m_iMSS - 28; m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize; // Prepare all structures try { m_pSndBuffer = new CSndBuffer(32, m_iPayloadSize); m_pRcvBuffer = new CRcvBuffer(m_iRcvBufSize, &(m_pRcvQueue->m_UnitQueue)); m_pSndLossList = new CSndLossList(m_iFlowWindowSize * 2); m_pRcvLossList = new CRcvLossList(m_iFlightFlagSize); m_pACKWindow = new CACKWindow(4096); m_pRcvTimeWindow = new CPktTimeWindow(16, 64); m_pSndTimeWindow = new CPktTimeWindow(); } catch (...) { throw CUDTException(3, 2, 0); } m_pCC = m_pCCFactory->create(); m_pCC->m_UDT = m_SocketID; m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency); m_dCongestionWindow = m_pCC->m_dCWndSize; m_pController->join(this, peer, m_iIPversion, m_iRTT, m_iBandwidth); m_pCC->setMSS(m_iMSS); m_pCC->setMaxCWndSize((int&)m_iFlowWindowSize); m_pCC->setSndCurrSeqNo(m_iSndCurrSeqNo); m_pCC->setRcvRate(m_iDeliveryRate); m_pCC->setRTT(m_iRTT); m_pCC->setBandwidth(m_iBandwidth); m_pCC->init(); m_pPeerAddr = (AF_INET == m_iIPversion) ? (sockaddr*)new sockaddr_in : (sockaddr*)new sockaddr_in6; memcpy(m_pPeerAddr, peer, (AF_INET == m_iIPversion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6)); // And of course, it is connected. m_bConnected = true; // register this socket for receiving data packets m_pRcvQueue->setNewEntry(this);}void CUDT::close(){ if (!m_bOpened) return; if (!m_bConnected) m_bClosing = true; if (0 != m_Linger.l_onoff) { uint64_t entertime = CTimer::getTime(); while (!m_bBroken && m_bConnected && (m_pSndBuffer->getCurrBufSize() > 0) && (CTimer::getTime() - entertime < m_Linger.l_linger * 1000000ULL)) { #ifndef WIN32 timespec ts; ts.tv_sec = 0; ts.tv_nsec = 1000000; nanosleep(&ts, NULL); #else Sleep(1); #endif } } // remove this socket from the snd queue if (m_bConnected) m_pSndQueue->m_pSndUList->remove(this); CGuard cg(m_ConnectionLock); // Inform the threads handler to stop. m_bClosing = true; // Signal the sender and recver if they are waiting for data. releaseSynch(); if (m_bListening) { m_bListening = false; m_pRcvQueue->removeListener(this); } if (m_bConnected) { if (!m_bShutdown) sendCtrl(5); m_pCC->close(); m_pController->leave(this, m_iRTT, m_iBandwidth); m_bConnected = false; } // waiting all send and recv calls to stop CGuard sendguard(m_SendLock); CGuard recvguard(m_RecvLock); // CLOSED. m_bOpened = false;}int CUDT::send(const char* data, const int& len){ if (UDT_DGRAM == m_iSockType) throw CUDTException(5, 10, 0); CGuard sendguard(m_SendLock); // throw an exception if not connected if (m_bBroken || m_bClosing) throw CUDTException(2, 1, 0); else if (!m_bConnected) throw CUDTException(2, 2, 0); if (len <= 0) return 0; if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) { if (!m_bSynSending) throw CUDTException(6, 1, 0); else { // wait here during a blocking sending #ifndef WIN32 pthread_mutex_lock(&m_SendBlockLock); if (m_iSndTimeOut < 0) { while (!m_bBroken && m_bConnected && !m_bClosing && (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize())) pthread_cond_wait(&m_SendBlockCond, &m_SendBlockLock); } else { uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL; timespec locktime; locktime.tv_sec = exptime / 1000000; locktime.tv_nsec = (exptime % 1000000) * 1000; pthread_cond_timedwait(&m_SendBlockCond, &m_SendBlockLock, &locktime); } pthread_mutex_unlock(&m_SendBlockLock); #else if (m_iSndTimeOut < 0) { while (!m_bBroken && m_bConnected && !m_bClosing && (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize())) WaitForSingleObject(m_SendBlockCond, INFINITE); } else
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -