📄 queue.cpp
字号:
n->m_pPrev = n->m_pNext = NULL; m_pLast = m_pUList = n; return; } // always insert at the end for RcvUList n->m_pPrev = m_pLast; n->m_pNext = NULL; m_pLast->m_pNext = n; m_pLast = n;}void CRcvUList::remove(const CUDT* u){ CRNode* n = u->m_pRNode; if (!n->m_bOnList) return; if (NULL == n->m_pPrev) { // n is the first node m_pUList = n->m_pNext; if (NULL == m_pUList) m_pLast = NULL; else m_pUList->m_pPrev = NULL; } else { n->m_pPrev->m_pNext = n->m_pNext; if (NULL == n->m_pNext) { // n is the last node m_pLast = n->m_pPrev; } else n->m_pNext->m_pPrev = n->m_pPrev; } n->m_pNext = n->m_pPrev = NULL; n->m_bOnList = false;}void CRcvUList::update(const CUDT* u){ CRNode* n = u->m_pRNode; if (!n->m_bOnList) return; CTimer::rdtsc(n->m_llTimeStamp); // if n is the last node, do not need to change if (NULL == n->m_pNext) return; if (NULL == n->m_pPrev) { m_pUList = n->m_pNext; m_pUList->m_pPrev = NULL; } else { n->m_pPrev->m_pNext = n->m_pNext; n->m_pNext->m_pPrev = n->m_pPrev; } n->m_pPrev = m_pLast; n->m_pNext = NULL; m_pLast->m_pNext = n; m_pLast = n;}//CHash::CHash():m_pBucket(NULL),m_iHashSize(0){}CHash::~CHash(){ for (int i = 0; i < m_iHashSize; ++ i) { CBucket* b = m_pBucket[i]; while (NULL != b) { CBucket* n = b->m_pNext; delete b; b = n; } } delete [] m_pBucket;}void CHash::init(const int& size){ m_pBucket = new CBucket* [size]; for (int i = 0; i < size; ++ i) m_pBucket[i] = NULL; m_iHashSize = size;}CUDT* CHash::lookup(const int32_t& id){ // simple hash function (% hash table size); suitable for socket descriptors CBucket* b = m_pBucket[id % m_iHashSize]; while (NULL != b) { if (id == b->m_iID) return b->m_pUDT; b = b->m_pNext; } return NULL;}void CHash::insert(const int32_t& id, const CUDT* u){ CBucket* b = m_pBucket[id % m_iHashSize]; CBucket* n = new CBucket; n->m_iID = id; n->m_pUDT = (CUDT*)u; n->m_pNext = b; m_pBucket[id % m_iHashSize] = n;}void CHash::remove(const int32_t& id){ CBucket* b = m_pBucket[id % m_iHashSize]; CBucket* p = NULL; while (NULL != b) { if (id == b->m_iID) { if (NULL == p) m_pBucket[id % m_iHashSize] = b->m_pNext; else p->m_pNext = b->m_pNext; delete b; return; } p = b; b = b->m_pNext; }}//CRendezvousQueue::CRendezvousQueue(){ #ifndef WIN32 pthread_mutex_init(&m_RIDVectorLock, NULL); #else m_RIDVectorLock = CreateMutex(NULL, false, NULL); #endif m_vRendezvousID.clear();}CRendezvousQueue::~CRendezvousQueue(){ #ifndef WIN32 pthread_mutex_destroy(&m_RIDVectorLock); #else CloseHandle(m_RIDVectorLock); #endif for (vector<CRL>::iterator i = m_vRendezvousID.begin(); i != m_vRendezvousID.end(); ++ i) { if (AF_INET == i->m_iIPversion) delete (sockaddr_in*)i->m_pPeerAddr; else delete (sockaddr_in6*)i->m_pPeerAddr; } m_vRendezvousID.clear();}void CRendezvousQueue::insert(const UDTSOCKET& id, const int& ipv, const sockaddr* addr){ CGuard vg(m_RIDVectorLock); CRL r; r.m_iID = id; r.m_iIPversion = ipv; r.m_pPeerAddr = (AF_INET == ipv) ? (sockaddr*)new sockaddr_in : (sockaddr*)new sockaddr_in6; memcpy(r.m_pPeerAddr, addr, (AF_INET == ipv) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6)); m_vRendezvousID.insert(m_vRendezvousID.end(), r);}void CRendezvousQueue::remove(const UDTSOCKET& id){ CGuard vg(m_RIDVectorLock); for (vector<CRL>::iterator i = m_vRendezvousID.begin(); i != m_vRendezvousID.end(); ++ i) if (i->m_iID == id) { if (AF_INET == i->m_iIPversion) delete (sockaddr_in*)i->m_pPeerAddr; else delete (sockaddr_in6*)i->m_pPeerAddr; m_vRendezvousID.erase(i); return; }}bool CRendezvousQueue::retrieve(const sockaddr* addr, UDTSOCKET& id){ CGuard vg(m_RIDVectorLock); for (vector<CRL>::iterator i = m_vRendezvousID.begin(); i != m_vRendezvousID.end(); ++ i) { if (CIPAddress::ipcmp(addr, i->m_pPeerAddr, i->m_iIPversion) && ((0 == id) || (id == i->m_iID))) { id = i->m_iID; return true; } } return false;}//CRcvQueue::CRcvQueue():m_pRcvUList(NULL),m_pHash(NULL),m_pChannel(NULL),m_pTimer(NULL),m_bClosing(false),m_pListener(NULL),m_pRendezvousQueue(NULL){ #ifndef WIN32 pthread_mutex_init(&m_PassLock, NULL); pthread_cond_init(&m_PassCond, NULL); pthread_mutex_init(&m_LSLock, NULL); pthread_mutex_init(&m_IDLock, NULL); #else m_PassLock = CreateMutex(NULL, false, NULL); m_PassCond = CreateEvent(NULL, false, false, NULL); m_LSLock = CreateMutex(NULL, false, NULL); m_IDLock = CreateMutex(NULL, false, NULL); m_ExitCond = CreateEvent(NULL, false, false, NULL); #endif m_vNewEntry.clear(); m_mBuffer.clear();}CRcvQueue::~CRcvQueue(){ m_bClosing = true; #ifndef WIN32 if (0 != m_WorkerThread) pthread_join(m_WorkerThread, NULL); pthread_mutex_destroy(&m_PassLock); pthread_cond_destroy(&m_PassCond); pthread_mutex_destroy(&m_LSLock); pthread_mutex_destroy(&m_IDLock); #else if (NULL != m_WorkerThread) WaitForSingleObject(m_ExitCond, INFINITE); CloseHandle(m_WorkerThread); CloseHandle(m_PassLock); CloseHandle(m_PassCond); CloseHandle(m_LSLock); CloseHandle(m_IDLock); #endif delete m_pRcvUList; delete m_pHash; delete m_pRendezvousQueue; for (map<int32_t, CPacket*>::iterator i = m_mBuffer.begin(); i != m_mBuffer.end(); ++ i) { delete [] i->second->m_pcData; delete i->second; }}void CRcvQueue::init(const int& qsize, const int& payload, const int& version, const int& hsize, const CChannel* cc, const CTimer* t){ m_iPayloadSize = payload; m_UnitQueue.init(qsize, payload, version); m_pHash = new CHash; m_pHash->init(hsize); m_pChannel = (CChannel*)cc; m_pTimer = (CTimer*)t; m_pRcvUList = new CRcvUList; m_pRendezvousQueue = new CRendezvousQueue; #ifndef WIN32 if (0 != pthread_create(&m_WorkerThread, NULL, CRcvQueue::worker, this)) { m_WorkerThread = 0; throw CUDTException(3, 1); } #else DWORD threadID; m_WorkerThread = CreateThread(NULL, 0, CRcvQueue::worker, this, 0, &threadID); if (NULL == m_WorkerThread) throw CUDTException(3, 1); #endif}#ifndef WIN32 void* CRcvQueue::worker(void* param)#else DWORD WINAPI CRcvQueue::worker(LPVOID param)#endif{ CRcvQueue* self = (CRcvQueue*)param; sockaddr* addr = (AF_INET == self->m_UnitQueue.m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6; CUDT* u = NULL; int32_t id; while (!self->m_bClosing) { #ifdef NO_BUSY_WAITING self->m_pTimer->tick(); #endif // check waiting list, if new socket, insert it to the list if (self->ifNewEntry()) { CUDT* ne = self->getNewEntry(); if (NULL != ne) { self->m_pRcvUList->insert(ne); self->m_pHash->insert(ne->m_SocketID, ne); } } // find next available slot for incoming packet CUnit* unit = self->m_UnitQueue.getNextAvailUnit(); if (NULL == unit) { // no space, skip this packet CUnit temp; temp.m_Packet.m_pcData = new char[self->m_iPayloadSize]; self->m_pChannel->recvfrom(addr, temp.m_Packet); delete [] temp.m_Packet.m_pcData; goto TIMER_CHECK; } unit->m_Packet.setLength(self->m_iPayloadSize); // reading next incoming packet if (self->m_pChannel->recvfrom(addr, unit->m_Packet) <= 0) goto TIMER_CHECK; id = unit->m_Packet.m_iID; // ID 0 is for connection request, which should be passed to the listening socket or rendezvous sockets if (0 == id) { if (NULL != self->m_pListener) ((CUDT*)self->m_pListener)->listen(addr, unit->m_Packet); else if (self->m_pRendezvousQueue->retrieve(addr, id)) self->storePkt(id, unit->m_Packet.clone()); } else if (id > 0) { if (NULL != (u = self->m_pHash->lookup(id))) { if (CIPAddress::ipcmp(addr, u->m_pPeerAddr, u->m_iIPversion)) { if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing) { if (0 == unit->m_Packet.getFlag()) u->processData(unit); else u->processCtrl(unit->m_Packet); u->checkTimers(); self->m_pRcvUList->update(u); } } } else if (self->m_pRendezvousQueue->retrieve(addr, id)) self->storePkt(id, unit->m_Packet.clone()); }TIMER_CHECK: // take care of the timing event for all UDT sockets CRNode* ul = self->m_pRcvUList->m_pUList; uint64_t currtime; CTimer::rdtsc(currtime); uint64_t ctime = currtime - 100000 * CTimer::getCPUFrequency(); while ((NULL != ul) && (ul->m_llTimeStamp < ctime)) { CUDT* u = ul->m_pUDT; if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing) { u->checkTimers(); self->m_pRcvUList->update(u); } else { // the socket must be removed from Hash table first, then RcvUList self->m_pHash->remove(u->m_SocketID); self->m_pRcvUList->remove(u); } ul = self->m_pRcvUList->m_pUList; } } if (AF_INET == self->m_UnitQueue.m_iIPversion) delete (sockaddr_in*)addr; else delete (sockaddr_in6*)addr; #ifndef WIN32 return NULL; #else SetEvent(self->m_ExitCond); return 0; #endif}int CRcvQueue::recvfrom(const int32_t& id, CPacket& packet){ CGuard bufferlock(m_PassLock); map<int32_t, CPacket*>::iterator i = m_mBuffer.find(id); if (i == m_mBuffer.end()) { #ifndef WIN32 uint64_t now = CTimer::getTime(); timespec timeout; timeout.tv_sec = now / 1000000 + 1; timeout.tv_nsec = (now % 1000000) * 1000; pthread_cond_timedwait(&m_PassCond, &m_PassLock, &timeout); #else ReleaseMutex(m_PassLock); WaitForSingleObject(m_PassCond, 1000); WaitForSingleObject(m_PassLock, INFINITE); #endif i = m_mBuffer.find(id); if (i == m_mBuffer.end()) { packet.setLength(-1); return -1; } } if (packet.getLength() < i->second->getLength()) { packet.setLength(-1); return -1; } memcpy(packet.m_nHeader, i->second->m_nHeader, CPacket::m_iPktHdrSize); memcpy(packet.m_pcData, i->second->m_pcData, i->second->getLength()); packet.setLength(i->second->getLength()); delete [] i->second->m_pcData; delete i->second; m_mBuffer.erase(i); return packet.getLength();}int CRcvQueue::setListener(const CUDT* u){ CGuard lslock(m_LSLock); if (NULL != m_pListener) return -1; m_pListener = (CUDT*)u; return 1;}void CRcvQueue::removeListener(const CUDT* u){ CGuard lslock(m_LSLock); if (u == m_pListener) m_pListener = NULL;}void CRcvQueue::setNewEntry(CUDT* u){ CGuard listguard(m_IDLock); m_vNewEntry.insert(m_vNewEntry.end(), u);}bool CRcvQueue::ifNewEntry(){ return !(m_vNewEntry.empty());}CUDT* CRcvQueue::getNewEntry(){ CGuard listguard(m_IDLock); if (m_vNewEntry.empty()) return NULL; CUDT* u = (CUDT*)*(m_vNewEntry.begin()); m_vNewEntry.erase(m_vNewEntry.begin()); return u;}void CRcvQueue::storePkt(const int32_t& id, CPacket* pkt){ #ifndef WIN32 pthread_mutex_lock(&m_PassLock); #else WaitForSingleObject(m_PassLock, INFINITE); #endif map<int32_t, CPacket*>::iterator i = m_mBuffer.find(id); if (i == m_mBuffer.end()) m_mBuffer[id] = pkt; else { delete [] i->second->m_pcData; delete i->second; i->second = pkt; } #ifndef WIN32 pthread_mutex_unlock(&m_PassLock); pthread_cond_signal(&m_PassCond); #else ReleaseMutex(m_PassLock); SetEvent(m_PassCond); #endif}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -