📄 api.cpp.svn-base
字号:
{ if (CUDTSocket::BROKEN == getStatus(*i3)) { es.insert(*i3); ++count; } else if (NULL == (s = locate(*i3))) { es.insert(*i3); ++count; //return CUDTException(5, 4, 0); } //else //{ // eu.insert(eu.end(), s); //} } } do { // query read sockets for (vector<CUDTSocket*>::iterator j1 = ru.begin(); j1 != ru.end(); ++j1) { s = *j1; if ((s->m_pUDT->m_bConnected && (s->m_pUDT->m_pRcvBuffer->getRcvDataSize() > 0) && ((s->m_pUDT->m_iSockType == UDT_STREAM) || (s->m_pUDT->m_pRcvBuffer->getRcvMsgNum() > 0))) || (!s->m_pUDT->m_bListening && (s->m_pUDT->m_bBroken || !s->m_pUDT->m_bConnected)) || (s->m_pUDT->m_bListening && (s->m_pQueuedSockets->size() > 0)) || (s->m_Status == CUDTSocket::CLOSED)) { rs.insert(s->m_SocketID); ++count; } } // query write sockets for (vector<CUDTSocket*>::iterator j2 = wu.begin(); j2 != wu.end(); ++j2) { s = *j2; if ((s->m_pUDT->m_bConnected && (s->m_pUDT->m_pSndBuffer->getCurrBufSize() < s->m_pUDT->m_iSndBufSize)) || s->m_pUDT->m_bBroken || !s->m_pUDT->m_bConnected || (s->m_Status == CUDTSocket::CLOSED)) { ws.insert(s->m_SocketID); ++count; } } // query expections on sockets //for (vector<CUDTSocket*>::iterator j3 = eu.begin(); j3 != eu.end(); ++j3) //{ // // check connection request status, not supported now //} if (0 < count) break; CTimer::waitForEvent(); } while (to > CTimer::getTime() - entertime); if (NULL != readfds) *readfds = rs; if (NULL != writefds) *writefds = ws; if (NULL != exceptfds) *exceptfds = es; return CUDTException();}CUDTSocket* CUDTUnited::locate(const UDTSOCKET u){ CGuard cg(m_ControlLock); map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.find(u); if ( (i == m_Sockets.end()) || (i->second->m_Status == CUDTSocket::CLOSED)) return NULL; return i->second;}CUDTSocket* CUDTUnited::locate(const UDTSOCKET u, const sockaddr* peer, const UDTSOCKET id, const int32_t isn){ CGuard cg(m_ControlLock); map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.find(u); CGuard ag(i->second->m_AcceptLock); // look up the "peer" address in queued sockets set for (set<UDTSOCKET>::iterator j1 = i->second->m_pQueuedSockets->begin(); j1 != i->second->m_pQueuedSockets->end(); ++ j1) { map<UDTSOCKET, CUDTSocket*>::iterator k1 = m_Sockets.find(*j1); if (CIPAddress::ipcmp(peer, k1->second->m_pPeerAddr, i->second->m_iIPversion)) { if ((id == k1->second->m_PeerID) && (isn == k1->second->m_iISN)) return k1->second; } } // look up the "peer" address in accept sockets set for (set<UDTSOCKET>::iterator j2 = i->second->m_pAcceptSockets->begin(); j2 != i->second->m_pAcceptSockets->end(); ++ j2) { map<UDTSOCKET, CUDTSocket*>::iterator k2 = m_Sockets.find(*j2); if (CIPAddress::ipcmp(peer, k2->second->m_pPeerAddr, i->second->m_iIPversion)) { if ((id == k2->second->m_PeerID) && (isn == k2->second->m_iISN)) return k2->second; } } return NULL;}void CUDTUnited::checkBrokenSockets(){ CGuard cg(m_ControlLock); // set of sockets To Be Removed set<UDTSOCKET> tbr; for (map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.begin(); i != m_Sockets.end(); ++ i) { if (CUDTSocket::CLOSED != i->second->m_Status) { // check broken connection if (i->second->m_pUDT->m_bBroken) { //close broken connections and start removal timer i->second->m_Status = CUDTSocket::CLOSED; i->second->m_TimeStamp = CTimer::getTime(); // remove from listener's queue map<UDTSOCKET, CUDTSocket*>::iterator ls = m_Sockets.find(i->second->m_ListenSocket); if (ls != m_Sockets.end()) { #ifndef WIN32 pthread_mutex_lock(&(ls->second->m_AcceptLock)); #else WaitForSingleObject(ls->second->m_AcceptLock, INFINITE); #endif ls->second->m_pQueuedSockets->erase(i->second->m_SocketID); #ifndef WIN32 pthread_mutex_unlock(&(ls->second->m_AcceptLock)); #else ReleaseMutex(ls->second->m_AcceptLock); #endif } } } else { // timeout 1 second to destroy a socket AND it has been removed from RcvUList if ((CTimer::getTime() - i->second->m_TimeStamp > 1000000) && ((NULL == i->second->m_pUDT->m_pRNode) || !i->second->m_pUDT->m_pRNode->m_bOnList)) tbr.insert(i->second->m_SocketID); // sockets cannot be removed here because it will invalidate the map iterator } } // remove those timeout sockets for (set<UDTSOCKET>::iterator k = tbr.begin(); k != tbr.end(); ++ k) removeSocket(*k);}void CUDTUnited::removeSocket(const UDTSOCKET u){ map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.find(u); // invalid socket ID if (i == m_Sockets.end()) return; // decrease multiplexer reference count, and remove it if necessary int port; if (AF_INET == i->second->m_iIPversion) port = ntohs(((sockaddr_in*)(i->second->m_pSelfAddr))->sin_port); else port = ntohs(((sockaddr_in6*)(i->second->m_pSelfAddr))->sin6_port); vector<CMultiplexer>::iterator m; for (m = m_vMultiplexer.begin(); m != m_vMultiplexer.end(); ++ m) if (port == m->m_iPort) break; if (0 != i->second->m_ListenSocket) { // if it is an accepted socket, remove it from the listener's queue map<UDTSOCKET, CUDTSocket*>::iterator ls = m_Sockets.find(i->second->m_ListenSocket); if (ls != m_Sockets.end()) { #ifndef WIN32 pthread_mutex_lock(&(ls->second->m_AcceptLock)); #else WaitForSingleObject(ls->second->m_AcceptLock, INFINITE); #endif ls->second->m_pQueuedSockets->erase(u); ls->second->m_pAcceptSockets->erase(u); #ifndef WIN32 pthread_mutex_unlock(&(ls->second->m_AcceptLock)); #else ReleaseMutex(ls->second->m_AcceptLock); #endif } } else if (NULL != i->second->m_pQueuedSockets) { #ifndef WIN32 pthread_mutex_lock(&(i->second->m_AcceptLock)); #else WaitForSingleObject(i->second->m_AcceptLock, INFINITE); #endif // if it is a listener, close all un-accepted sockets in its queue and remove them later for (set<UDTSOCKET>::iterator q = i->second->m_pQueuedSockets->begin(); q != i->second->m_pQueuedSockets->end(); ++ q) { m_Sockets[*q]->m_pUDT->close(); m_Sockets[*q]->m_TimeStamp = CTimer::getTime(); m_Sockets[*q]->m_Status = CUDTSocket::CLOSED; } #ifndef WIN32 pthread_mutex_unlock(&(i->second->m_AcceptLock)); #else ReleaseMutex(i->second->m_AcceptLock); #endif } // delete this one m_Sockets[u]->m_pUDT->close(); delete m_Sockets[u]; m_Sockets.erase(u); if (m == m_vMultiplexer.end()) return; m->m_iRefCount --; if (0 == m->m_iRefCount) { m->m_pChannel->close(); delete m->m_pSndQueue; delete m->m_pRcvQueue; delete m->m_pTimer; delete m->m_pChannel; m_vMultiplexer.erase(m); }}void CUDTUnited::setError(CUDTException* e){ #ifndef WIN32 delete (CUDTException*)pthread_getspecific(m_TLSError); pthread_setspecific(m_TLSError, e); #else delete (CUDTException*)TlsGetValue(m_TLSError); TlsSetValue(m_TLSError, e); #endif}CUDTException* CUDTUnited::getError(){ #ifndef WIN32 if(NULL == pthread_getspecific(m_TLSError)) pthread_setspecific(m_TLSError, new (nothrow) CUDTException); return (CUDTException*)pthread_getspecific(m_TLSError); #else if(NULL == TlsGetValue(m_TLSError)) TlsSetValue(m_TLSError, new (nothrow) CUDTException); return (CUDTException*)TlsGetValue(m_TLSError); #endif}CUDTException CUDTUnited::updateMux(CUDT* u, const sockaddr* addr, const UDPSOCKET* udpsock){ CGuard cg(m_ControlLock); if ((u->m_bReuseAddr) && (NULL != addr)) { int port = (AF_INET == u->m_iIPversion) ? ntohs(((sockaddr_in*)addr)->sin_port) : ntohs(((sockaddr_in6*)addr)->sin6_port); // find a reusable address for (vector<CMultiplexer>::iterator i = m_vMultiplexer.begin(); i != m_vMultiplexer.end(); ++ i) { if ((i->m_iIPversion == u->m_iIPversion) && (i->m_iMSS == u->m_iMSS) && i->m_bReusable) { if (i->m_iPort == port) { // reuse the existing multiplexer ++ i->m_iRefCount; u->m_pSndQueue = i->m_pSndQueue; u->m_pRcvQueue = i->m_pRcvQueue; return CUDTException(); } } } } // a new multiplexer is needed CMultiplexer m; m.m_iMSS = u->m_iMSS; m.m_iIPversion = u->m_iIPversion; m.m_iRefCount = 1; m.m_bReusable = u->m_bReuseAddr; m.m_pChannel = new (nothrow) CChannel(u->m_iIPversion); if (!(m.m_pChannel)) { return CUDTException(3, 2, 0); } m.m_pChannel->setSndBufSize(u->m_iUDPSndBufSize); m.m_pChannel->setRcvBufSize(u->m_iUDPRcvBufSize); CUDTException e; if (NULL != udpsock) e = m.m_pChannel->open(*udpsock); else e = m.m_pChannel->open(addr); if (e.getErrorCode()) { m.m_pChannel->close(); m.cleanUp(); return e; } sockaddr* sa = (AF_INET == u->m_iIPversion) ? (sockaddr*) new (nothrow) sockaddr_in : (sockaddr*) new (nothrow) sockaddr_in6; if (!sa) {ERR_ALLOC_FAILED_updateMux: m.m_pChannel->close(); m.cleanUp(); return CUDTException(3, 2, 0); } m.m_pChannel->getSockAddr(sa); m.m_iPort = (AF_INET == u->m_iIPversion) ? ntohs(((sockaddr_in*)sa)->sin_port) : ntohs(((sockaddr_in6*)sa)->sin6_port); if (AF_INET == u->m_iIPversion) delete (sockaddr_in*)sa; else delete (sockaddr_in6*)sa; m.m_pTimer = new (nothrow) CTimer; if (!(m.m_pTimer)) { goto ERR_ALLOC_FAILED_updateMux; } m.m_pSndQueue = new (nothrow) CSndQueue; if (!(m.m_pSndQueue)) { goto ERR_ALLOC_FAILED_updateMux; } m.m_pSndQueue->init(m.m_pChannel, m.m_pTimer); m.m_pRcvQueue = new (nothrow) CRcvQueue; if (!(m.m_pRcvQueue)) { goto ERR_ALLOC_FAILED_updateMux; } m.m_pRcvQueue->init(32, u->m_iPayloadSize, m.m_iIPversion, 1024, m.m_pChannel, m.m_pTimer); m_vMultiplexer.insert(m_vMultiplexer.end(), m); u->m_pSndQueue = m.m_pSndQueue; u->m_pRcvQueue = m.m_pRcvQueue; return CUDTException();}void CUDTUnited::updateMux(CUDT* u, const CUDTSocket* ls){ CGuard cg(m_ControlLock); int port = (AF_INET == ls->m_iIPversion) ? ntohs(((sockaddr_in*)ls->m_pSelfAddr)->sin_port) : ntohs(((sockaddr_in6*)ls->m_pSelfAddr)->sin6_port); // find the listener's address for (vector<CMultiplexer>::iterator i = m_vMultiplexer.begin(); i != m_vMultiplexer.end(); ++ i) { if (i->m_iPort == port) { // reuse the existing multiplexer ++(i->m_iRefCount); u->m_pSndQueue = i->m_pSndQueue; u->m_pRcvQueue = i->m_pRcvQueue; return; } }}#ifndef WIN32 void* CUDTUnited::garbageCollect(void* p)#else DWORD WINAPI CUDTUnited::garbageCollect(LPVOID p)#endif{ CUDTUnited* self = (CUDTUnited*)p; while (!self->m_bClosing) { self->checkBrokenSockets(); #ifndef WIN32 timeval now; timespec timeout; gettimeofday(&now, 0); timeout.tv_sec = now.tv_sec + 1; timeout.tv_nsec = now.tv_usec * 1000; pthread_cond_timedwait(&self->m_GCStopCond, &self->m_GCStopLock, &timeout); #else WaitForSingleObject(self->m_GCStopCond, 1000); #endif } // remove all active sockets for (map<UDTSOCKET, CUDTSocket*>::iterator i = self->m_Sockets.begin(); i != self->m_Sockets.end(); ++ i) { i->second->m_Status = CUDTSocket::CLOSED; i->second->m_TimeStamp = 0; } self->checkBrokenSockets(); #ifndef WIN32 return NULL; #else SetEvent(self->m_GCExitCond); return 0; #endif }////////////////////////////////////////////////////////////////////////////////UDTSOCKET CUDT::socket(int af, int type, int){ CUDTUnited* udtUnited = getUDTUnited(); if (udtUnited) { UDTSOCKET s = INVALID_SOCK; CUDTException e = udtUnited->newSocket(af, type, s); if (e.getErrorCode()) { udtUnited->setError(new (nothrow) CUDTException(e)); } else { return s; } } return INVALID_SOCK;}int CUDT::bind(UDTSOCKET u, const sockaddr* name, int namelen){ CUDTUnited* udtUnited = getUDTUnited(); if (udtUnited) { CUDTException e = udtUnited->bind(u, name, namelen); if (e.getErrorCode()) { udtUnited->setError(new (nothrow) CUDTException(e)); } else { return 0; } } return ERROR;}int CUDT::bind(UDTSOCKET u, UDPSOCKET udpsock){ CUDTUnited* udtUnited = getUDTUnited(); if (udtUnited) { CUDTException e = udtUnited->bind(u, udpsock); if (e.getErrorCode()) { udtUnited->setError(new (nothrow) CUDTException(e)); } else { return 0; } } return ERROR;}int CUDT::listen(UDTSOCKET u, int backlog){ CUDTUnited* udtUnited = getUDTUnited(); if (udtUnited) { CUDTException e = udtUnited->listen(u, backlog); if (e.getErrorCode()) { udtUnited->setError(new (nothrow) CUDTException(e)); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -