📄 gsocket.cpp
字号:
CloseSocket(s); m_pConnectionsLock->Unlock(); ReduceConnectionList(); } else m_pConnectionsLock->Unlock();}bool GSocketServerBase::Send(const unsigned char *pBuff, int len, int nConnectionNumber){ SOCKET s; { GSpinLockHolder hLock(m_pConnectionsLock, "GSocketServerBase::Send"); if(nConnectionNumber < 0 || nConnectionNumber >= m_pConnections->GetSize()) return false; s = m_pConnections->GetSocket(nConnectionNumber); } if(s == SOCKET_ERROR) return false; if(send(s, (const char*)pBuff, len, 0) == SOCKET_ERROR) {#ifdef WIN32 int n = WSAGetLastError(); switch(n) { case WSAECONNABORTED: break; case WSAECONNRESET: break; default: gsocket_LogError(); break; }#endif // WIN32 return false; } return true;}void GSocketServerBase::OnCloseConnection(int nConnection){}void GSocketServerBase::OnAcceptConnection(int nConnection){}bool GSocketServerBase::IsConnected(int nConnectionNumber){ GAssert(false, "Not implemented yet"); /* if(nConnectionNumber == 0) { if(m_hWorkerThread == BAD_HANDLE) return false; else return true; } else { if(m_pHostListenThreads->GetSize() < nConnectionNumber) return false; if(m_pHostListenThreads->GetHandle(nConnectionNumber - 1) == BAD_HANDLE) return false; else return true; } */ return false;}SOCKET GSocketServerBase::GetSocketHandle(int nConnectionNumber){ if(nConnectionNumber < 0) return m_socketConnectionListener; else return m_pConnections->GetSocket(nConnectionNumber);}in_addr GSocketServerBase::GetTheirIPAddr(int nConnectionNumber){ struct sockaddr sAddr; socklen_t l; l = sizeof(SOCKADDR); if(nConnectionNumber == 0) { if(getpeername(m_socketConnectionListener, &sAddr, &l)) gsocket_LogError(); } else { if(getpeername(m_pConnections->GetSocket(nConnectionNumber), &sAddr, &l)) gsocket_LogError(); } if(sAddr.sa_family != AF_INET) GAssert(false, "Error, family is not AF_INET\n"); SOCKADDR_IN* pInfo = (SOCKADDR_IN*)&sAddr; return pInfo->sin_addr;}char* GSocketServerBase::GetTheirIPAddr(char* szBuff, int nBuffSize, int nConnectionNumber){ GString::StrCpy(szBuff, inet_ntoa(GetTheirIPAddr(nConnectionNumber)), nBuffSize); return szBuff;}u_short GSocketServerBase::GetTheirPort(int nConnectionNumber){ SOCKADDR sAddr; socklen_t l; l = sizeof(SOCKADDR); if(nConnectionNumber == 0) { if(getpeername(m_socketConnectionListener, &sAddr, &l)) gsocket_LogError(); } else { if(getpeername(m_pConnections->GetSocket(nConnectionNumber), &sAddr, &l)) gsocket_LogError(); } if(sAddr.sa_family != AF_INET) GAssert(false, "Error, family is not AF_INET\n"); SOCKADDR_IN* pInfo = (SOCKADDR_IN*)&sAddr; return htons(pInfo->sin_port);}char* GSocketServerBase::GetTheirName(char* szBuff, int nBuffSize, int nConnectionNumber){ SOCKADDR sAddr; socklen_t l; l = sizeof(SOCKADDR); if(nConnectionNumber == 0) { if(getpeername(m_socketConnectionListener, &sAddr, &l)) gsocket_LogError(); } else { if(getpeername(m_pConnections->GetSocket(nConnectionNumber), &sAddr, &l)) gsocket_LogError(); } if(sAddr.sa_family != AF_INET) GAssert(false, "Error, family is not AF_INET\n"); SOCKADDR_IN* pInfo = (SOCKADDR_IN*)&sAddr; HOSTENT* namestruct = gethostbyaddr((const char*)&pInfo->sin_addr, 4, pInfo->sin_family); if(!namestruct) { GAssert(false, "Error calling gethostbyaddr\n"); } GString::StrCpy(szBuff, namestruct->h_name, nBuffSize); return(szBuff);}// --------------------------------------------------------------------------const char GSocketTag[] = "GSKT";struct GSocketServerBuffer{public: unsigned char* m_pBuffer; int m_nBufferPos; GSocketServerBuffer(int nMaxPacketSize) { m_pBuffer = new unsigned char[nMaxPacketSize + sizeof(struct GEZSocketPacketHeader)]; m_nBufferPos = 0; } ~GSocketServerBuffer() { delete(m_pBuffer); }};// if nMaxPacketSize = 0, the socket will be compatible with TCP sockets.// if nMaxPacketSize > 0, it will use it's own protocol that guarantees// same-size delivery of packets, but has a maximum packet size.GSocketServer::GSocketServer(bool bUDP, int nMaxPacketSize, int nPort, int nMaxConnections) : GSocketServerBase(bUDP, nPort, nMaxConnections){ GAssert(sizeof(struct GEZSocketPacketHeader) == 8, "packing issue"); m_nMaxPacketSize = nMaxPacketSize; if(nMaxPacketSize > 0) m_pBuffers = new GPointerArray(16); else m_pBuffers = NULL; m_pMessageQueueLock = new GSpinLock(); m_pMessageQueue = new GPointerQueue();}GSocketServer::~GSocketServer(){ // Join the worker thread now it doesn't try // to queue up a message after we delete the // message queue JoinWorkerThread(); if(m_pBuffers) { int nCount = m_pBuffers->GetSize(); int n; for(n = 0; n < nCount; n++) delete((GSocketServerBuffer*)m_pBuffers->GetPointer(n)); delete(m_pBuffers); } delete(m_pMessageQueue); delete(m_pMessageQueueLock);}/*static*/ GSocketServer* GSocketServer::HostTCPSocket(int nPort){ GSocketServer* pSocket = new GSocketServer(false, 0, nPort, 1000); if(!pSocket) return NULL; return pSocket;}/*static*/ GSocketServer* GSocketServer::HostUDPSocket(int nPort){ GSocketServer* pSocket = new GSocketServer(true, 0, nPort, 1000); if(!pSocket) return NULL; return pSocket;}/*static*/ GSocketServer* GSocketServer::HostGashSocket(int nPort, int nMaxPacketSize){ GSocketServer* pSocket = new GSocketServer(false, nMaxPacketSize, nPort, 1000); if(!pSocket) return NULL; return pSocket;}void GSocketServer::QueueMessage(unsigned char* pBuf, int nLen, int nConnectionNumber){ m_pMessageQueueLock->Lock("GSocketServer::QueueMessage"); GSocketMessage* pNewMessage = new GSocketMessage(pBuf, nLen, nConnectionNumber); m_pMessageQueue->Push(pNewMessage); m_pMessageQueueLock->Unlock();}int GSocketServer::GetMessageCount(){/* int nSize; m_pMessageQueueLock->Lock("GSocketServer::GetMessageCount"); nSize = m_pMessageQueue->GetSize(); m_pMessageQueueLock->Unlock(); return nSize;*/ return m_pMessageQueue->GetSize();}unsigned char* GSocketServer::GetNextMessage(int* pnSize, int* pnOutConnectionNumber){ m_pMessageQueueLock->Lock("GSocketClient::GetNextMessage"); if(m_pMessageQueue->GetSize() <= 0) { m_pMessageQueueLock->Unlock(); *pnOutConnectionNumber = -1; return NULL; } GSocketMessage* pMessage = (GSocketMessage*)m_pMessageQueue->Pop(); m_pMessageQueueLock->Unlock(); *pnSize = pMessage->GetMessageSize(); *pnOutConnectionNumber = pMessage->GetConnection(); unsigned char* pBuf = pMessage->TakeBuffer(); delete(pMessage); return pBuf;}bool GSocketServer::Receive(unsigned char *pBuf, int nLen, int nConnectionNumber){//fprintf(stderr, "Received %d bytes from %d {%c%c...%c%c}\n", nLen, nConnectionNumber, pBuf[0], pBuf[1], pBuf[nLen - 2], pBuf[nLen - 1]);//fflush(stderr); if(m_nMaxPacketSize == 0) { QueueMessage(pBuf, nLen, nConnectionNumber); } else { while(nConnectionNumber >= m_pBuffers->GetSize()) m_pBuffers->AddPointer(new GSocketServerBuffer(m_nMaxPacketSize)); GSocketServerBuffer* pBuffer = (GSocketServerBuffer*)m_pBuffers->GetPointer(nConnectionNumber); while(nLen > 0) { if(pBuffer->m_nBufferPos == 0 && nLen >= (int)sizeof(struct GEZSocketPacketHeader) && nLen >= (int)sizeof(struct GEZSocketPacketHeader) + GBits::LittleEndianToN32(((struct GEZSocketPacketHeader*)pBuf)->nPayloadSize)) { // We've got a whole packet, so just queue it up GAssert(*(unsigned int*)pBuf == *(unsigned int*)GSocketTag, "Bad Packet"); int nSize = GBits::LittleEndianToN32(((struct GEZSocketPacketHeader*)pBuf)->nPayloadSize); pBuf += sizeof(struct GEZSocketPacketHeader); nLen -= sizeof(struct GEZSocketPacketHeader); QueueMessage(pBuf, nSize, nConnectionNumber); pBuf += nSize; nLen -= nSize; } else { // We've only got a partial packet, so we need to buffer it while(pBuffer->m_nBufferPos < (int)sizeof(struct GEZSocketPacketHeader) && nLen > 0) { pBuffer->m_pBuffer[pBuffer->m_nBufferPos] = *pBuf; if(pBuffer->m_nBufferPos < 4 && *pBuf != GSocketTag[pBuffer->m_nBufferPos]) { GAssert(false, "bad packet"); pBuffer->m_nBufferPos = -1; } pBuffer->m_nBufferPos++; pBuf++; nLen--; } if(pBuffer->m_nBufferPos < (int)sizeof(struct GEZSocketPacketHeader)) return true; struct GEZSocketPacketHeader* pHeader = (struct GEZSocketPacketHeader*)pBuffer->m_pBuffer; int nSize = GBits::LittleEndianToN32(pHeader->nPayloadSize); if(nSize > m_nMaxPacketSize) { GAssert(false, "Received a packet that was too big"); pHeader->nPayloadSize = m_nMaxPacketSize; } while(pBuffer->m_nBufferPos < (int)sizeof(struct GEZSocketPacketHeader) + nSize && nLen > 0) { pBuffer->m_pBuffer[pBuffer->m_nBufferPos] = *pBuf; pBuffer->m_nBufferPos++; pBuf++; nLen--; } if(pBuffer->m_nBufferPos < (int)sizeof(struct GEZSocketPacketHeader) + nSize) return true; QueueMessage(pBuffer->m_pBuffer + sizeof(struct GEZSocketPacketHeader), nSize, nConnectionNumber); pBuffer->m_nBufferPos = 0; } } } return true;}bool GSocketServer::Send(const void* pBuf, int nLen, int nConnectionNumber){ if(m_nMaxPacketSize > 0) { GAssert(nLen <= m_nMaxPacketSize, "packet too big"); struct GEZSocketPacketHeader header; header.tag[0] = GSocketTag[0]; header.tag[1] = GSocketTag[1]; header.tag[2] = GSocketTag[2]; header.tag[3] = GSocketTag[3]; header.nPayloadSize = GBits::N32ToLittleEndian(nLen); if(!GSocketServerBase::Send((const unsigned char*)&header, sizeof(struct GEZSocketPacketHeader), nConnectionNumber)) return false; } bool bRet = GSocketServerBase::Send((const unsigned char*)pBuf, nLen, nConnectionNumber); return bRet;}// --------------------------------------------------------------------------// if nMaxPacketSize = 0, the socket will be compatible with TCP sockets.// if nMaxPacketSize > 0, it will use it's own protocol that guarantees// same-size delivery of packets, but has a maximum packet size.GSocketClient::GSocketClient(bool bUDP, int nMaxPacketSize) : GSocketClientBase(bUDP){ m_nMaxPacketSize = nMaxPacketSize; if(nMaxPacketSize > 0) m_pBuffer = new unsigned char[nMaxPacketSize + sizeof(struct GEZSocketPacketHeader)]; else m_pBuffer = NULL; m_nBufferPos = 0; m_pMessageQueueLock = new GSpinLock(); m_pMessageQueue = new GPointerQueue();}GSocketClient::~GSocketClient(){ // Join the other threads now so they don't try // to queue up a message after we delete the // message queue JoinListenThread(); delete(m_pBuffer); delete(m_pMessageQueue); delete(m_pMessageQueueLock);}/*static*/ GSocketClient* GSocketClient::ConnectToTCPSocket(const char* szAddress, int nPort){ GSocketClient* pSocket = new GSocketClient(false, 0); if(!pSocket) return NULL; if(!pSocket->Connect(szAddress, nPort)) { delete(pSocket); return NULL; } return pSocket;}/*static*/ GSocketClient* GSocketClient::ConnectToUDPSocket(const char* szAddress, int nPort){ GSocketClient* pSocket = new GSocketClient(true, 0); if(!pSocket) return NULL; if(!pSocket->Connect(szAddress, nPort)) { delete(pSocket); return NULL; } return pSocket;}/*static*/ GSocketClient* GSocketClient::ConnectToGashSocket(const char* szAddress, int nPort, int nMaxPacketSize){ GSocketClient* pSocket = new GSocketClient(false, nMaxPacketSize); if(!pSocket) return NULL; if(!pSocket->Connect(szAddress, nPort)) { delete(pSocket); return NULL; } return pSocket;}void GSocketClient::QueueMessage(unsigned char* pBuf, int nLen){ m_pMessageQueueLock->Lock("GSocketClient::QueueMessage"); m_pMessageQueue->Push(new GSocketMessage(pBuf, nLen, 0)); m_pMessageQueueLock->Unlock();}int GSocketClient::GetMessageCount(){ return m_pMessageQueue->GetSize();}unsigned char* GSocketClient::GetNextMessage(int* pnSize){ if(m_pMessageQueue->GetSize() <= 0) { *pnSize = 0; return NULL; } m_pMessageQueueLock->Lock("GSocketClient::GetNextMessage"); GSocketMessage* pMessage = (GSocketMessage*)m_pMessageQueue->Pop(); m_pMessageQueueLock->Unlock(); *pnSize = pMessage->GetMessageSize(); unsigned char* pBuf = pMessage->TakeBuffer(); delete(pMessage); return pBuf;}bool GSocketClient::Receive(unsigned char *pBuf, int nLen){ if(m_nMaxPacketSize == 0) QueueMessage(pBuf, nLen); else { while(nLen > 0) { if(m_nBufferPos == 0 && nLen >= (int)sizeof(struct GEZSocketPacketHeader) && nLen >= (int)sizeof(struct GEZSocketPacketHeader) + GBits::LittleEndianToN32(((struct GEZSocketPacketHeader*)pBuf)->nPayloadSize)) { // We've got a whole packet, so just queue it up GAssert(*(unsigned int*)pBuf == *(unsigned int*)GSocketTag, "Bad Packet"); int nSize = GBits::LittleEndianToN32(((struct GEZSocketPacketHeader*)pBuf)->nPayloadSize); pBuf += sizeof(struct GEZSocketPacketHeader); nLen -= sizeof(struct GEZSocketPacketHeader); QueueMessage(pBuf, nSize); pBuf += nSize; nLen -= nSize; } else { // We've only got a partial packet, so we need to buffer it while(m_nBufferPos < (int)sizeof(struct GEZSocketPacketHeader) && nLen > 0) { m_pBuffer[m_nBufferPos] = *pBuf; if(m_nBufferPos < 4 && *pBuf != GSocketTag[m_nBufferPos]) m_nBufferPos = -1; m_nBufferPos++; pBuf++; nLen--; } if(m_nBufferPos < (int)sizeof(struct GEZSocketPacketHeader)) return true; struct GEZSocketPacketHeader* pHeader = (struct GEZSocketPacketHeader*)m_pBuffer; int nSize = GBits::LittleEndianToN32(pHeader->nPayloadSize); if(nSize > m_nMaxPacketSize) { GAssert(false, "Received a packet that was too big"); pHeader->nPayloadSize = m_nMaxPacketSize; } while(m_nBufferPos < (int)sizeof(struct GEZSocketPacketHeader) + nSize && nLen > 0) { m_pBuffer[m_nBufferPos] = *pBuf; m_nBufferPos++; pBuf++; nLen--; } if(m_nBufferPos < (int)sizeof(struct GEZSocketPacketHeader) + nSize) return true; QueueMessage(m_pBuffer + sizeof(struct GEZSocketPacketHeader), nSize); m_nBufferPos = 0; } } } return true;}bool GSocketClient::Send(const void* pBuf, int nLen){ if(m_nMaxPacketSize > 0) { GAssert(nLen <= m_nMaxPacketSize, "packet too big"); struct GEZSocketPacketHeader header; header.tag[0] = GSocketTag[0]; header.tag[1] = GSocketTag[1]; header.tag[2] = GSocketTag[2]; header.tag[3] = GSocketTag[3]; header.nPayloadSize = GBits::N32ToLittleEndian(nLen); if(!GSocketClientBase::Send((const unsigned char*)&header, sizeof(struct GEZSocketPacketHeader))) return false; }//fprintf(stderr, "Sending %d bytes {%c%c...%c%c}\n", nLen, ((char*)pBuf)[0], ((char*)pBuf)[1], ((char*)pBuf)[nLen - 2], ((char*)pBuf)[nLen - 1]);//fflush(stderr); return GSocketClientBase::Send((const unsigned char*)pBuf, nLen);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -