📄 queue.cpp
字号:
/*****************************************************************************Copyright (c) 2001 - 2008, The Board of Trustees of the University of Illinois.All rights reserved.Redistribution and use in source and binary forms, with or withoutmodification, are permitted provided that the following conditions aremet:* Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.* Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.* Neither the name of the University of Illinois nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "ASIS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULARPURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER ORCONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, ORPROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OFLIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDINGNEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THISSOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.*****************************************************************************//*****************************************************************************written by Yunhong Gu, last updated 12/08/2008*****************************************************************************/#ifdef WIN32 #include <winsock2.h> #include <ws2tcpip.h> #include <wspiapi.h>#endif#include <cstring>#include "common.h"#include "queue.h"#include "core.h"using namespace std;CUnitQueue::CUnitQueue():m_pQEntry(NULL),m_pCurrQueue(NULL),m_pLastQueue(NULL),m_iSize(0),m_iCount(0){}CUnitQueue::~CUnitQueue(){ CQEntry* p = m_pQEntry; while (p != NULL) { delete [] p->m_pUnit; delete [] p->m_pBuffer; CQEntry* q = p; if (p == m_pLastQueue) p = NULL; else p = p->m_pNext; delete q; }}int CUnitQueue::init(const int& size, const int& mss, const int& version){ CQEntry* tempq = NULL; CUnit* tempu = NULL; char* tempb = NULL; try { tempq = new CQEntry; tempu = new CUnit [size]; tempb = new char [size * mss]; } catch (...) { delete tempq; delete [] tempu; delete [] tempb; return -1; } for (int i = 0; i < size; ++ i) { tempu[i].m_iFlag = 0; tempu[i].m_Packet.m_pcData = tempb + i * mss; } tempq->m_pUnit = tempu; tempq->m_pBuffer = tempb; tempq->m_iSize = size; m_pQEntry = m_pCurrQueue = m_pLastQueue = tempq; m_pQEntry->m_pNext = m_pQEntry; m_pAvailUnit = m_pCurrQueue->m_pUnit; m_iSize = size; m_iMSS = mss; m_iIPversion = version; return 0;}int CUnitQueue::increase(){ // adjust/correct m_iCount int real_count = 0; CQEntry* p = m_pQEntry; while (p != NULL) { CUnit* u = p->m_pUnit; for (CUnit* end = u + p->m_iSize; u != end; ++ u) if (u->m_iFlag != 0) ++ real_count; if (p == m_pLastQueue) p = NULL; else p = p->m_pNext; } m_iCount = real_count; if (double(m_iCount) / m_iSize < 0.9) return -1; CQEntry* tempq = NULL; CUnit* tempu = NULL; char* tempb = NULL; // all queues have the same size int size = m_pQEntry->m_iSize; try { tempq = new CQEntry; tempu = new CUnit [size]; tempb = new char [size * m_iMSS]; } catch (...) { delete tempq; delete [] tempu; delete [] tempb; return -1; } for (int i = 0; i < size; ++ i) { tempu[i].m_iFlag = 0; tempu[i].m_Packet.m_pcData = tempb + i * m_iMSS; } tempq->m_pUnit = tempu; tempq->m_pBuffer = tempb; tempq->m_iSize = size; m_pLastQueue->m_pNext = tempq; m_pLastQueue = tempq; m_pLastQueue->m_pNext = m_pQEntry; m_iSize += size; return 0;}int CUnitQueue::shrink(){ // currently queue cannot be shrunk. return -1;}CUnit* CUnitQueue::getNextAvailUnit(){ if (m_iCount * 10 > m_iSize * 9) increase(); if (m_iCount >= m_iSize) return NULL; CQEntry* entrance = m_pCurrQueue; do { for (CUnit* sentinel = m_pCurrQueue->m_pUnit + m_pCurrQueue->m_iSize - 1; m_pAvailUnit != sentinel; ++ m_pAvailUnit) if (m_pAvailUnit->m_iFlag == 0) return m_pAvailUnit; if (m_pCurrQueue->m_pUnit->m_iFlag == 0) { m_pAvailUnit = m_pCurrQueue->m_pUnit; return m_pAvailUnit; } m_pCurrQueue = m_pCurrQueue->m_pNext; m_pAvailUnit = m_pCurrQueue->m_pUnit; } while (m_pCurrQueue != entrance); increase(); return NULL;}CSndUList::CSndUList(){ m_iArrayLength = 4096; m_pHeap = new CSNode*[m_iArrayLength]; m_iLastEntry = -1; #ifndef WIN32 pthread_mutex_init(&m_ListLock, NULL); #else m_ListLock = CreateMutex(NULL, false, NULL); #endif}CSndUList::~CSndUList(){ delete [] m_pHeap; #ifndef WIN32 pthread_mutex_destroy(&m_ListLock); #else CloseHandle(m_ListLock); #endif}void CSndUList::insert(const int64_t& ts, const CUDT* u){ CGuard listguard(m_ListLock); // increase the heap array size if necessary if (m_iLastEntry == m_iArrayLength - 1) { CSNode** temp = NULL; try { temp = new CSNode*[m_iArrayLength * 2]; } catch(...) { return; } memcpy(temp, m_pHeap, sizeof(CSNode*) * m_iArrayLength); m_iArrayLength *= 2; delete [] m_pHeap; m_pHeap = temp; } insert_(ts, u);}void CSndUList::update(const CUDT* u, const bool& reschedule){ CGuard listguard(m_ListLock); CSNode* n = u->m_pSNode; if (n->m_iHeapLoc >= 0) { if (!reschedule) return; if (n->m_iHeapLoc == 0) { n->m_llTimeStamp = 1; m_pTimer->interrupt(); return; } remove_(u); } insert_(1, u);}int CSndUList::pop(sockaddr*& addr, CPacket& pkt){ CGuard listguard(m_ListLock); if (-1 == m_iLastEntry) return -1; CUDT* u = m_pHeap[0]->m_pUDT; remove_(u); if (!u->m_bConnected || u->m_bBroken) return -1; // pack a packet from the socket uint64_t ts; if (u->packData(pkt, ts) <= 0) return -1; addr = u->m_pPeerAddr; // insert a new entry, ts is the next processing time if (ts > 0) insert_(ts, u); return 1;}void CSndUList::remove(const CUDT* u){ CGuard listguard(m_ListLock); remove_(u);}uint64_t CSndUList::getNextProcTime(){ CGuard listguard(m_ListLock); if (-1 == m_iLastEntry) return 0; return m_pHeap[0]->m_llTimeStamp;}void CSndUList::insert_(const int64_t& ts, const CUDT* u){ CSNode* n = u->m_pSNode; // do not insert repeated node if (n->m_iHeapLoc >= 0) return; m_iLastEntry ++; m_pHeap[m_iLastEntry] = n; n->m_llTimeStamp = ts; int q = m_iLastEntry; int p = q; while (p != 0) { p = (q - 1) >> 1; if (m_pHeap[p]->m_llTimeStamp > m_pHeap[q]->m_llTimeStamp) { CSNode* t = m_pHeap[p]; m_pHeap[p] = m_pHeap[q]; m_pHeap[q] = t; t->m_iHeapLoc = q; q = p; } else break; } n->m_iHeapLoc = q; // first entry, activate the sending queue if (0 == m_iLastEntry) { #ifndef WIN32 pthread_mutex_lock(m_pWindowLock); pthread_cond_signal(m_pWindowCond); pthread_mutex_unlock(m_pWindowLock); #else SetEvent(*m_pWindowCond); #endif }}void CSndUList::remove_(const CUDT* u){ CSNode* n = u->m_pSNode; if (n->m_iHeapLoc >= 0) { // remove the node from heap m_pHeap[n->m_iHeapLoc] = m_pHeap[m_iLastEntry]; m_iLastEntry --; m_pHeap[n->m_iHeapLoc]->m_iHeapLoc = n->m_iHeapLoc; int q = n->m_iHeapLoc; int p = q * 2 + 1; while (p <= m_iLastEntry) { if ((p + 1 <= m_iLastEntry) && (m_pHeap[p]->m_llTimeStamp > m_pHeap[p + 1]->m_llTimeStamp)) p ++; if (m_pHeap[q]->m_llTimeStamp > m_pHeap[p]->m_llTimeStamp) { CSNode* t = m_pHeap[p]; m_pHeap[p] = m_pHeap[q]; m_pHeap[p]->m_iHeapLoc = p; m_pHeap[q] = t; m_pHeap[q]->m_iHeapLoc = q; q = p; p = q * 2 + 1; } else break; } n->m_iHeapLoc = -1; }}//CSndQueue::CSndQueue():m_pSndUList(NULL),m_pChannel(NULL),m_pTimer(NULL),m_bClosing(false){ #ifndef WIN32 pthread_cond_init(&m_WindowCond, NULL); pthread_mutex_init(&m_WindowLock, NULL); #else m_WindowLock = CreateMutex(NULL, false, NULL); m_WindowCond = CreateEvent(NULL, false, false, NULL); m_ExitCond = CreateEvent(NULL, false, false, NULL); #endif}CSndQueue::~CSndQueue(){ m_bClosing = true; #ifndef WIN32 pthread_mutex_lock(&m_WindowLock); pthread_cond_signal(&m_WindowCond); pthread_mutex_unlock(&m_WindowLock); if (0 != m_WorkerThread) pthread_join(m_WorkerThread, NULL); pthread_cond_destroy(&m_WindowCond); pthread_mutex_destroy(&m_WindowLock); #else SetEvent(m_WindowCond); if (NULL != m_WorkerThread) WaitForSingleObject(m_ExitCond, INFINITE); CloseHandle(m_WorkerThread); CloseHandle(m_WindowLock); CloseHandle(m_WindowCond); #endif delete m_pSndUList;}void CSndQueue::init(const CChannel* c, const CTimer* t){ m_pChannel = (CChannel*)c; m_pTimer = (CTimer*)t; m_pSndUList = new CSndUList; m_pSndUList->m_pWindowLock = &m_WindowLock; m_pSndUList->m_pWindowCond = &m_WindowCond; m_pSndUList->m_pTimer = m_pTimer; #ifndef WIN32 if (0 != pthread_create(&m_WorkerThread, NULL, CSndQueue::worker, this)) { m_WorkerThread = 0; throw CUDTException(3, 1); } #else DWORD threadID; m_WorkerThread = CreateThread(NULL, 0, CSndQueue::worker, this, 0, &threadID); if (NULL == m_WorkerThread) throw CUDTException(3, 1); #endif}#ifndef WIN32 void* CSndQueue::worker(void* param)#else DWORD WINAPI CSndQueue::worker(LPVOID param)#endif{ CSndQueue* self = (CSndQueue*)param; CPacket pkt; while (!self->m_bClosing) { uint64_t ts = self->m_pSndUList->getNextProcTime(); if (ts > 0) { // wait until next processing time of the first socket on the list uint64_t currtime; CTimer::rdtsc(currtime); if (currtime < ts) self->m_pTimer->sleepto(ts); // it is time to process it, pop it out/remove from the list sockaddr* addr; CPacket pkt; if (self->m_pSndUList->pop(addr, pkt) < 0) continue; self->m_pChannel->sendto(addr, pkt); } else { // wait here if there is no sockets with data to be sent #ifndef WIN32 pthread_mutex_lock(&self->m_WindowLock); if (!self->m_bClosing && (self->m_pSndUList->m_iLastEntry < 0)) pthread_cond_wait(&self->m_WindowCond, &self->m_WindowLock); pthread_mutex_unlock(&self->m_WindowLock); #else WaitForSingleObject(self->m_WindowCond, INFINITE); #endif } } #ifndef WIN32 return NULL; #else SetEvent(self->m_ExitCond); return 0; #endif}int CSndQueue::sendto(const sockaddr* addr, CPacket& packet){ // send out the packet immediately (high priority), this is a control packet m_pChannel->sendto(addr, packet); return packet.getLength();}//CRcvUList::CRcvUList():m_pUList(NULL),m_pLast(NULL){}CRcvUList::~CRcvUList(){}void CRcvUList::insert(const CUDT* u){ CRNode* n = u->m_pRNode; CTimer::rdtsc(n->m_llTimeStamp); n->m_bOnList = true; if (NULL == m_pUList) { // empty list, insert as the single node
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -