📄 queue.cpp
字号:
n->m_pPrev = m_pLast; n->m_pNext = NULL; m_pLast->m_pNext = n; m_pLast = n;}void CRcvUList::remove(const int32_t& id){ if (NULL == m_pUList) return; if (id == m_pUList->m_iID) { CUDTList* n = m_pUList; // remove first node m_pUList = m_pUList->m_pNext; if (NULL == m_pUList) m_pLast = NULL; else m_pUList->m_pPrev = NULL; n->m_bOnList = false; return; } // check further CUDTList* p = m_pUList; while (NULL != p->m_pNext) { if (id == p->m_pNext->m_iID) { CUDTList* n = p->m_pNext; p->m_pNext = p->m_pNext->m_pNext; if (NULL != p->m_pNext) p->m_pNext->m_pPrev = p; else m_pLast = p; n->m_bOnList = false; return; } p = p->m_pNext; }}void CRcvUList::update(const int32_t& id){ if (NULL == m_pUList) return; if (id == m_pUList->m_iID) { CTimer::rdtsc(m_pUList->m_llTimeStamp); // if there is only one node in the list, simply update it, otherwise move it to the end if (NULL != m_pUList->m_pNext) { CUDTList* n = m_pUList; m_pUList = m_pUList->m_pNext; m_pUList->m_pPrev = NULL; n->m_pNext = NULL; n->m_pPrev = m_pLast; m_pLast->m_pNext = n; m_pLast = n; } return; } // check further CUDTList* p = m_pUList; while (NULL != p->m_pNext) { if (id == p->m_pNext->m_iID) { CTimer::rdtsc(p->m_pNext->m_llTimeStamp); if (NULL != p->m_pNext->m_pNext) { CUDTList* n = p->m_pNext; p->m_pNext = p->m_pNext->m_pNext; p->m_pNext->m_pPrev = p; n->m_pNext = NULL; n->m_pPrev = m_pLast; m_pLast->m_pNext = n; m_pLast = n; } return; } p = p->m_pNext; }}//CHash::CHash():m_pBucket(NULL),m_iHashSize(0){}CHash::~CHash(){ for (int i = 0; i < m_iHashSize; ++ i) { CBucket* b = m_pBucket[i]; while (NULL != b) { CBucket* n = b->m_pNext; delete b; b = n; } } delete [] m_pBucket;}void CHash::init(const int& size){ m_pBucket = new CBucket* [size]; for (int i = 0; i < size; ++ i) m_pBucket[i] = NULL; m_iHashSize = size;}CUDT* CHash::lookup(const int32_t& id){ // simple hash function (% hash table size); suitable for socket descriptors CBucket* b = m_pBucket[id % m_iHashSize]; while (NULL != b) { if (id == b->m_iID) return b->m_pUDT; b = b->m_pNext; } return NULL;}void CHash::insert(const int32_t& id, const CUDT* u){ CBucket* b = m_pBucket[id % m_iHashSize]; CBucket* n = new CBucket; n->m_iID = id; n->m_pUDT = (CUDT*)u; n->m_pNext = b; m_pBucket[id % m_iHashSize] = n;}void CHash::remove(const int32_t& id){ CBucket* b = m_pBucket[id % m_iHashSize]; CBucket* p = NULL; while (NULL != b) { if (id == b->m_iID) { if (NULL == p) m_pBucket[id % m_iHashSize] = b->m_pNext; else p->m_pNext = b->m_pNext; delete b; return; } p = b; b = b->m_pNext; }}//CRendezvousQueue::CRendezvousQueue(){ #ifndef WIN32 pthread_mutex_init(&m_RIDVectorLock, NULL); #else m_RIDVectorLock = CreateMutex(NULL, false, NULL); #endif m_vRendezvousID.clear();}CRendezvousQueue::~CRendezvousQueue(){ #ifndef WIN32 pthread_mutex_destroy(&m_RIDVectorLock); #else CloseHandle(m_RIDVectorLock); #endif for (vector<CRL>::iterator i = m_vRendezvousID.begin(); i != m_vRendezvousID.end(); ++ i) { if (AF_INET == i->m_iIPversion) delete (sockaddr_in*)i->m_pPeerAddr; else delete (sockaddr_in6*)i->m_pPeerAddr; } m_vRendezvousID.clear();}void CRendezvousQueue::insert(const UDTSOCKET& id, const int& ipv, const sockaddr* addr, CUDT* u){ CGuard vg(m_RIDVectorLock); CRL r; r.m_iID = id; r.m_iPeerID = 0; r.m_iIPversion = ipv; r.m_pPeerAddr = (AF_INET == ipv) ? (sockaddr*)new sockaddr_in : (sockaddr*)new sockaddr_in6; memcpy(r.m_pPeerAddr, addr, (AF_INET == ipv) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6)); r.m_pUDT = u; m_vRendezvousID.insert(m_vRendezvousID.end(), r);}void CRendezvousQueue::remove(const UDTSOCKET& id){ CGuard vg(m_RIDVectorLock); for (vector<CRL>::iterator i = m_vRendezvousID.begin(); i != m_vRendezvousID.end(); ++ i) if (i->m_iID == id) { if (AF_INET == i->m_iIPversion) delete (sockaddr_in*)i->m_pPeerAddr; else delete (sockaddr_in6*)i->m_pPeerAddr; m_vRendezvousID.erase(i); return; }}bool CRendezvousQueue::retrieve(const sockaddr* addr, UDTSOCKET& id, const UDTSOCKET& peerid, CUDT*& u){ CGuard vg(m_RIDVectorLock); for (vector<CRL>::iterator i = m_vRendezvousID.begin(); i != m_vRendezvousID.end(); ++ i) if (CIPAddress::ipcmp(addr, i->m_pPeerAddr, i->m_iIPversion) && ((0 == i->m_iPeerID) || (peerid == i->m_iPeerID))) { id = i->m_iID; i->m_iPeerID = peerid; u = i->m_pUDT; return true; } return false;}//CRcvQueue::CRcvQueue():m_pRcvUList(NULL),m_pHash(NULL),m_pChannel(NULL),m_pTimer(NULL),m_bClosing(false),m_pListener(NULL),m_pRendezvousQueue(NULL){ #ifndef WIN32 pthread_mutex_init(&m_PassLock, NULL); pthread_cond_init(&m_PassCond, NULL); pthread_mutex_init(&m_LSLock, NULL); pthread_mutex_init(&m_IDLock, NULL); #else m_PassLock = CreateMutex(NULL, false, NULL); m_PassCond = CreateEvent(NULL, false, false, NULL); m_LSLock = CreateMutex(NULL, false, NULL); m_IDLock = CreateMutex(NULL, false, NULL); #endif m_vNewEntry.clear(); m_mBuffer.clear();}CRcvQueue::~CRcvQueue(){ m_bClosing = true; #ifndef WIN32 if (0 != m_WorkerThread) pthread_join(m_WorkerThread, NULL); pthread_mutex_destroy(&m_PassLock); pthread_cond_destroy(&m_PassCond); pthread_mutex_destroy(&m_LSLock); pthread_mutex_destroy(&m_IDLock); #else if (NULL != m_WorkerThread) WaitForSingleObject(m_WorkerThread, INFINITE); CloseHandle(m_WorkerThread); CloseHandle(m_PassLock); CloseHandle(m_PassCond); CloseHandle(m_LSLock); CloseHandle(m_IDLock); #endif delete m_pRcvUList; delete m_pHash; delete m_pRendezvousQueue; for (map<int32_t, CPacket*>::iterator i = m_mBuffer.begin(); i != m_mBuffer.end(); ++ i) { delete [] i->second->m_pcData; delete i->second; }}void CRcvQueue::init(const int& qsize, const int& payload, const int& version, const int& hsize, const CChannel* cc, const CTimer* t){ m_iPayloadSize = payload; m_UnitQueue.init(qsize, payload, version); m_pHash = new CHash; m_pHash->init(hsize); m_pChannel = (CChannel*)cc; m_pTimer = (CTimer*)t; m_pRcvUList = new CRcvUList; m_pRendezvousQueue = new CRendezvousQueue; #ifndef WIN32 if (0 != pthread_create(&m_WorkerThread, NULL, CRcvQueue::worker, this)) { m_WorkerThread = 0; throw CUDTException(3, 1); } #else DWORD threadID; m_WorkerThread = CreateThread(NULL, 0, CRcvQueue::worker, this, 0, &threadID); if (NULL == m_WorkerThread) throw CUDTException(3, 1); #endif}#ifndef WIN32 void* CRcvQueue::worker(void* param)#else DWORD WINAPI CRcvQueue::worker(LPVOID param)#endif{ CRcvQueue* self = (CRcvQueue*)param; CUnit temp; temp.m_Packet.m_pcData = new char[self->m_iPayloadSize]; sockaddr* addr = (AF_INET == self->m_UnitQueue.m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6; while (!self->m_bClosing) { #ifdef NO_BUSY_WAITING self->m_pTimer->tick(); #endif // check waiting list, if new socket, insert it to the list if (self->ifNewEntry()) { CUDT* ne = self->getNewEntry(); if (NULL != ne) { self->m_pRcvUList->insert(ne); self->m_pHash->insert(ne->m_SocketID, ne); } } // find next available slot for incoming packet CUnit* unit = self->m_UnitQueue.getNextAvailUnit(); if (NULL == unit) unit = &temp; unit->m_Packet.setLength(self->m_iPayloadSize); CUDT* u = NULL; int32_t id; // reading next incoming packet if (self->m_pChannel->recvfrom(addr, unit->m_Packet) <= 0) goto TIMER_CHECK; if (unit == &temp) goto TIMER_CHECK; id = unit->m_Packet.m_iID; // ID 0 is for connection request, which should be passed to the listening socket or rendezvous sockets if (0 == id) { if (NULL != self->m_pListener) ((CUDT*)self->m_pListener)->listen(addr, unit->m_Packet); else if (self->m_pRendezvousQueue->retrieve(addr, id, ((CHandShake*)unit->m_Packet.m_pcData)->m_iID, u)) { if (u->m_bConnected && !u->m_bBroken) u->processCtrl(unit->m_Packet); else self->storePkt(id, unit->m_Packet.clone()); } } else if (id > 0) { if (NULL != (u = self->m_pHash->lookup(id))) { if (u->m_bConnected && !u->m_bBroken) { if (0 == unit->m_Packet.getFlag()) u->processData(unit); else u->processCtrl(unit->m_Packet); u->checkTimers(); self->m_pRcvUList->update(id); } } else self->storePkt(id, unit->m_Packet.clone()); }TIMER_CHECK: // take care of the timing event for all UDT sockets CUDTList* ul = self->m_pRcvUList->m_pUList; uint64_t currtime; CTimer::rdtsc(currtime); while ((NULL != ul) && (ul->m_llTimeStamp < currtime - 10000 * CTimer::getCPUFrequency())) { CUDT* u = ul->m_pUDT; int32_t id = ul->m_iID; u->checkTimers(); if (u->m_bConnected && !u->m_bBroken) self->m_pRcvUList->update(id); else { // the socket must be removed from Hash table first, then RcvUList self->m_pHash->remove(id); self->m_pRcvUList->remove(id); } ul = self->m_pRcvUList->m_pUList; } } delete [] temp.m_Packet.m_pcData; if (AF_INET == self->m_UnitQueue.m_iIPversion) delete (sockaddr_in*)addr; else delete (sockaddr_in6*)addr; return NULL;}int CRcvQueue::recvfrom(const int32_t& id, CPacket& packet){ CGuard bufferlock(m_PassLock); map<int32_t, CPacket*>::iterator i = m_mBuffer.find(id); if (i == m_mBuffer.end()) { #ifndef WIN32 uint64_t now = CTimer::getTime(); timespec timeout; timeout.tv_sec = now / 1000000 + 1; timeout.tv_nsec = (now % 1000000) * 1000; pthread_cond_timedwait(&m_PassCond, &m_PassLock, &timeout); #else ReleaseMutex(m_PassLock); WaitForSingleObject(m_PassCond, 1); WaitForSingleObject(m_PassLock, INFINITE); #endif i = m_mBuffer.find(id); if (i == m_mBuffer.end()) { packet.setLength(-1); return -1; } } if (packet.getLength() < i->second->getLength()) { packet.setLength(-1); return -1; } memcpy(packet.m_nHeader, i->second->m_nHeader, CPacket::m_iPktHdrSize); memcpy(packet.m_pcData, i->second->m_pcData, i->second->getLength()); packet.setLength(i->second->getLength()); delete [] i->second->m_pcData; delete i->second; m_mBuffer.erase(i); return packet.getLength();}int CRcvQueue::setListener(const CUDT* u){ CGuard lslock(m_LSLock); if (NULL != m_pListener) return -1; m_pListener = (CUDT*)u; return 1;}void CRcvQueue::removeListener(const CUDT* u){ CGuard lslock(m_LSLock); if (u == m_pListener) m_pListener = NULL;}void CRcvQueue::setNewEntry(CUDT* u){ CGuard listguard(m_IDLock); m_vNewEntry.insert(m_vNewEntry.end(), u);}bool CRcvQueue::ifNewEntry(){ return !(m_vNewEntry.empty());}CUDT* CRcvQueue::getNewEntry(){ CGuard listguard(m_IDLock); if (m_vNewEntry.empty()) return NULL; CUDT* u = (CUDT*)*(m_vNewEntry.begin()); m_vNewEntry.erase(m_vNewEntry.begin()); return u;}void CRcvQueue::storePkt(const int32_t& id, CPacket* pkt){ #ifndef WIN32 pthread_mutex_lock(&m_PassLock); #else WaitForSingleObject(m_PassLock, INFINITE); #endif map<int32_t, CPacket*>::iterator i = m_mBuffer.find(id); if (i == m_mBuffer.end()) m_mBuffer[id] = pkt; else { delete [] i->second->m_pcData; delete i->second; i->second = pkt; } #ifndef WIN32 pthread_mutex_unlock(&m_PassLock); pthread_cond_signal(&m_PassCond); #else ReleaseMutex(m_PassLock); SetEvent(m_PassCond); #endif}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -