⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 queue.cpp

📁 udt.sdk.4.1.tar.gz更新包
💻 CPP
📖 第 1 页 / 共 2 页
字号:
   n->m_pPrev = m_pLast;   n->m_pNext = NULL;   m_pLast->m_pNext = n;   m_pLast = n;}void CRcvUList::remove(const int32_t& id){   if (NULL == m_pUList)      return;   if (id == m_pUList->m_iID)   {      CUDTList* n = m_pUList;      // remove first node      m_pUList = m_pUList->m_pNext;      if (NULL == m_pUList)         m_pLast = NULL;      else         m_pUList->m_pPrev = NULL;      n->m_bOnList = false;      return;   }   // check further   CUDTList* p = m_pUList;   while (NULL != p->m_pNext)   {      if (id == p->m_pNext->m_iID)      {         CUDTList* n = p->m_pNext;         p->m_pNext = p->m_pNext->m_pNext;         if (NULL != p->m_pNext)            p->m_pNext->m_pPrev = p;         else            m_pLast = p;         n->m_bOnList = false;         return;      }      p = p->m_pNext;   }}void CRcvUList::update(const int32_t& id){   if (NULL == m_pUList)      return;   if (id == m_pUList->m_iID)   {      CTimer::rdtsc(m_pUList->m_llTimeStamp);      // if there is only one node in the list, simply update it, otherwise move it to the end      if (NULL != m_pUList->m_pNext)      {         CUDTList* n = m_pUList;         m_pUList = m_pUList->m_pNext;         m_pUList->m_pPrev = NULL;         n->m_pNext = NULL;         n->m_pPrev = m_pLast;         m_pLast->m_pNext = n;         m_pLast = n;      }      return;   }   // check further   CUDTList* p = m_pUList;   while (NULL != p->m_pNext)   {      if (id == p->m_pNext->m_iID)      {         CTimer::rdtsc(p->m_pNext->m_llTimeStamp);         if (NULL != p->m_pNext->m_pNext)         {            CUDTList* n = p->m_pNext;            p->m_pNext = p->m_pNext->m_pNext;            p->m_pNext->m_pPrev = p;            n->m_pNext = NULL;            n->m_pPrev = m_pLast;            m_pLast->m_pNext = n;            m_pLast = n;         }         return;      }      p = p->m_pNext;   }}//CHash::CHash():m_pBucket(NULL),m_iHashSize(0){}CHash::~CHash(){   for (int i = 0; i < m_iHashSize; ++ i)   {      CBucket* b = m_pBucket[i];      while (NULL != b)      {         CBucket* n = b->m_pNext;         delete b;         b = n;      }   }   delete [] m_pBucket;}void CHash::init(const int& size){   m_pBucket = new CBucket* [size];   for (int i = 0; i < size; ++ i)      m_pBucket[i] = NULL;   m_iHashSize = size;}CUDT* CHash::lookup(const int32_t& id){   // simple hash function (% hash table size); suitable for socket descriptors   CBucket* b = m_pBucket[id % m_iHashSize];   while (NULL != b)   {      if (id == b->m_iID)         return b->m_pUDT;      b = b->m_pNext;   }   return NULL;}void CHash::insert(const int32_t& id, const CUDT* u){   CBucket* b = m_pBucket[id % m_iHashSize];   CBucket* n = new CBucket;   n->m_iID = id;   n->m_pUDT = (CUDT*)u;   n->m_pNext = b;   m_pBucket[id % m_iHashSize] = n;}void CHash::remove(const int32_t& id){   CBucket* b = m_pBucket[id % m_iHashSize];   CBucket* p = NULL;   while (NULL != b)   {      if (id == b->m_iID)      {         if (NULL == p)            m_pBucket[id % m_iHashSize] = b->m_pNext;         else            p->m_pNext = b->m_pNext;         delete b;         return;      }      p = b;      b = b->m_pNext;   }}//CRendezvousQueue::CRendezvousQueue(){   #ifndef WIN32      pthread_mutex_init(&m_RIDVectorLock, NULL);   #else      m_RIDVectorLock = CreateMutex(NULL, false, NULL);   #endif   m_vRendezvousID.clear();}CRendezvousQueue::~CRendezvousQueue(){   #ifndef WIN32      pthread_mutex_destroy(&m_RIDVectorLock);   #else      CloseHandle(m_RIDVectorLock);   #endif   for (vector<CRL>::iterator i = m_vRendezvousID.begin(); i != m_vRendezvousID.end(); ++ i)   {      if (AF_INET == i->m_iIPversion)         delete (sockaddr_in*)i->m_pPeerAddr;      else         delete (sockaddr_in6*)i->m_pPeerAddr;   }   m_vRendezvousID.clear();}void CRendezvousQueue::insert(const UDTSOCKET& id, const int& ipv, const sockaddr* addr, CUDT* u){   CGuard vg(m_RIDVectorLock);   CRL r;   r.m_iID = id;   r.m_iPeerID = 0;   r.m_iIPversion = ipv;   r.m_pPeerAddr = (AF_INET == ipv) ? (sockaddr*)new sockaddr_in : (sockaddr*)new sockaddr_in6;   memcpy(r.m_pPeerAddr, addr, (AF_INET == ipv) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6));   r.m_pUDT = u;   m_vRendezvousID.insert(m_vRendezvousID.end(), r);}void CRendezvousQueue::remove(const UDTSOCKET& id){   CGuard vg(m_RIDVectorLock);   for (vector<CRL>::iterator i = m_vRendezvousID.begin(); i != m_vRendezvousID.end(); ++ i)      if (i->m_iID == id)      {         if (AF_INET == i->m_iIPversion)            delete (sockaddr_in*)i->m_pPeerAddr;         else            delete (sockaddr_in6*)i->m_pPeerAddr;         m_vRendezvousID.erase(i);         return;      }}bool CRendezvousQueue::retrieve(const sockaddr* addr, UDTSOCKET& id, const UDTSOCKET& peerid, CUDT*& u){   CGuard vg(m_RIDVectorLock);   for (vector<CRL>::iterator i = m_vRendezvousID.begin(); i != m_vRendezvousID.end(); ++ i)      if (CIPAddress::ipcmp(addr, i->m_pPeerAddr, i->m_iIPversion) && ((0 == i->m_iPeerID) || (peerid == i->m_iPeerID)))      {         id = i->m_iID;         i->m_iPeerID = peerid;         u = i->m_pUDT;         return true;      }   return false;}//CRcvQueue::CRcvQueue():m_pRcvUList(NULL),m_pHash(NULL),m_pChannel(NULL),m_pTimer(NULL),m_bClosing(false),m_pListener(NULL),m_pRendezvousQueue(NULL){   #ifndef WIN32      pthread_mutex_init(&m_PassLock, NULL);      pthread_cond_init(&m_PassCond, NULL);      pthread_mutex_init(&m_LSLock, NULL);      pthread_mutex_init(&m_IDLock, NULL);   #else      m_PassLock = CreateMutex(NULL, false, NULL);      m_PassCond = CreateEvent(NULL, false, false, NULL);      m_LSLock = CreateMutex(NULL, false, NULL);      m_IDLock = CreateMutex(NULL, false, NULL);   #endif   m_vNewEntry.clear();   m_mBuffer.clear();}CRcvQueue::~CRcvQueue(){   m_bClosing = true;   #ifndef WIN32      if (0 != m_WorkerThread)         pthread_join(m_WorkerThread, NULL);      pthread_mutex_destroy(&m_PassLock);      pthread_cond_destroy(&m_PassCond);      pthread_mutex_destroy(&m_LSLock);      pthread_mutex_destroy(&m_IDLock);   #else      if (NULL != m_WorkerThread)         WaitForSingleObject(m_WorkerThread, INFINITE);      CloseHandle(m_WorkerThread);      CloseHandle(m_PassLock);      CloseHandle(m_PassCond);      CloseHandle(m_LSLock);      CloseHandle(m_IDLock);   #endif   delete m_pRcvUList;   delete m_pHash;   delete m_pRendezvousQueue;   for (map<int32_t, CPacket*>::iterator i = m_mBuffer.begin(); i != m_mBuffer.end(); ++ i)   {      delete [] i->second->m_pcData;      delete i->second;   }}void CRcvQueue::init(const int& qsize, const int& payload, const int& version, const int& hsize, const CChannel* cc, const CTimer* t){   m_iPayloadSize = payload;   m_UnitQueue.init(qsize, payload, version);   m_pHash = new CHash;   m_pHash->init(hsize);   m_pChannel = (CChannel*)cc;   m_pTimer = (CTimer*)t;   m_pRcvUList = new CRcvUList;   m_pRendezvousQueue = new CRendezvousQueue;   #ifndef WIN32      if (0 != pthread_create(&m_WorkerThread, NULL, CRcvQueue::worker, this))      {         m_WorkerThread = 0;         throw CUDTException(3, 1);      }   #else      DWORD threadID;      m_WorkerThread = CreateThread(NULL, 0, CRcvQueue::worker, this, 0, &threadID);      if (NULL == m_WorkerThread)         throw CUDTException(3, 1);   #endif}#ifndef WIN32   void* CRcvQueue::worker(void* param)#else   DWORD WINAPI CRcvQueue::worker(LPVOID param)#endif{   CRcvQueue* self = (CRcvQueue*)param;   CUnit temp;   temp.m_Packet.m_pcData = new char[self->m_iPayloadSize];   sockaddr* addr = (AF_INET == self->m_UnitQueue.m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6;   while (!self->m_bClosing)   {      #ifdef NO_BUSY_WAITING         self->m_pTimer->tick();      #endif      // check waiting list, if new socket, insert it to the list      if (self->ifNewEntry())      {         CUDT* ne = self->getNewEntry();         if (NULL != ne)         {            self->m_pRcvUList->insert(ne);            self->m_pHash->insert(ne->m_SocketID, ne);         }      }      // find next available slot for incoming packet      CUnit* unit = self->m_UnitQueue.getNextAvailUnit();      if (NULL == unit)         unit = &temp;      unit->m_Packet.setLength(self->m_iPayloadSize);      CUDT* u = NULL;      int32_t id;      // reading next incoming packet      if (self->m_pChannel->recvfrom(addr, unit->m_Packet) <= 0)         goto TIMER_CHECK;      if (unit == &temp)         goto TIMER_CHECK;      id = unit->m_Packet.m_iID;      // ID 0 is for connection request, which should be passed to the listening socket or rendezvous sockets      if (0 == id)      {         if (NULL != self->m_pListener)            ((CUDT*)self->m_pListener)->listen(addr, unit->m_Packet);         else if (self->m_pRendezvousQueue->retrieve(addr, id, ((CHandShake*)unit->m_Packet.m_pcData)->m_iID, u))         {            if (u->m_bConnected && !u->m_bBroken)               u->processCtrl(unit->m_Packet);            else               self->storePkt(id, unit->m_Packet.clone());         }      }      else if (id > 0)      {         if (NULL != (u = self->m_pHash->lookup(id)))         {            if (u->m_bConnected && !u->m_bBroken)            {               if (0 == unit->m_Packet.getFlag())                  u->processData(unit);               else                  u->processCtrl(unit->m_Packet);               u->checkTimers();               self->m_pRcvUList->update(id);            }         }         else            self->storePkt(id, unit->m_Packet.clone());      }TIMER_CHECK:      // take care of the timing event for all UDT sockets      CUDTList* ul = self->m_pRcvUList->m_pUList;      uint64_t currtime;      CTimer::rdtsc(currtime);      while ((NULL != ul) && (ul->m_llTimeStamp < currtime - 10000 * CTimer::getCPUFrequency()))      {         CUDT* u = ul->m_pUDT;         int32_t id = ul->m_iID;         u->checkTimers();         if (u->m_bConnected && !u->m_bBroken)            self->m_pRcvUList->update(id);         else         {            // the socket must be removed from Hash table first, then RcvUList            self->m_pHash->remove(id);            self->m_pRcvUList->remove(id);         }         ul = self->m_pRcvUList->m_pUList;      }   }   delete [] temp.m_Packet.m_pcData;   if (AF_INET == self->m_UnitQueue.m_iIPversion)      delete (sockaddr_in*)addr;   else      delete (sockaddr_in6*)addr;   return NULL;}int CRcvQueue::recvfrom(const int32_t& id, CPacket& packet){   CGuard bufferlock(m_PassLock);   map<int32_t, CPacket*>::iterator i = m_mBuffer.find(id);   if (i == m_mBuffer.end())   {      #ifndef WIN32         uint64_t now = CTimer::getTime();         timespec timeout;         timeout.tv_sec = now / 1000000 + 1;         timeout.tv_nsec = (now % 1000000) * 1000;         pthread_cond_timedwait(&m_PassCond, &m_PassLock, &timeout);      #else         ReleaseMutex(m_PassLock);         WaitForSingleObject(m_PassCond, 1);         WaitForSingleObject(m_PassLock, INFINITE);      #endif      i = m_mBuffer.find(id);      if (i == m_mBuffer.end())      {         packet.setLength(-1);         return -1;      }   }   if (packet.getLength() < i->second->getLength())   {      packet.setLength(-1);      return -1;   }   memcpy(packet.m_nHeader, i->second->m_nHeader, CPacket::m_iPktHdrSize);   memcpy(packet.m_pcData, i->second->m_pcData, i->second->getLength());   packet.setLength(i->second->getLength());   delete [] i->second->m_pcData;   delete i->second;   m_mBuffer.erase(i);   return packet.getLength();}int CRcvQueue::setListener(const CUDT* u){   CGuard lslock(m_LSLock);   if (NULL != m_pListener)      return -1;   m_pListener = (CUDT*)u;   return 1;}void CRcvQueue::removeListener(const CUDT* u){   CGuard lslock(m_LSLock);   if (u == m_pListener)      m_pListener = NULL;}void CRcvQueue::setNewEntry(CUDT* u){   CGuard listguard(m_IDLock);   m_vNewEntry.insert(m_vNewEntry.end(), u);}bool CRcvQueue::ifNewEntry(){   return !(m_vNewEntry.empty());}CUDT* CRcvQueue::getNewEntry(){   CGuard listguard(m_IDLock);   if (m_vNewEntry.empty())      return NULL;   CUDT* u = (CUDT*)*(m_vNewEntry.begin());   m_vNewEntry.erase(m_vNewEntry.begin());   return u;}void CRcvQueue::storePkt(const int32_t& id, CPacket* pkt){   #ifndef WIN32      pthread_mutex_lock(&m_PassLock);   #else      WaitForSingleObject(m_PassLock, INFINITE);   #endif   map<int32_t, CPacket*>::iterator i = m_mBuffer.find(id);   if (i == m_mBuffer.end())      m_mBuffer[id] = pkt;   else   {      delete [] i->second->m_pcData;      delete i->second;      i->second = pkt;   }   #ifndef WIN32      pthread_mutex_unlock(&m_PassLock);      pthread_cond_signal(&m_PassCond);   #else      ReleaseMutex(m_PassLock);      SetEvent(m_PassCond);   #endif}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -