📄 tcpsocketserver.cpp
字号:
#include "TCPSocketServer.h"
#include <crtdbg.h>
TCPSocketServer::TCPSocketServer(void)
:m_hIOCP ( INVALID_HANDLE_VALUE ),
m_MaxThreads ( 0 ),
m_ConcurrentThreads ( 0 ),
m_RunningThreads ( 0 ),
m_NumThreads ( 0 ),
m_hStopEvent ( INVALID_HANDLE_VALUE ),
stop(true)
{
for(int i = 0;i<1000;i++){
PER_SOCKET_DATA *PerSocketData = new PER_SOCKET_DATA;
m_socks.push_back(PerSocketData);
}
}
TCPSocketServer::~TCPSocketServer(void){
for(int i = 0;i<1000;i++){
PER_SOCKET_DATA *PerSocketData = (PER_SOCKET_DATA*)m_socks.front();
delete PerSocketData;
m_socks.pop_front();
}
}
bool TCPSocketServer::init(){
SYSTEM_INFO sysinfo;
GetSystemInfo(&sysinfo);
m_ConcurrentThreads = sysinfo.dwNumberOfProcessors*2 + 2;
m_MaxThreads = sysinfo.dwNumberOfProcessors*3 + 2;
m_hStopEvent = CreateEvent ( NULL, TRUE, FALSE, NULL );
if ( m_hStopEvent == NULL )return false;
m_hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,NULL,m_ConcurrentThreads);
if (m_hIOCP == NULL)return false;
for ( UINT i = 0; i < m_MaxThreads; ++i ){
if(!StartThread())return false;
}
return true;
}
bool TCPSocketServer::StartThread(){
HANDLE hThread = NULL;
unsigned int ThreadId = 0;
hThread = (HANDLE)_beginthreadex( NULL, 0, TCPSocketServer::ThreadProc, (LPVOID)this, 0, &ThreadId );
if ( hThread == NULL )return false;
m_ThreadMapLock.Acquire();
m_ThreadMap.insert(hash_map<DWORD,DWORD>::value_type(ThreadId,(DWORD)hThread));
m_ThreadMapLock.Release();
::InterlockedIncrement((long*)&m_NumThreads);
::InterlockedIncrement((long*)&m_RunningThreads);
return true;
}
int TCPSocketServer::Send(char* pData, size_t nLen, SOCKET s, DWORD dwParam){
if(dwParam == 0)return -1;
DWORD NumBytes;
int nRet;
PER_SOCKET_DATA *PerSocketData = (PER_SOCKET_DATA *)dwParam;
++PerSocketData->io_count;
PER_IO_OPERATION_DATA *PerIOData = new PER_IO_OPERATION_DATA;
ZeroMemory(PerIOData,sizeof(PER_IO_OPERATION_DATA));
memcpy(PerIOData->Buffer,pData,nLen);
PerIOData->DataBuf.len = nLen;
PerIOData->DataBuf.buf = PerIOData->Buffer;
PerIOData->OperationType = SEND_POSTED;
nRet = WSASend(s,&(PerIOData->DataBuf),1,&NumBytes,0,(OVERLAPPED *)PerIOData,NULL);
// ZeroMemory(PerIOData->Buffer,sizeof(PerIOData->Buffer));//just for test.
if(SOCKET_ERROR == nRet && (ERROR_IO_PENDING != WSAGetLastError())){
--PerSocketData->io_count;
closesocket(s);
delete PerIOData;
return -1;
}
return 0;
}
int TCPSocketServer::Connect(unsigned short usPort,unsigned long ulIP,SOCKET& addr_a,DWORD& addr_b){
SOCKET sck = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
if(INVALID_SOCKET == sck)return -1;
sockaddr_in sAddr;
sAddr.sin_family=AF_INET;
sAddr.sin_port=htons(usPort);
sAddr.sin_addr.S_un.S_addr=htonl(ulIP);
if ( SOCKET_ERROR == connect(sck,(sockaddr*)&sAddr,sizeof(sockaddr_in)) ) return -1;
PER_SOCKET_DATA *PerSocketData = new PER_SOCKET_DATA();
PerSocketData->socket = sck;
PerSocketData->io_count = 0;
HANDLE iocp = CreateIoCompletionPort((HANDLE)sck,m_hIOCP,(ULONG_PTR)PerSocketData,m_ConcurrentThreads);
if(iocp == NULL)return -1;
OnAccepted(sck,(DWORD)PerSocketData);
// OnConnect(sck,(DWORD)PerSocketData);
addr_a = sck;
addr_b = (DWORD)PerSocketData;
PER_IO_OPERATION_DATA* PerIOData = new PER_IO_OPERATION_DATA;
ZeroMemory(PerIOData,sizeof(PER_IO_OPERATION_DATA));
PerIOData->DataBuf.len = sizeof(PerIOData->Buffer);
PerIOData->DataBuf.buf = PerIOData->Buffer;
PerIOData->OperationType = RECV_POSTED;
DWORD Flags = 0;
DWORD RecvBytes = 0;
int nRet = WSARecv(sck,&PerIOData->DataBuf,1,&RecvBytes,&Flags,(OVERLAPPED *)PerIOData,NULL);
if (SOCKET_ERROR == nRet && (ERROR_IO_PENDING != WSAGetLastError())){
delete PerIOData;
ClearSocket(PerSocketData->socket,(DWORD)PerSocketData);
return -1;
}
return 0;
}
unsigned int TCPSocketServer::ThreadProc(void* lpParam){
TCPSocketServer &pthis=*((TCPSocketServer*)lpParam);
DWORD NumBytes = 0;
LPOVERLAPPED Overlapped = NULL;
PER_IO_OPERATION_DATA *PerIoData = NULL;
PER_SOCKET_DATA *PerSocketData = NULL;
int nRet;
DWORD RecvBytes,Flags;
while(true){
::InterlockedDecrement((long*)&(pthis.m_RunningThreads));
nRet = GetQueuedCompletionStatus(pthis.m_hIOCP,&NumBytes,(ULONG_PTR*)&PerSocketData,&Overlapped,INFINITE);
::InterlockedIncrement ((long*)&(pthis.m_RunningThreads));
if (!nRet) {
if(Overlapped != NULL){
PerIoData = (PER_IO_OPERATION_DATA *)Overlapped;
if((RECV_POSTED == PerIoData->OperationType) && (PerSocketData!=NULL)){
pthis.ClearSocket(PerSocketData->socket,(DWORD)PerSocketData);
}
delete PerIoData;
}
continue;
}else if( NumBytes == 0 ){ //nRet == ERROR_SUCCESS
PerIoData = (PER_IO_OPERATION_DATA *)Overlapped;
if((RECV_POSTED == PerIoData->OperationType) && (PerSocketData!=NULL)){
pthis.ClearSocket(PerSocketData->socket,(DWORD)PerSocketData);
}
delete PerIoData;
continue;
}
PerIoData = (PER_IO_OPERATION_DATA *)Overlapped;
if(PerIoData->OperationType == STOP_POSTED)break;
switch(PerIoData->OperationType){
case RECV_POSTED:
pthis.OnReceive(PerIoData->Buffer,NumBytes,PerSocketData->socket,(DWORD)PerSocketData);
ZeroMemory(PerIoData,sizeof(PER_IO_OPERATION_DATA));
PerIoData->DataBuf.len = sizeof(PerIoData->Buffer);
PerIoData->DataBuf.buf = PerIoData->Buffer;
PerIoData->OperationType = RECV_POSTED;
Flags = 0;
nRet = WSARecv(PerSocketData->socket,&(PerIoData->DataBuf),1,&RecvBytes,&Flags,(OVERLAPPED *)PerIoData,NULL);
if (SOCKET_ERROR == nRet && (ERROR_IO_PENDING != WSAGetLastError())){
delete PerIoData;
pthis.ClearSocket(PerSocketData->socket,(DWORD)PerSocketData);
}
break;
case SEND_POSTED:
--PerSocketData->io_count;
pthis.OnSend(PerSocketData->socket,(DWORD)PerSocketData);
delete PerIoData;
break;
}
}
pthis.ThreadCleanUp();
// _endthreadex(0);
return 0;
}
void TCPSocketServer::ThreadCleanUp(){
m_ThreadMapLock.Acquire ( );
hash_map<DWORD,DWORD>::iterator it = m_ThreadMap.find(::GetCurrentThreadId());
CloseHandle ( (HANDLE)it->second );
m_ThreadMap.erase ( it );
m_ThreadMapLock.Release ( );
::InterlockedDecrement ( (long*)&m_RunningThreads );
if ( ::InterlockedDecrement ( (long*)&m_NumThreads ) == 0 )
{
::SetEvent ( m_hStopEvent );
}
}
bool TCPSocketServer::Stop(){
hash_map<DWORD,DWORD>::iterator it;
m_ThreadMapLock.Acquire ( );
size_t numthreads = m_ThreadMap.size();
m_ThreadMapLock.Release ( );
PER_IO_OPERATION_DATA *PerIOData = new PER_IO_OPERATION_DATA;
ZeroMemory(PerIOData,sizeof(PER_IO_OPERATION_DATA));
PerIOData->DataBuf.len = sizeof(PerIOData->Buffer);
PerIOData->DataBuf.buf = PerIOData->Buffer;
PerIOData->OperationType = STOP_POSTED;
for ( ; numthreads > 0; numthreads-- ){
PostQueuedCompletionStatus(m_hIOCP,100,NULL,PerIOData);
}
if ( WaitForSingleObject(m_hStopEvent,WIOCP_ENDWAIT)!=WAIT_OBJECT_0){
m_ThreadMapLock.Acquire();
for ( it = m_ThreadMap.begin( ); it != m_ThreadMap.end( ); it++){
TerminateThread ( (HANDLE)it->second, 0 );
CloseHandle ( (HANDLE)it->second );
}
m_ThreadMapLock.Release();
}
delete PerIOData;
CloseHandle ( m_hIOCP );
m_hIOCP = INVALID_HANDLE_VALUE;
CloseHandle ( m_hStopEvent );
m_hStopEvent = INVALID_HANDLE_VALUE;
return true;
}
bool TCPSocketServer::start(unsigned short usPort,unsigned long ulIP){
int nRet;
DWORD RecvBytes,Flags;
SOCKET Accept;
PER_IO_OPERATION_DATA *PerIOData;
m_FirstSocket = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
if(INVALID_SOCKET == m_FirstSocket)return false;
sockaddr_in sAddr;
sAddr.sin_family=AF_INET;
sAddr.sin_port=htons(usPort);
sAddr.sin_addr.S_un.S_addr=htonl(ulIP);
nRet = bind(m_FirstSocket,(PSOCKADDR)&sAddr,sizeof(sAddr));
if(SOCKET_ERROR == nRet)return false;
nRet = listen(m_FirstSocket,5);
if(SOCKET_ERROR == nRet)return false;
sockaddr_in sCon;
int addr_len = sizeof(sockaddr_in);
while(stop){
Accept = WSAAccept(m_FirstSocket, (sockaddr *)&sCon, &addr_len, NULL, 0);
_RPT2(_CRT_WARN,"Someone Connected, Ip: %s, Port: %d\n", inet_ntoa(sCon.sin_addr),ntohs(sCon.sin_port));
if (SOCKET_ERROR == Accept)break;
PER_SOCKET_DATA *PerSocketData = new PER_SOCKET_DATA();
PerSocketData->socket = Accept;
PerSocketData->io_count = 0;
HANDLE iocp = CreateIoCompletionPort((HANDLE)Accept,m_hIOCP,(ULONG_PTR)PerSocketData,m_ConcurrentThreads);
if(iocp == NULL)break;
OnAccepted(Accept,(DWORD)PerSocketData);
PerIOData = new PER_IO_OPERATION_DATA;
ZeroMemory(PerIOData,sizeof(PER_IO_OPERATION_DATA));
PerIOData->DataBuf.len = sizeof(PerIOData->Buffer);
PerIOData->DataBuf.buf = PerIOData->Buffer;
PerIOData->OperationType = RECV_POSTED;
Flags = 0;
nRet = WSARecv(Accept,&PerIOData->DataBuf,1,&RecvBytes,&Flags,(OVERLAPPED *)PerIOData,NULL);
if (SOCKET_ERROR == nRet && (ERROR_IO_PENDING != WSAGetLastError())){
delete PerIOData;
ClearSocket(PerSocketData->socket,(DWORD)PerSocketData);
}
}
Stop();
return true;
}
void TCPSocketServer::ClearSocket(SOCKET s, DWORD dwParam){
OnClosed(s,dwParam);
closesocket(s);
//当前没用的sock放到list中,然后删除第一个,冗余
PER_SOCKET_DATA *PerSocketData = (PER_SOCKET_DATA*)dwParam;
m_socks.push_back(PerSocketData);
PerSocketData = (PER_SOCKET_DATA*)m_socks.front();
delete PerSocketData;
m_socks.pop_front();
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -