⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 api.cpp

📁 Last Update: Jan 22 2009 可靠UDP传输, 一套高效的基于windows平台的C++ 开发库
💻 CPP
📖 第 1 页 / 共 3 页
字号:
         if (ls->m_pQueuedSockets->size() > 0)         {            u = *(ls->m_pQueuedSockets->begin());            ls->m_pAcceptSockets->insert(ls->m_pAcceptSockets->end(), u);            ls->m_pQueuedSockets->erase(ls->m_pQueuedSockets->begin());            accepted = true;         }         else if (!ls->m_pUDT->m_bSynRecving)            accepted = true;         ReleaseMutex(ls->m_AcceptLock);         if  (!accepted & (CUDTSocket::LISTENING == ls->m_Status))            WaitForSingleObject(ls->m_AcceptCond, INFINITE);         if (CUDTSocket::LISTENING != ls->m_Status)         {            SetEvent(ls->m_AcceptCond);            accepted = true;         }      }   #endif   if (u == CUDT::INVALID_SOCK)   {      // non-blocking receiving, no connection available      if (!ls->m_pUDT->m_bSynRecving)         throw CUDTException(6, 2, 0);      // listening socket is closed      throw CUDTException(5, 6, 0);   }   if (AF_INET == locate(u)->m_iIPversion)      *addrlen = sizeof(sockaddr_in);   else      *addrlen = sizeof(sockaddr_in6);   // copy address information of peer node   memcpy(addr, locate(u)->m_pPeerAddr, *addrlen);   return u;}int CUDTUnited::connect(const UDTSOCKET u, const sockaddr* name, const int& namelen){   CUDTSocket* s = locate(u);   if (NULL == s)      throw CUDTException(5, 4, 0);   // check the size of SOCKADDR structure   if (AF_INET == s->m_iIPversion)   {      if (namelen != sizeof(sockaddr_in))         throw CUDTException(5, 3, 0);   }   else   {      if (namelen != sizeof(sockaddr_in6))         throw CUDTException(5, 3, 0);   }   // a socket can "connect" only if it is in INIT or OPENED status   if (CUDTSocket::INIT == s->m_Status)   {      if (!s->m_pUDT->m_bRendezvous)      {         s->m_pUDT->open();         updateMux(s->m_pUDT);         s->m_Status = CUDTSocket::OPENED;      }      else         throw CUDTException(5, 8, 0);   }   else if (CUDTSocket::OPENED != s->m_Status)      throw CUDTException(5, 2, 0);   s->m_pUDT->connect(name);   s->m_Status = CUDTSocket::CONNECTED;   // copy address information of local node   s->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr(s->m_pSelfAddr);   // record peer address   if (AF_INET == s->m_iIPversion)   {      s->m_pPeerAddr = (sockaddr*)(new sockaddr_in);      memcpy(s->m_pPeerAddr, name, sizeof(sockaddr_in));   }   else   {      s->m_pPeerAddr = (sockaddr*)(new sockaddr_in6);      memcpy(s->m_pPeerAddr, name, sizeof(sockaddr_in6));   }   return 0;}int CUDTUnited::close(const UDTSOCKET u){   CUDTSocket* s = locate(u);   // silently drop a request to close an invalid ID, rather than return error      if (NULL == s)      return 0;   s->m_pUDT->close();   // a socket will not be immediated removed when it is closed   // in order to prevent other methods from accessing invalid address   // a timer is started and the socket will be removed after approximately 1 second   s->m_TimeStamp = CTimer::getTime();   CUDTSocket::UDTSTATUS os = s->m_Status;   // synchronize with garbage collection.   CGuard::enterCS(m_ControlLock);   s->m_Status = CUDTSocket::CLOSED;   m_Sockets.erase(s->m_SocketID);   m_ClosedSockets[s->m_SocketID] = s;   if (0 != s->m_ListenSocket)   {      // if it is an accepted socket, remove it from the listener's queue      map<UDTSOCKET, CUDTSocket*>::iterator ls = m_Sockets.find(s->m_ListenSocket);      if (ls != m_Sockets.end())      {         CGuard::enterCS(ls->second->m_AcceptLock);         ls->second->m_pAcceptSockets->erase(s->m_SocketID);         CGuard::leaveCS(ls->second->m_AcceptLock);      }   }   CGuard::leaveCS(m_ControlLock);   // broadcast all "accept" waiting   if (CUDTSocket::LISTENING == os)   {      #ifndef WIN32         pthread_mutex_lock(&(s->m_AcceptLock));         pthread_mutex_unlock(&(s->m_AcceptLock));         pthread_cond_broadcast(&(s->m_AcceptCond));      #else         SetEvent(s->m_AcceptCond);      #endif   }   CTimer::triggerEvent();   return 0;}int CUDTUnited::getpeername(const UDTSOCKET u, sockaddr* name, int* namelen){   if (CUDTSocket::CONNECTED != getStatus(u))      throw CUDTException(2, 2, 0);   CUDTSocket* s = locate(u);   if (NULL == s)      throw CUDTException(5, 4, 0);   if (!s->m_pUDT->m_bConnected || s->m_pUDT->m_bBroken)      throw CUDTException(2, 2, 0);   if (AF_INET == s->m_iIPversion)      *namelen = sizeof(sockaddr_in);   else      *namelen = sizeof(sockaddr_in6);   // copy address information of peer node   memcpy(name, s->m_pPeerAddr, *namelen);   return 0;}int CUDTUnited::getsockname(const UDTSOCKET u, sockaddr* name, int* namelen){   CUDTSocket* s = locate(u);   if (NULL == s)      throw CUDTException(5, 4, 0);   if (CUDTSocket::INIT == s->m_Status)      throw CUDTException(2, 2, 0);   if (AF_INET == s->m_iIPversion)      *namelen = sizeof(sockaddr_in);   else      *namelen = sizeof(sockaddr_in6);   // copy address information of local node   memcpy(name, s->m_pSelfAddr, *namelen);   return 0;}int CUDTUnited::select(ud_set* readfds, ud_set* writefds, ud_set* exceptfds, const timeval* timeout){   uint64_t entertime = CTimer::getTime();   uint64_t to;   if (NULL == timeout)      to = 0xFFFFFFFFFFFFFFFFULL;   else      to = timeout->tv_sec * 1000000 + timeout->tv_usec;   // initialize results   int count = 0;   set<UDTSOCKET> rs, ws, es;   // retrieve related UDT sockets   vector<CUDTSocket*> ru, wu, eu;   CUDTSocket* s;   if (NULL != readfds)      for (set<UDTSOCKET>::iterator i1 = readfds->begin(); i1 != readfds->end(); ++ i1)      {         if (CUDTSocket::BROKEN == getStatus(*i1))         {            rs.insert(*i1);            ++ count;         }         else if (NULL == (s = locate(*i1)))            throw CUDTException(5, 4, 0);         else            ru.insert(ru.end(), s);      }   if (NULL != writefds)      for (set<UDTSOCKET>::iterator i2 = writefds->begin(); i2 != writefds->end(); ++ i2)      {         if (CUDTSocket::BROKEN == getStatus(*i2))         {            ws.insert(*i2);            ++ count;         }         else if (NULL == (s = locate(*i2)))            throw CUDTException(5, 4, 0);         else            wu.insert(wu.end(), s);      }   if (NULL != exceptfds)      for (set<UDTSOCKET>::iterator i3 = exceptfds->begin(); i3 != exceptfds->end(); ++ i3)      {         if (CUDTSocket::BROKEN == getStatus(*i3))         {            es.insert(*i3);            ++ count;         }         else if (NULL == (s = locate(*i3)))            throw CUDTException(5, 4, 0);         else            eu.insert(eu.end(), s);      }   do   {      // query read sockets      for (vector<CUDTSocket*>::iterator j1 = ru.begin(); j1 != ru.end(); ++ j1)      {         s = *j1;         if ((s->m_pUDT->m_bConnected && (s->m_pUDT->m_pRcvBuffer->getRcvDataSize() > 0) && ((s->m_pUDT->m_iSockType == UDT_STREAM) || (s->m_pUDT->m_pRcvBuffer->getRcvMsgNum() > 0)))            || (!s->m_pUDT->m_bListening && (s->m_pUDT->m_bBroken || !s->m_pUDT->m_bConnected))            || (s->m_pUDT->m_bListening && (s->m_pQueuedSockets->size() > 0))            || (s->m_Status == CUDTSocket::CLOSED))         {            rs.insert(s->m_SocketID);            ++ count;         }      }      // query write sockets      for (vector<CUDTSocket*>::iterator j2 = wu.begin(); j2 != wu.end(); ++ j2)      {         s = *j2;         if ((s->m_pUDT->m_bConnected && (s->m_pUDT->m_pSndBuffer->getCurrBufSize() < s->m_pUDT->m_iSndBufSize))            || s->m_pUDT->m_bBroken || !s->m_pUDT->m_bConnected || (s->m_Status == CUDTSocket::CLOSED))         {            ws.insert(s->m_SocketID);            ++ count;         }      }      // query expections on sockets      for (vector<CUDTSocket*>::iterator j3 = eu.begin(); j3 != eu.end(); ++ j3)      {         // check connection request status, not supported now      }      if (0 < count)         break;      CTimer::waitForEvent();   } while (to > CTimer::getTime() - entertime);   if (NULL != readfds)      *readfds = rs;   if (NULL != writefds)      *writefds = ws;   if (NULL != exceptfds)      *exceptfds = es;   return count;}CUDTSocket* CUDTUnited::locate(const UDTSOCKET u){   CGuard cg(m_ControlLock);   map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.find(u);   if ( (i == m_Sockets.end()) || (i->second->m_Status == CUDTSocket::CLOSED))      return NULL;   return i->second;}CUDTSocket* CUDTUnited::locate(const UDTSOCKET u, const sockaddr* peer, const UDTSOCKET& id, const int32_t& isn){   CGuard cg(m_ControlLock);   map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.find(u);   CGuard ag(i->second->m_AcceptLock);   // look up the "peer" address in queued sockets set   for (set<UDTSOCKET>::iterator j1 = i->second->m_pQueuedSockets->begin(); j1 != i->second->m_pQueuedSockets->end(); ++ j1)   {      map<UDTSOCKET, CUDTSocket*>::iterator k1 = m_Sockets.find(*j1);      // this socket might have been closed and moved m_ClosedSockets      if (k1 == m_Sockets.end())         continue;      if (CIPAddress::ipcmp(peer, k1->second->m_pPeerAddr, i->second->m_iIPversion))      {         if ((id == k1->second->m_PeerID) && (isn == k1->second->m_iISN))            return k1->second;      }   }   // look up the "peer" address in accept sockets set   for (set<UDTSOCKET>::iterator j2 = i->second->m_pAcceptSockets->begin(); j2 != i->second->m_pAcceptSockets->end(); ++ j2)   {      map<UDTSOCKET, CUDTSocket*>::iterator k2 = m_Sockets.find(*j2);      // this socket might have been closed and moved m_ClosedSockets      if (k2 == m_Sockets.end())         continue;      if (CIPAddress::ipcmp(peer, k2->second->m_pPeerAddr, i->second->m_iIPversion))      {         if ((id == k2->second->m_PeerID) && (isn == k2->second->m_iISN))            return k2->second;      }   }   return NULL;}void CUDTUnited::checkBrokenSockets(){   CGuard cg(m_ControlLock);   // set of sockets To Be Closed and To Be Removed   set<UDTSOCKET> tbc;   set<UDTSOCKET> tbr;   for (map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.begin(); i != m_Sockets.end(); ++ i)   {      // check broken connection      if (i->second->m_pUDT->m_bBroken)      {         // if there is still data in the receiver buffer, wait longer         if ((i->second->m_pUDT->m_pRcvBuffer->getRcvDataSize() > 0) && (i->second->m_pUDT->m_iBrokenCounter -- > 0))            continue;         //close broken connections and start removal timer         i->second->m_Status = CUDTSocket::CLOSED;         i->second->m_TimeStamp = CTimer::getTime();         tbc.insert(i->first);         m_ClosedSockets[i->first] = i->second;         // remove from listener's queue         map<UDTSOCKET, CUDTSocket*>::iterator ls = m_Sockets.find(i->second->m_ListenSocket);         if (ls != m_Sockets.end())         {            CGuard::enterCS(ls->second->m_AcceptLock);            ls->second->m_pQueuedSockets->erase(i->second->m_SocketID);            ls->second->m_pAcceptSockets->erase(i->second->m_SocketID);            CGuard::leaveCS(ls->second->m_AcceptLock);         }      }   }   for (map<UDTSOCKET, CUDTSocket*>::iterator j = m_ClosedSockets.begin(); j != m_ClosedSockets.end(); ++ j)   {      // timeout 1 second to destroy a socket AND it has been removed from RcvUList      if ((CTimer::getTime() - j->second->m_TimeStamp > 1000000) && ((NULL == j->second->m_pUDT->m_pRNode) || !j->second->m_pUDT->m_pRNode->m_bOnList))         tbr.insert(j->first);      // sockets cannot be removed here because it will invalidate the map iterator   }   // move closed sockets to the ClosedSockets structure   for (set<UDTSOCKET>::iterator k = tbc.begin(); k != tbc.end(); ++ k)      m_Sockets.erase(*k);   // remove those timeout sockets   for (set<UDTSOCKET>::iterator l = tbr.begin(); l != tbr.end(); ++ l)      removeSocket(*l);}void CUDTUnited::removeSocket(const UDTSOCKET u){   map<UDTSOCKET, CUDTSocket*>::iterator i = m_ClosedSockets.find(u);   // invalid socket ID   if (i == m_ClosedSockets.end())      return;   // decrease multiplexer reference count, and remove it if necessary   int port;   if (AF_INET == i->second->m_iIPversion)      port = ntohs(((sockaddr_in*)(i->second->m_pSelfAddr))->sin_port);   else      port = ntohs(((sockaddr_in6*)(i->second->m_pSelfAddr))->sin6_port);   vector<CMultiplexer>::iterator m;   for (m = m_vMultiplexer.begin(); m != m_vMultiplexer.end(); ++ m)      if (port == m->m_iPort)         break;   if (NULL != i->second->m_pQueuedSockets)   {      CGuard::enterCS(i->second->m_AcceptLock);      // if it is a listener, close all un-accepted sockets in its queue and remove them later      set<UDTSOCKET> tbc;      for (set<UDTSOCKET>::iterator q = i->second->m_pQueuedSockets->begin(); q != i->second->m_pQueuedSockets->end(); ++ q)      {         m_Sockets[*q]->m_pUDT->close();         m_Sockets[*q]->m_TimeStamp = CTimer::getTime();         m_Sockets[*q]->m_Status = CUDTSocket::CLOSED;         m_ClosedSockets[*q] = m_Sockets[*q];      }      for (set<UDTSOCKET>::iterator c = tbc.begin(); c != tbc.end(); ++ c)         m_Sockets.erase(*c);      CGuard::leaveCS(i->second->m_AcceptLock);   }   // delete this one   i->second->m_pUDT->close();   delete m_ClosedSockets[u];   m_ClosedSockets.erase(u);   if (m == m_vMultiplexer.end())      return;   m->m_iRefCount --;   if (0 == m->m_iRefCount)   {      m->m_pChannel->close();      delete m->m_pSndQueue;      delete m->m_pRcvQueue;      delete m->m_pTimer;      delete m->m_pChannel;      m_vMultiplexer.erase(m);   }}void CUDTUnited::setError(CUDTException* e){   #ifndef WIN32      delete (CUDTException*)pthread_getspecific(m_TLSError);      pthread_setspecific(m_TLSError, e);   #else      CGuard tg(m_TLSLock);      delete (CUDTException*)TlsGetValue(m_TLSError);      TlsSetValue(m_TLSError, e);      m_mTLSRecord[GetCurrentThreadId()] = e;   #endif}CUDTException* CUDTUnited::getError(){   #ifndef WIN32      if(NULL == pthread_getspecific(m_TLSError))         pthread_setspecific(m_TLSError, new CUDTException);      return (CUDTException*)pthread_getspecific(m_TLSError);   #else      CGuard tg(m_TLSLock);      if(NULL == TlsGetValue(m_TLSError))      {         CUDTException* e = new CUDTException;         TlsSetValue(m_TLSError, e);         m_mTLSRecord[GetCurrentThreadId()] = e;      }      return (CUDTException*)TlsGetValue(m_TLSError);   #endif}#ifdef WIN32void CUDTUnited::checkTLSValue(){   CGuard tg(m_TLSLock);   vector<DWORD> tbr;   for (map<DWORD, CUDTException*>::iterator i = m_mTLSRecord.begin(); i != m_mTLSRecord.end(); ++ i)   {      HANDLE h = OpenThread(THREAD_QUERY_INFORMATION, FALSE, i->first);      if (NULL == h)      {         tbr.insert(tbr.end(), i->first);         break;      }      if (WAIT_OBJECT_0 == WaitForSingleObject(h, 0))      {         delete i->second;         tbr.insert(tbr.end(), i->first);      }   }   for (vector<DWORD>::iterator j = tbr.begin(); j != tbr.end(); ++ j)      m_mTLSRecord.erase(*j);}#endifvoid CUDTUnited::updateMux(CUDT* u, const sockaddr* addr, const UDPSOCKET* udpsock){   CGuard cg(m_ControlLock);   if ((u->m_bReuseAddr) && (NULL != addr))   {      int port = (AF_INET == u->m_iIPversion) ? ntohs(((sockaddr_in*)addr)->sin_port) : ntohs(((sockaddr_in6*)addr)->sin6_port);      // find a reusable address      for (vector<CMultiplexer>::iterator i = m_vMultiplexer.begin(); i != m_vMultiplexer.end(); ++ i)      {         if ((i->m_iIPversion == u->m_iIPversion) && (i->m_iMSS == u->m_iMSS) && i->m_bReusable)         {            if (i->m_iPort == port)            {               // reuse the existing multiplexer               ++ i->m_iRefCount;               u->m_pSndQueue = i->m_pSndQueue;               u->m_pRcvQueue = i->m_pRcvQueue;               return;            }         }      }   }   // a new multiplexer is needed   CMultiplexer m;   m.m_iMSS = u->m_iMSS;   m.m_iIPversion = u->m_iIPversion;   m.m_iRefCount = 1;   m.m_bReusable = u->m_bReuseAddr;   m.m_pChannel = new CChannel(u->m_iIPversion);   m.m_pChannel->setSndBufSize(u->m_iUDPSndBufSize);   m.m_pChannel->setRcvBufSize(u->m_iUDPRcvBufSize);   try   {      if (NULL != udpsock)         m.m_pChannel->open(*udpsock);      else         m.m_pChannel->open(addr);   }   catch (CUDTException& e)   {      m.m_pChannel->close();      delete m.m_pChannel;      throw e;   }   sockaddr* sa = (AF_INET == u->m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6;   m.m_pChannel->getSockAddr(sa);   m.m_iPort = (AF_INET == u->m_iIPversion) ? ntohs(((sockaddr_in*)sa)->sin_port) : ntohs(((sockaddr_in6*)sa)->sin6_port);   if (AF_INET == u->m_iIPversion) delete (sockaddr_in*)sa; else delete (sockaddr_in6*)sa;   m.m_pTimer = new CTimer;   m.m_pSndQueue = new CSndQueue;   m.m_pSndQueue->init(m.m_pChannel, m.m_pTimer);   m.m_pRcvQueue = new CRcvQueue;   m.m_pRcvQueue->init(32, u->m_iPayloadSize, m.m_iIPversion, 1024, m.m_pChannel, m.m_pTimer);   m_vMultiplexer.insert(m_vMultiplexer.end(), m);   u->m_pSndQueue = m.m_pSndQueue;   u->m_pRcvQueue = m.m_pRcvQueue;}void CUDTUnited::updateMux(CUDT* u, const CUDTSocket* ls){   CGuard cg(m_ControlLock);   int port = (AF_INET == ls->m_iIPversion) ? ntohs(((sockaddr_in*)ls->m_pSelfAddr)->sin_port) : ntohs(((sockaddr_in6*)ls->m_pSelfAddr)->sin6_port);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -