📄 api.cpp
字号:
accepted = true; } else if (!ls->m_pUDT->m_bSynRecving) accepted = true; ReleaseMutex(ls->m_AcceptLock); if (!accepted & (CUDTSocket::LISTENING == ls->m_Status)) WaitForSingleObject(ls->m_AcceptCond, INFINITE); if (CUDTSocket::LISTENING != ls->m_Status) { SetEvent(ls->m_AcceptCond); accepted = true; } } #endif if (u == CUDT::INVALID_SOCK) { // non-blocking receiving, no connection available if (!ls->m_pUDT->m_bSynRecving) throw CUDTException(6, 2, 0); // listening socket is closed throw CUDTException(5, 6, 0); } if (AF_INET == locate(u)->m_iIPversion) *addrlen = sizeof(sockaddr_in); else *addrlen = sizeof(sockaddr_in6); // copy address information of peer node memcpy(addr, locate(u)->m_pPeerAddr, *addrlen); return u;}int CUDTUnited::connect(const UDTSOCKET u, const sockaddr* name, const int& namelen){ CUDTSocket* s = locate(u); if (NULL == s) throw CUDTException(5, 4, 0); // check the size of SOCKADDR structure if (AF_INET == s->m_iIPversion) { if (namelen != sizeof(sockaddr_in)) throw CUDTException(5, 3, 0); } else { if (namelen != sizeof(sockaddr_in6)) throw CUDTException(5, 3, 0); } // a socket can "connect" only if it is in INIT or OPENED status if (CUDTSocket::INIT == s->m_Status) { if (!s->m_pUDT->m_bRendezvous) { s->m_pUDT->open(); updateMux(s->m_pUDT); s->m_Status = CUDTSocket::OPENED; } else throw CUDTException(5, 8, 0); } else if (CUDTSocket::OPENED != s->m_Status) throw CUDTException(5, 2, 0); s->m_pUDT->connect(name); s->m_Status = CUDTSocket::CONNECTED; // copy address information of local node s->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr(s->m_pSelfAddr); // record peer address if (AF_INET == s->m_iIPversion) { s->m_pPeerAddr = (sockaddr*)(new sockaddr_in); memcpy(s->m_pPeerAddr, name, sizeof(sockaddr_in)); } else { s->m_pPeerAddr = (sockaddr*)(new sockaddr_in6); memcpy(s->m_pPeerAddr, name, sizeof(sockaddr_in6)); } return 0;}int CUDTUnited::close(const UDTSOCKET u){ CUDTSocket* s = locate(u); // silently drop a request to close an invalid ID, rather than return error if (NULL == s) return 0; s->m_pUDT->close(); // a socket will not be immediated removed when it is closed // in order to prevent other methods from accessing invalid address // a timer is started and the socket will be removed after approximately 1 second s->m_TimeStamp = CTimer::getTime(); CUDTSocket::UDTSTATUS os = s->m_Status; // synchronize with garbage collection. #ifndef WIN32 pthread_mutex_lock(&m_ControlLock); #else WaitForSingleObject(m_ControlLock, INFINITE); #endif s->m_Status = CUDTSocket::CLOSED; #ifndef WIN32 pthread_mutex_unlock(&m_ControlLock); #else ReleaseMutex(m_ControlLock); #endif // broadcast all "accept" waiting if (CUDTSocket::LISTENING == os) { #ifndef WIN32 pthread_mutex_lock(&(s->m_AcceptLock)); pthread_mutex_unlock(&(s->m_AcceptLock)); pthread_cond_broadcast(&(s->m_AcceptCond)); #else SetEvent(s->m_AcceptCond); #endif } CTimer::triggerEvent(); return 0;}int CUDTUnited::getpeername(const UDTSOCKET u, sockaddr* name, int* namelen){ if (CUDTSocket::CONNECTED != getStatus(u)) throw CUDTException(2, 2, 0); CUDTSocket* s = locate(u); if (NULL == s) throw CUDTException(5, 4, 0); if (!s->m_pUDT->m_bConnected || s->m_pUDT->m_bBroken) throw CUDTException(2, 2, 0); if (AF_INET == s->m_iIPversion) *namelen = sizeof(sockaddr_in); else *namelen = sizeof(sockaddr_in6); // copy address information of peer node memcpy(name, s->m_pPeerAddr, *namelen); return 0;}int CUDTUnited::getsockname(const UDTSOCKET u, sockaddr* name, int* namelen){ CUDTSocket* s = locate(u); if (NULL == s) throw CUDTException(5, 4, 0); if (CUDTSocket::INIT == s->m_Status) throw CUDTException(2, 2, 0); if (AF_INET == s->m_iIPversion) *namelen = sizeof(sockaddr_in); else *namelen = sizeof(sockaddr_in6); // copy address information of local node memcpy(name, s->m_pSelfAddr, *namelen); return 0;}int CUDTUnited::select(ud_set* readfds, ud_set* writefds, ud_set* exceptfds, const timeval* timeout){ uint64_t entertime = CTimer::getTime(); uint64_t to; if (NULL == timeout) to = 0xFFFFFFFFFFFFFFFFULL; else to = timeout->tv_sec * 1000000 + timeout->tv_usec; // initialize results int count = 0; set<UDTSOCKET> rs, ws, es; // retrieve related UDT sockets vector<CUDTSocket*> ru, wu, eu; CUDTSocket* s; if (NULL != readfds) for (set<UDTSOCKET>::iterator i1 = readfds->begin(); i1 != readfds->end(); ++ i1) { if (CUDTSocket::BROKEN == getStatus(*i1)) { rs.insert(*i1); ++ count; } else if (NULL == (s = locate(*i1))) throw CUDTException(5, 4, 0); else ru.insert(ru.end(), s); } if (NULL != writefds) for (set<UDTSOCKET>::iterator i2 = writefds->begin(); i2 != writefds->end(); ++ i2) { if (CUDTSocket::BROKEN == getStatus(*i2)) { ws.insert(*i2); ++ count; } else if (NULL == (s = locate(*i2))) throw CUDTException(5, 4, 0); else wu.insert(wu.end(), s); } if (NULL != exceptfds) for (set<UDTSOCKET>::iterator i3 = exceptfds->begin(); i3 != exceptfds->end(); ++ i3) { if (CUDTSocket::BROKEN == getStatus(*i3)) { es.insert(*i3); ++ count; } else if (NULL == (s = locate(*i3))) throw CUDTException(5, 4, 0); else eu.insert(eu.end(), s); } do { // query read sockets for (vector<CUDTSocket*>::iterator j1 = ru.begin(); j1 != ru.end(); ++ j1) { s = *j1; if ((s->m_pUDT->m_bConnected && (s->m_pUDT->m_pRcvBuffer->getRcvDataSize() > 0) && ((s->m_pUDT->m_iSockType == UDT_STREAM) || (s->m_pUDT->m_pRcvBuffer->getRcvMsgNum() > 0))) || (!s->m_pUDT->m_bListening && (s->m_pUDT->m_bBroken || !s->m_pUDT->m_bConnected)) || (s->m_pUDT->m_bListening && (s->m_pQueuedSockets->size() > 0)) || (s->m_Status == CUDTSocket::CLOSED)) { rs.insert(s->m_SocketID); ++ count; } } // query write sockets for (vector<CUDTSocket*>::iterator j2 = wu.begin(); j2 != wu.end(); ++ j2) { s = *j2; if ((s->m_pUDT->m_bConnected && (s->m_pUDT->m_pSndBuffer->getCurrBufSize() < s->m_pUDT->m_iSndBufSize)) || s->m_pUDT->m_bBroken || !s->m_pUDT->m_bConnected || (s->m_Status == CUDTSocket::CLOSED)) { ws.insert(s->m_SocketID); ++ count; } } // query expections on sockets for (vector<CUDTSocket*>::iterator j3 = eu.begin(); j3 != eu.end(); ++ j3) { // check connection request status, not supported now } if (0 < count) break; CTimer::waitForEvent(); } while (to > CTimer::getTime() - entertime); if (NULL != readfds) *readfds = rs; if (NULL != writefds) *writefds = ws; if (NULL != exceptfds) *exceptfds = es; return count;}CUDTSocket* CUDTUnited::locate(const UDTSOCKET u){ CGuard cg(m_ControlLock); map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.find(u); if ( (i == m_Sockets.end()) || (i->second->m_Status == CUDTSocket::CLOSED)) return NULL; return i->second;}CUDTSocket* CUDTUnited::locate(const UDTSOCKET u, const sockaddr* peer, const UDTSOCKET& id, const int32_t& isn){ CGuard cg(m_ControlLock); map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.find(u); CGuard ag(i->second->m_AcceptLock); // look up the "peer" address in queued sockets set for (set<UDTSOCKET>::iterator j1 = i->second->m_pQueuedSockets->begin(); j1 != i->second->m_pQueuedSockets->end(); ++ j1) { map<UDTSOCKET, CUDTSocket*>::iterator k1 = m_Sockets.find(*j1); if (CIPAddress::ipcmp(peer, k1->second->m_pPeerAddr, i->second->m_iIPversion)) { if ((id == k1->second->m_PeerID) && (isn == k1->second->m_iISN)) return k1->second; } } // look up the "peer" address in accept sockets set for (set<UDTSOCKET>::iterator j2 = i->second->m_pAcceptSockets->begin(); j2 != i->second->m_pAcceptSockets->end(); ++ j2) { map<UDTSOCKET, CUDTSocket*>::iterator k2 = m_Sockets.find(*j2); if (CIPAddress::ipcmp(peer, k2->second->m_pPeerAddr, i->second->m_iIPversion)) { if ((id == k2->second->m_PeerID) && (isn == k2->second->m_iISN)) return k2->second; } } return NULL;}void CUDTUnited::checkBrokenSockets(){ CGuard cg(m_ControlLock); // set of sockets To Be Removed set<UDTSOCKET> tbr; for (map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.begin(); i != m_Sockets.end(); ++ i) { if (CUDTSocket::CLOSED != i->second->m_Status) { // check broken connection if (i->second->m_pUDT->m_bBroken) { //close broken connections and start removal timer i->second->m_Status = CUDTSocket::CLOSED; i->second->m_TimeStamp = CTimer::getTime(); // remove from listener's queue map<UDTSOCKET, CUDTSocket*>::iterator ls = m_Sockets.find(i->second->m_ListenSocket); if (ls != m_Sockets.end()) { #ifndef WIN32 pthread_mutex_lock(&(ls->second->m_AcceptLock)); #else WaitForSingleObject(ls->second->m_AcceptLock, INFINITE); #endif ls->second->m_pQueuedSockets->erase(i->second->m_SocketID); #ifndef WIN32 pthread_mutex_unlock(&(ls->second->m_AcceptLock)); #else ReleaseMutex(ls->second->m_AcceptLock); #endif } } } else { // timeout 1 second to destroy a socket AND it has been removed from RcvUList if ((CTimer::getTime() - i->second->m_TimeStamp > 1000000) && ((NULL == i->second->m_pUDT->m_pRNode) || !i->second->m_pUDT->m_pRNode->m_bOnList)) tbr.insert(i->second->m_SocketID); // sockets cannot be removed here because it will invalidate the map iterator } } // remove those timeout sockets for (set<UDTSOCKET>::iterator k = tbr.begin(); k != tbr.end(); ++ k) removeSocket(*k);}void CUDTUnited::removeSocket(const UDTSOCKET u){ map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.find(u); // invalid socket ID if (i == m_Sockets.end()) return; // decrease multiplexer reference count, and remove it if necessary int port; if (AF_INET == i->second->m_iIPversion) port = ntohs(((sockaddr_in*)(i->second->m_pSelfAddr))->sin_port); else port = ntohs(((sockaddr_in6*)(i->second->m_pSelfAddr))->sin6_port); vector<CMultiplexer>::iterator m; for (m = m_vMultiplexer.begin(); m != m_vMultiplexer.end(); ++ m) if (port == m->m_iPort) break; if (0 != i->second->m_ListenSocket) { // if it is an accepted socket, remove it from the listener's queue map<UDTSOCKET, CUDTSocket*>::iterator ls = m_Sockets.find(i->second->m_ListenSocket); if (ls != m_Sockets.end()) { #ifndef WIN32 pthread_mutex_lock(&(ls->second->m_AcceptLock)); #else WaitForSingleObject(ls->second->m_AcceptLock, INFINITE); #endif ls->second->m_pAcceptSockets->erase(u); #ifndef WIN32 pthread_mutex_unlock(&(ls->second->m_AcceptLock)); #else ReleaseMutex(ls->second->m_AcceptLock); #endif } } else if (NULL != i->second->m_pQueuedSockets) { #ifndef WIN32 pthread_mutex_lock(&(i->second->m_AcceptLock)); #else WaitForSingleObject(i->second->m_AcceptLock, INFINITE); #endif // if it is a listener, remove all un-accepted sockets in its queue for (set<UDTSOCKET>::iterator q = i->second->m_pQueuedSockets->begin(); q != i->second->m_pQueuedSockets->end(); ++ q) { m_Sockets[*q]->m_pUDT->close(); delete m_Sockets[*q]; m_Sockets.erase(*q); if (m != m_vMultiplexer.end()) m->m_iRefCount --; } #ifndef WIN32 pthread_mutex_unlock(&(i->second->m_AcceptLock)); #else ReleaseMutex(i->second->m_AcceptLock); #endif } // delete this one m_Sockets[u]->m_pUDT->close(); delete m_Sockets[u]; m_Sockets.erase(u); if (m == m_vMultiplexer.end()) return; m->m_iRefCount --; if (0 == m->m_iRefCount) { m->m_pChannel->close(); delete m->m_pSndQueue; delete m->m_pRcvQueue; delete m->m_pTimer; delete m->m_pChannel; m_vMultiplexer.erase(m); }}void CUDTUnited::setError(CUDTException* e){ #ifndef WIN32 delete (CUDTException*)pthread_getspecific(m_TLSError); pthread_setspecific(m_TLSError, e); #else delete (CUDTException*)TlsGetValue(m_TLSError); TlsSetValue(m_TLSError, e); #endif}CUDTException* CUDTUnited::getError(){ #ifndef WIN32 if(NULL == pthread_getspecific(m_TLSError)) pthread_setspecific(m_TLSError, new CUDTException); return (CUDTException*)pthread_getspecific(m_TLSError); #else if(NULL == TlsGetValue(m_TLSError)) TlsSetValue(m_TLSError, new CUDTException); return (CUDTException*)TlsGetValue(m_TLSError); #endif}void CUDTUnited::updateMux(CUDT* u, const sockaddr* addr){ CGuard cg(m_ControlLock); if (u->m_bReuseAddr) { int port = 0; if (NULL != addr) port = (AF_INET == u->m_iIPversion) ? ntohs(((sockaddr_in*)addr)->sin_port) : ntohs(((sockaddr_in6*)addr)->sin6_port); // find a reusable address for (vector<CMultiplexer>::iterator i = m_vMultiplexer.begin(); i != m_vMultiplexer.end(); ++ i) { if ((i->m_iIPversion == u->m_iIPversion) && (i->m_iMSS == u->m_iMSS) && i->m_bReusable) { if ((0 == port) || (i->m_iPort == port)) { // reuse the existing multiplexer ++ i->m_iRefCount; u->m_pSndQueue = i->m_pSndQueue; u->m_pRcvQueue = i->m_pRcvQueue; return; } } } } // a new multiplexer is needed CMultiplexer m; m.m_iMSS = u->m_iMSS; m.m_iIPversion = u->m_iIPversion; m.m_iRefCount = 1; m.m_bReusable = u->m_bReuseAddr; m.m_pChannel = new CChannel(u->m_iIPversion); m.m_pChannel->setSndBufSize(u->m_iUDPSndBufSize); m.m_pChannel->setRcvBufSize(u->m_iUDPRcvBufSize); try { m.m_pChannel->open(addr); } catch (CUDTException& e) { m.m_pChannel->close(); delete m.m_pChannel; throw e; } sockaddr* sa = (AF_INET == u->m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6; m.m_pChannel->getSockAddr(sa); m.m_iPort = (AF_INET == u->m_iIPversion) ? ntohs(((sockaddr_in*)sa)->sin_port) : ntohs(((sockaddr_in6*)sa)->sin6_port); if (AF_INET == u->m_iIPversion) delete (sockaddr_in*)sa; else delete (sockaddr_in6*)sa; m.m_pTimer = new CTimer; m.m_pSndQueue = new CSndQueue; m.m_pSndQueue->init(m.m_pChannel, m.m_pTimer); m.m_pRcvQueue = new CRcvQueue; m.m_pRcvQueue->init((m.m_iMSS > 1500) ? 32 : 128, u->m_iPayloadSize, m.m_iIPversion, 1024, m.m_pChannel, m.m_pTimer); m_vMultiplexer.insert(m_vMultiplexer.end(), m); u->m_pSndQueue = m.m_pSndQueue; u->m_pRcvQueue = m.m_pRcvQueue;}void CUDTUnited::updateMux(CUDT* u, const CUDTSocket* ls){ CGuard cg(m_ControlLock); int port = (AF_INET == ls->m_iIPversion) ? ntohs(((sockaddr_in*)ls->m_pSelfAddr)->sin_port) : ntohs(((sockaddr_in6*)ls->m_pSelfAddr)->sin6_port); // find the listener's address for (vector<CMultiplexer>::iterator i = m_vMultiplexer.begin(); i != m_vMultiplexer.end(); ++ i) { if (i->m_iPort == port) { // reuse the existing multiplexer ++ i->m_iRefCount;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -