⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 queue.cpp.svn-base

📁 UDT 4.0 based on the UDP.
💻 SVN-BASE
📖 第 1 页 / 共 2 页
字号:
/*****************************************************************************Copyright (c) 2001 - 2008, The Board of Trustees of the University of Illinois.All rights reserved.Redistribution and use in source and binary forms, with or withoutmodification, are permitted provided that the following conditions aremet:* Redistributions of source code must retain the above  copyright notice, this list of conditions and the  following disclaimer.* Redistributions in binary form must reproduce the  above copyright notice, this list of conditions  and the following disclaimer in the documentation  and/or other materials provided with the distribution.* Neither the name of the University of Illinois  nor the names of its contributors may be used to  endorse or promote products derived from this  software without specific prior written permission.THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "ASIS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULARPURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER ORCONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, ORPROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OFLIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDINGNEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THISSOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.*****************************************************************************//*****************************************************************************written by   Yunhong Gu, last updated 10/09/2008*****************************************************************************/#ifdef WIN32   #include <winsock2.h>   #include <ws2tcpip.h>   #include <wspiapi.h>#endif#include "common.h"#include "queue.h"#include "core.h"#ifdef USING_USTLusing namespace ustl;#elseusing namespace std;#endifCUnitQueue::CUnitQueue() throw():m_pQEntry(NULL),m_pCurrQueue(NULL),m_pLastQueue(NULL),m_iSize(0),m_iCount(0){}CUnitQueue::~CUnitQueue(){   CQEntry* p = m_pQEntry;   while (p != NULL)   {      delete [] p->m_pUnit;      delete [] p->m_pBuffer;      CQEntry* q = p;      if (p == m_pLastQueue)         p = NULL;      else         p = p->m_pNext;      delete q;   }}int CUnitQueue::init(const int size, const int mss, const int version){   CQEntry* tempq = NULL;   CUnit* tempu = NULL;   char* tempb = NULL;   tempq = new (nothrow) CQEntry;   tempu = new (nothrow) CUnit [size];   tempb = new (nothrow) char [size * mss];   if (!tempq ||      !tempu ||      !tempb)   {      delete tempq;      delete [] tempu;      delete [] tempb;      return -1;   }   for (int i = 0; i < size; ++i)   {      tempu[i].m_iFlag = 0;      tempu[i].m_Packet.m_pcData = tempb + i * mss;   }   tempq->m_pUnit = tempu;   tempq->m_pBuffer = tempb;   tempq->m_iSize = size;   m_pQEntry = m_pCurrQueue = m_pLastQueue = tempq;   m_pQEntry->m_pNext = m_pQEntry;   m_pAvailUnit = m_pCurrQueue->m_pUnit;   m_iSize = size;   m_iMSS = mss;   m_iIPversion = version;   return 0;}int CUnitQueue::increase(){   // adjust/correct m_iCount   int real_count = 0;   CQEntry* p = m_pQEntry;   while (p != NULL)   {      CUnit* u = p->m_pUnit;      for (CUnit* end = u + p->m_iSize; u != end; ++ u)         if (u->m_iFlag != 0)            ++ real_count;      if (p == m_pLastQueue)         p = NULL;      else         p = p->m_pNext;   }   m_iCount = real_count;   if (double(m_iCount) / m_iSize < 0.9)      return -1;   CQEntry* tempq = NULL;   CUnit* tempu = NULL;   char* tempb = NULL;   // all queues have the same size   int size = m_pQEntry->m_iSize;   tempq = new (nothrow) CQEntry;   tempu = new (nothrow) CUnit [size];   tempb = new (nothrow) char [size * m_iMSS];   if (!tempq ||      !tempu ||      !tempb)   {      delete tempq;      delete [] tempu;      delete [] tempb;      return -1;   }   for (int i = 0; i < size; ++i)   {      tempu[i].m_iFlag = 0;      tempu[i].m_Packet.m_pcData = tempb + i * m_iMSS;   }   tempq->m_pUnit = tempu;   tempq->m_pBuffer = tempb;   tempq->m_iSize = size;   m_pLastQueue->m_pNext = tempq;   m_pLastQueue = tempq;   m_pLastQueue->m_pNext = m_pQEntry;   m_iSize += size;   return 0;}int CUnitQueue::shrink(){   // currently queue cannot be shrunk.   return -1;}CUnit* CUnitQueue::getNextAvailUnit(){   if (double(m_iCount) / m_iSize > 0.9)      increase();   if (m_iCount >= m_iSize)      return NULL;   CQEntry* entrance = m_pCurrQueue;   do   {      for (CUnit* sentinel = m_pCurrQueue->m_pUnit + m_pCurrQueue->m_iSize - 1; m_pAvailUnit != sentinel; ++m_pAvailUnit)         if (m_pAvailUnit->m_iFlag == 0)            return m_pAvailUnit;      if (m_pCurrQueue->m_pUnit->m_iFlag == 0)      {         m_pAvailUnit = m_pCurrQueue->m_pUnit;         return m_pAvailUnit;      }      m_pCurrQueue = m_pCurrQueue->m_pNext;      m_pAvailUnit = m_pCurrQueue->m_pUnit;   } while (m_pCurrQueue != entrance);   increase();   return NULL;}CSndUList::CSndUList() throw():m_pHeap(NULL){   m_iArrayLength = 4096;      m_iLastEntry = -1;   #ifndef WIN32      pthread_mutex_init(&m_ListLock, NULL);   #else      m_ListLock = CreateMutex(NULL, false, NULL);   #endif}bool CSndUList::construct() throw(){   m_pHeap = new (nothrow) CSNode*[m_iArrayLength];   if (!m_pHeap)      return false;   return true;}CSndUList::~CSndUList(){   delete [] m_pHeap;   #ifndef WIN32      pthread_mutex_destroy(&m_ListLock);   #else      CloseHandle(m_ListLock);   #endif}void CSndUList::insert(const int64_t ts, const CUDT* u){   CGuard listguard(m_ListLock);   // increase the heap array size if necessary   if (m_iLastEntry == m_iArrayLength - 1)   {      CSNode** temp = NULL;      temp = new (nothrow) CSNode*[m_iArrayLength * 2];      if (!temp)         return;      memcpy(temp, m_pHeap, sizeof(CSNode*) * m_iArrayLength);      m_iArrayLength *= 2;      delete [] m_pHeap;      m_pHeap = temp;   }   insert_(ts, u);}void CSndUList::update(const CUDT* u, const bool reschedule){   CGuard listguard(m_ListLock);   CSNode* n = u->m_pSNode;   if (n->m_iHeapLoc >= 0)   {      if (!reschedule)         return;      if (n->m_iHeapLoc == 0)      {         n->m_llTimeStamp = 1;         m_pTimer->interrupt();         return;      }      remove_(u);   }   insert_(1, u);}CUDT* CSndUList::pop(){   CGuard listguard(m_ListLock);   if (-1 == m_iLastEntry)      return NULL;   CUDT* u = m_pHeap[0]->m_pUDT;   remove_(u);   return u;}void CSndUList::remove(const CUDT* u){   CGuard listguard(m_ListLock);   remove_(u);}uint64_t CSndUList::getNextProcTime(){   CGuard listguard(m_ListLock);   if (-1 == m_iLastEntry)      return 0;   return m_pHeap[0]->m_llTimeStamp;}void CSndUList::insert_(const int64_t ts, const CUDT* u){   CSNode* n = u->m_pSNode;   // do not insert repeated node   if (n->m_iHeapLoc >= 0)      return;   m_iLastEntry ++;   m_pHeap[m_iLastEntry] = n;   n->m_llTimeStamp = ts;   int q = m_iLastEntry;   int p = q;   while (p != 0)   {      p = (q - 1) >> 1;      if (m_pHeap[p]->m_llTimeStamp > m_pHeap[q]->m_llTimeStamp)      {         CSNode* t = m_pHeap[p];         m_pHeap[p] = m_pHeap[q];         m_pHeap[q] = t;         t->m_iHeapLoc = q;         q = p;      }      else         break;   }   n->m_iHeapLoc = q;   // first entry, activate the sending queue   if (0 == m_iLastEntry)   {      #ifndef WIN32         pthread_mutex_lock(m_pWindowLock);         pthread_cond_signal(m_pWindowCond);         pthread_mutex_unlock(m_pWindowLock);      #else         SetEvent(*m_pWindowCond);      #endif   }}void CSndUList::remove_(const CUDT* u){   CSNode* n = u->m_pSNode;   if (n->m_iHeapLoc >= 0)   {      // remove the node from heap      m_pHeap[n->m_iHeapLoc] = m_pHeap[m_iLastEntry];      m_iLastEntry --;      m_pHeap[n->m_iHeapLoc]->m_iHeapLoc = n->m_iHeapLoc;      int q = n->m_iHeapLoc;      int p = q * 2 + 1;      while (p <= m_iLastEntry)      {         if ((p + 1 <= m_iLastEntry) && (m_pHeap[p]->m_llTimeStamp > m_pHeap[p + 1]->m_llTimeStamp))            p ++;         if (m_pHeap[q]->m_llTimeStamp > m_pHeap[p]->m_llTimeStamp)         {            CSNode* t = m_pHeap[p];            m_pHeap[p] = m_pHeap[q];            m_pHeap[p]->m_iHeapLoc = p;            m_pHeap[q] = t;            m_pHeap[q]->m_iHeapLoc = q;            q = p;            p = q * 2 + 1;         }         else            break;      }      n->m_iHeapLoc = -1;   }}//CSndQueue::CSndQueue() throw():m_pSndUList(NULL),m_pChannel(NULL),m_pTimer(NULL),m_bClosing(false){   #ifndef WIN32      pthread_cond_init(&m_WindowCond, NULL);      pthread_mutex_init(&m_WindowLock, NULL);   #else      m_WindowLock = CreateMutex(NULL, false, NULL);      m_WindowCond = CreateEvent(NULL, false, false, NULL);      m_ExitCond = CreateEvent(NULL, false, false, NULL);   #endif}CSndQueue::~CSndQueue(){   m_bClosing = true;   #ifndef WIN32      pthread_mutex_lock(&m_WindowLock);      pthread_cond_signal(&m_WindowCond);      pthread_mutex_unlock(&m_WindowLock);      if (0 != m_WorkerThread)         pthread_join(m_WorkerThread, NULL);      pthread_cond_destroy(&m_WindowCond);      pthread_mutex_destroy(&m_WindowLock);   #else      SetEvent(m_WindowCond);      if (NULL != m_WorkerThread)         WaitForSingleObject(m_ExitCond, INFINITE);      CloseHandle(m_WorkerThread);      CloseHandle(m_WindowLock);      CloseHandle(m_WindowCond);   #endif   delete m_pSndUList;}CUDTException CSndQueue::init(const CChannel* c, const CTimer* t){   m_pChannel = (CChannel*)c;   m_pTimer = (CTimer*)t;   m_pSndUList = UDT::construct(new (nothrow) CSndUList);   if (!m_pSndUList)      return CUDTException(3, 2, 0);   m_pSndUList->m_pWindowLock = &m_WindowLock;   m_pSndUList->m_pWindowCond = &m_WindowCond;   m_pSndUList->m_pTimer = m_pTimer;   #ifndef WIN32      if (0 != pthread_create(&m_WorkerThread, NULL, CSndQueue::worker, this))      {         m_WorkerThread = 0;         return CUDTException(3, 1);      }   #else      DWORD threadID;      m_WorkerThread = CreateThread(NULL, 0, CSndQueue::worker, this, 0, &threadID);      if (NULL == m_WorkerThread)         return CUDTException(3, 1);   #endif      return CUDTException();}#ifndef WIN32   void* CSndQueue::worker(void* param)#else   DWORD WINAPI CSndQueue::worker(LPVOID param)#endif{   CSndQueue* self = (CSndQueue*)param;   CPacket pkt;   while (!self->m_bClosing)   {      uint64_t ts = self->m_pSndUList->getNextProcTime();      if (ts > 0)      {         // wait until next processing time of the first socket on the list         uint64_t currtime;         CTimer::rdtsc(currtime);         if (currtime < ts)            self->m_pTimer->sleepto(ts);         // it is time to process it, pop it out/remove from the list         CUDT* u = self->m_pSndUList->pop();         if ((NULL == u) || !u->m_bConnected || u->m_bBroken)            continue;         // pack a packet from the socket         uint64_t ts;         if (u->packData(pkt, ts) > 0)            self->m_pChannel->sendto(u->m_pPeerAddr, pkt);         // insert a new entry, ts is the next processing time         if (ts > 0)            self->m_pSndUList->insert(ts, u);      }      else      {         // wait here if there is no sockets with data to be sent         #ifndef WIN32            pthread_mutex_lock(&self->m_WindowLock);            if (!self->m_bClosing && (self->m_pSndUList->m_iLastEntry < 0))               pthread_cond_wait(&self->m_WindowCond, &self->m_WindowLock);            pthread_mutex_unlock(&self->m_WindowLock);         #else            WaitForSingleObject(self->m_WindowCond, INFINITE);         #endif      }   }#ifndef WIN32   return NULL;#else   SetEvent(self->m_ExitCond);   return 0;#endif}int CSndQueue::sendto(const sockaddr* addr, CPacket& packet){   // send out the packet immediately (high priority), this is a control packet   m_pChannel->sendto(addr, packet);   return packet.getLength();}//CRcvUList::CRcvUList() throw():m_pUList(NULL),m_pLast(NULL){}CRcvUList::~CRcvUList(){}void CRcvUList::insert(const CUDT* u){   CRNode* n = u->m_pRNode;   CTimer::rdtsc(n->m_llTimeStamp);   n->m_bOnList = true;   if (NULL == m_pUList)   {      // empty list, insert as the single node      n->m_pPrev = n->m_pNext = NULL;      m_pLast = m_pUList = n;      return;   }   // always insert at the end for RcvUList   n->m_pPrev = m_pLast;   n->m_pNext = NULL;   m_pLast->m_pNext = n;   m_pLast = n;}void CRcvUList::remove(const CUDT* u){   CRNode* n = u->m_pRNode;   if (!n->m_bOnList)      return;   if (NULL == n->m_pPrev)   {      // n is the first node      m_pUList = n->m_pNext;      if (NULL == m_pUList)         m_pLast = NULL;      else         m_pUList->m_pPrev = NULL;   }   else   {      n->m_pPrev->m_pNext = n->m_pNext;      if (NULL == n->m_pNext)      {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -