📄 iocpmanager.cpp
字号:
// IocpManager.cpp: implementation of the CIocpManager class.
//
//////////////////////////////////////////////////////////////////////
#include "stdafx.h"
#include "IocpManager.h"
#ifdef _DEBUG
#undef THIS_FILE
static char THIS_FILE[]=__FILE__;
#define new DEBUG_NEW
#endif
//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////
/**
 * Constructs an idle manager.
 *
 * Resets every handle, socket, counter and extension-function pointer to its
 * "unset" value, loads the IOCP configuration item, resolves the default
 * path and initialises Winsock 2.2. The WSAStartup result is kept in
 * m_nWSAStartupRet so that Start() and the destructor can check it later.
 */
CIocpManager::CIocpManager()
{
    // Handles and sockets.
    m_hCompletionPort = NULL;
    m_hStopEvent = NULL;
    m_soListen = INVALID_SOCKET;

    // Winsock extension function pointers (filled in by CreateListenPort).
    m_lpAcceptEx = NULL;
    m_lpGetAcceptExSockAddrs = NULL;

    // Counters and state flags.
    m_nNumberOfAcceptEx = 0;
    m_lIocpWorkerCount = 0;
    m_lTaskProcessCount = 0;
    m_nReaderCount = 0;
    m_nListenPortNumber = 0;
    m_bIocpStarted = FALSE;

    // Configuration: the return value of InitConfig() is not checked here;
    // an empty default path falls back to the full temp path.
    InitConfig();
    m_itemIocp.GetAttr(XML_IOCPMANAGER_DEFAULT_PATH, m_strDefaultPath);
    if (m_strDefaultPath.IsEmpty())
    {
        m_strDefaultPath = CPublicFunc::GetFullTempPath();
    }

    // Winsock initialisation (version 2.2).
    WSADATA wsaData;
    m_nWSAStartupRet = WSAStartup(MAKEWORD(2,2), &wsaData);
}
/**
 * Stops the manager and releases Winsock.
 *
 * WSACleanup() is only called when the WSAStartup() performed in the
 * constructor reported success.
 */
CIocpManager::~CIocpManager()
{
    Stop();

    if (!NOERROROCCUR(m_nWSAStartupRet))
        return;     // WSAStartup failed, nothing to balance

    WSACleanup();
}
int CIocpManager::InitConfig()
{
CString strErrorXml=_T("ZNetConfig.xml");
CString strXmlPath=CPublicFunc::GetFullPathOfFile(strErrorXml);
CXmlManager XmlMngr;
int nErr=XmlMngr.Load(strXmlPath);
if (NOERROROCCUR(nErr))
nErr=XmlMngr.GetItem(_T("IocpConfig"), m_itemIocp);
return nErr;
}
/**
 * Closes a socket and marks the caller's variable as invalid.
 *
 * @param soSocket socket to close; set to INVALID_SOCKET on return.
 *                 Nothing happens if it already is INVALID_SOCKET.
 */
void CIocpManager::CloseSocket(SOCKET& soSocket)
{
    if (soSocket != INVALID_SOCKET)
    {
        // When closing the connection, set the socket's linger value to 0
        // (SO_LINGER enabled with a zero timeout).
        LINGER lingerZero;
        lingerZero.l_linger = 0;
        lingerZero.l_onoff = 1;
        setsockopt(soSocket, SOL_SOCKET, SO_LINGER,
                   reinterpret_cast<const char*>(&lingerZero), sizeof(lingerZero));

        CancelIo(reinterpret_cast<HANDLE>(soSocket));
        closesocket(soSocket);
        soSocket = INVALID_SOCKET;
    }
}
/**
 * Starts the manager: completion port, IOCP worker threads, task-processing
 * threads and (when nPortNumber > 0) the listening port, in that order.
 *
 * @param nPortNumber port to listen on; passed through to CreateListenPort().
 * @return ERROR_NONE on success,
 *         ERROR_IOCPMANAGER_IOCP_STARTED if already started,
 *         ERROR_IOCPMANAGER_WSASTARTUP if Winsock initialisation failed in
 *         the constructor, otherwise the error of the first step that failed.
 *
 * NOTE(review): when a later step fails, the earlier steps are not undone and
 * m_bIocpStarted stays FALSE, so Stop() will return early without tearing
 * them down.
 */
int CIocpManager::Start(int nPortNumber)
{
    // Preconditions: not already running, Winsock available.
    if (m_bIocpStarted)
    {
        int nStartedErr = ERROR_IOCPMANAGER_IOCP_STARTED;
        Logging2(nStartedErr, DOMAIN_NAME);
        return nStartedErr;
    }
    if (!NOERROROCCUR(m_nWSAStartupRet))
    {
        int nWinsockErr = ERROR_IOCPMANAGER_WSASTARTUP;
        Logging2(nWinsockErr, DOMAIN_NAME);
        return nWinsockErr;
    }

    // Run the start-up steps in order; each one only runs if all the
    // previous ones succeeded.
    int nResult = CreateCompletionPort();
    if (NOERROROCCUR(nResult))
        nResult = CreateIocpWorkerThread();
    if (NOERROROCCUR(nResult))
        nResult = CreateTaskProcessThread();
    if (NOERROROCCUR(nResult))
        nResult = CreateListenPort(nPortNumber);

    if (!NOERROROCCUR(nResult))
        return nResult;

    m_bIocpStarted = TRUE;
    Logging2(ERROR_IOCPMANAGER_START_SUCCESS, DOMAIN_NAME);
    return nResult;
}
/**
 * Stops a started manager and releases its resources.
 *
 * Order: disconnect all contexts, stop the task-processing threads, stop the
 * IOCP worker threads, close the completion port, close the listening
 * socket, then free contexts and buffers.
 *
 * @return ERROR_IOCPMANAGER_IOCP_NOT_STARTED if the manager is not running;
 *         otherwise the result of StopIocpWorkerThread(). The results of the
 *         two earlier teardown steps are not reported to the caller.
 */
int CIocpManager::Stop()
{
    if (!m_bIocpStarted)
    {
        int nNotStartedErr = ERROR_IOCPMANAGER_IOCP_NOT_STARTED;
        Logging2(nNotStartedErr, DOMAIN_NAME);
        return nNotStartedErr;
    }

    // NOTE(review): the results of these two calls are intentionally (?)
    // dropped; only the worker-thread shutdown result is returned.
    DisconnectAllContext();
    StopTaskProcessThread();
    int nResult = StopIocpWorkerThread();

    if (NULL != m_hCompletionPort)
    {
        CloseHandle(m_hCompletionPort);
        m_hCompletionPort = NULL;
    }

    if (INVALID_SOCKET != m_soListen)
    {
        closesocket(m_soListen);
        m_soListen = INVALID_SOCKET;
    }

    FreeContext();
    FreeBuffer();

    m_bIocpStarted = FALSE;
    Logging2(ERROR_IOCPMANAGER_STOP_SUCCESS, DOMAIN_NAME);
    return nResult;
}
int CIocpManager::CreateCompletionPort()
{
int nErr=ERROR_NONE;
int nNumberOfConcurrentPerCPU=NUMBER_OF_CONCURRENT_PER_CPU;
//获得CPU个数
SYSTEM_INFO sysInfo;
WSADATA wsaData;
ZeroMemory(&sysInfo,sizeof(SYSTEM_INFO));
ZeroMemory(&wsaData,sizeof(WSADATA));
GetSystemInfo(&sysInfo);
int nNumberOfConcurrent = sysInfo.dwNumberOfProcessors * nNumberOfConcurrentPerCPU;
m_hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, nNumberOfConcurrent);
if (m_hCompletionPort==NULL)
{
nErr = ERROR_IOCPMANAGER_CREATE_IO_COMPLETION_PORT;
Logging2(nErr, DOMAIN_NAME);
}
return nErr;
}
int CIocpManager::CreateListenPort(int nListenPort)
{
int nErr=ERROR_NONE;
m_nListenPortNumber = nListenPort;
if (m_nListenPortNumber<=0) return nErr; //若是客户端不需要创建监听端口
m_soListen = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_IP, NULL, 0, WSA_FLAG_OVERLAPPED);
if (m_soListen==INVALID_SOCKET)
{
nErr = ERROR_IOCPMANAGER_CREATE_SOCKET;
Logging2(nErr, DOMAIN_NAME);
return nErr;
}
nErr = AssociateSocketWithCompletionPort(m_soListen, m_hCompletionPort, NULL);
if (!NOERROROCCUR(nErr)) return nErr;
SOCKADDR_IN saServer;
saServer.sin_port = htons(m_nListenPortNumber);
saServer.sin_family = AF_INET;
saServer.sin_addr.s_addr = INADDR_ANY;
int nRet = bind(m_soListen, (LPSOCKADDR)&saServer, sizeof(struct sockaddr));
if ( nRet == SOCKET_ERROR )
{
nErr = ERROR_IOCPMANAGER_BIND_LISTEN_SOCKET;
Logging2(nErr, DOMAIN_NAME);
return nErr;
}
int nMaxListenPengingConnection=MAX_LISTEN_PENDING_CONNECTION;
nRet=listen(m_soListen, nMaxListenPengingConnection);
if ( nRet == SOCKET_ERROR )
{
nErr = ERROR_IOCPMANAGER_LISTEN_LISTEN_SOCKET;
Logging2(nErr, DOMAIN_NAME);
return nErr;
}
//获得AcceptEx函数指针
DWORD dwRet=0;
static GUID guidAcceptEx=WSAID_ACCEPTEX;
nRet=WSAIoctl(m_soListen, SIO_GET_EXTENSION_FUNCTION_POINTER,
&guidAcceptEx, sizeof(guidAcceptEx),
&m_lpAcceptEx, sizeof(m_lpAcceptEx),
&dwRet, NULL, NULL);
if (nRet==SOCKET_ERROR)
{
nErr = ERROR_IOCPMANAGER_WSAIOCTL_ACCEPTEX;
Logging2(nErr, DOMAIN_NAME);
return nErr;
}
static GUID guidGetAcceptExSockAddrs=WSAID_GETACCEPTEXSOCKADDRS;
nRet=WSAIoctl(m_soListen, SIO_GET_EXTENSION_FUNCTION_POINTER,
&guidGetAcceptExSockAddrs, sizeof(guidGetAcceptExSockAddrs),
&m_lpGetAcceptExSockAddrs, sizeof(m_lpGetAcceptExSockAddrs),
&dwRet, NULL, NULL);
if (nRet==SOCKET_ERROR)
{
nErr = ERROR_IOCPMANAGER_WSAIOCTL_GETACCEPTEXSOCKADDRS;
Logging2(nErr, DOMAIN_NAME);
return nErr;
}
return CreateAcceptEx();
}
int CIocpManager::CreateAcceptEx()
{
int nErr=ERROR_NONE;
int nNumberOfAcceptEx=NUMBER_OF_ACCEPTEX;
for (int i=0; i<nNumberOfAcceptEx; i++)
{
nErr = PostAcceptEx();
}
return nErr;
}
int CIocpManager::PostAcceptEx()
{
int nErr=ERROR_NONE;
SOCKET soSocket=WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
if (soSocket==INVALID_SOCKET)
{
nErr = ERROR_IOCPMANAGER_CREATE_SOCKET;
return nErr;
}
CIocpBuffer* pBuf=AllocateBuffer(IOTYPE_ACCEPTEX);
if (pBuf==NULL)
{
closesocket(soSocket);
nErr = ERROR_IOCPMANAGER_ALLOCATE_BUFFER;
Logging2(nErr, DOMAIN_NAME);
return nErr;
}
pBuf->m_soSocket = soSocket;
int nWSARet=0;
DWORD dwBytes=0;
BOOL bRet=m_lpAcceptEx(m_soListen, soSocket, pBuf->GetBuffer(), 0,
sizeof(SOCKADDR_IN)+16, sizeof(SOCKADDR_IN)+16, &dwBytes, &pBuf->m_olBuffer);
if (!bRet && (nWSARet=WSAGetLastError())!=WSA_IO_PENDING)
{
closesocket(soSocket);
ReleaseBuffer(pBuf);
nErr = ERROR_IOCPMANAGER_ACCEPTEX_FAILED;
Logging4(ERRORLEVEL_ERROR, CErrorManager::Instance()->WSAErrorCodeToText(nWSARet),
DOMAIN_NAME, CLASS_NAME_IOCPMANAGER);
}
else
{
InterlockedIncrement(&m_nNumberOfAcceptEx);
}
return nErr;
}
/**
 * Associates a socket with an existing I/O completion port.
 *
 * @param socket           socket handle to associate.
 * @param hCompletionPort  completion port to attach the socket to.
 * @param dwCompletionKey  per-handle key delivered with each completion.
 * @return ERROR_NONE if CreateIoCompletionPort returned hCompletionPort,
 *         otherwise ERROR_IOCPMANAGER_ASSOCIATE_COMPLETION_PORT (logged).
 */
int CIocpManager::AssociateSocketWithCompletionPort(SOCKET socket, HANDLE hCompletionPort, DWORD dwCompletionKey)
{
    HANDLE hAssociated = CreateIoCompletionPort(reinterpret_cast<HANDLE>(socket),
                                                hCompletionPort, dwCompletionKey, 0);
    if (hAssociated == hCompletionPort)
        return ERROR_NONE;

    int nAssociateErr = ERROR_IOCPMANAGER_ASSOCIATE_COMPLETION_PORT;
    Logging2(nAssociateErr, DOMAIN_NAME);
    return nAssociateErr;
}
/**
 * Starts NUMBER_OF_IOCP_WORKER_THREAD threads running IocpWorkerThread with
 * this object as their parameter.
 *
 * The thread handles are closed immediately after creation; the threads are
 * tracked through m_lIocpWorkerCount, which each worker updates itself.
 *
 * @return ERROR_NONE if every thread was created; otherwise
 *         ERROR_IOCPMANAGER_IOCP_WORKER_THREAD_CREATE (logged). Creation
 *         stops at the first failure; threads already started keep running.
 */
int CIocpManager::CreateIocpWorkerThread()
{
    int nErr = ERROR_NONE;
    const int nNumberOfIocpWorkerThread = NUMBER_OF_IOCP_WORKER_THREAD;
    for (int i = 0; i < nNumberOfIocpWorkerThread; ++i)
    {
        DWORD dwThreadId = 0;
        // The fifth argument is dwCreationFlags, not a priority: 0 means the
        // thread runs immediately. (The original passed THREAD_PRIORITY_NORMAL,
        // which only worked because that constant is also 0.)
        HANDLE hThread = CreateThread(NULL, 0, CIocpManager::IocpWorkerThread,
            (LPVOID)this, 0, &dwThreadId);
        if (hThread == NULL)
        {
            nErr = ERROR_IOCPMANAGER_IOCP_WORKER_THREAD_CREATE;
            Logging2(nErr, DOMAIN_NAME);
            break;
        }
        CloseHandle(hThread);
    }
    return nErr;
}
// IOCP worker thread entry point.
//
// lpParam is the owning CIocpManager (CreateIocpWorkerThread passes `this`).
// The thread increments m_lIocpWorkerCount on entry and decrements it on
// exit, and loops on GetQueuedCompletionStatus with an infinite timeout:
//   * successful packet with an OVERLAPPED: the enclosing CIocpBuffer is
//     recovered and handed to ProcessIocpMessage together with the completion
//     key (interpreted as a CIocpContext*) and the byte count;
//   * failed dequeue with a non-NULL key: the context is disconnected and
//     released, the buffer (if any) is released, and the loop continues;
//   * failed dequeue with a NULL key: the buffer (if any) is released and the
//     thread exits;
//   * a packet with neither key nor buffer: logged, and the thread exits.
// Returns the last error code recorded in nErr (ERROR_FUNC_PARAM_INVALID if
// lpParam is NULL).
DWORD CIocpManager::IocpWorkerThread(LPVOID lpParam)
{
int nErr=ERROR_NONE;
CIocpManager* pThis=reinterpret_cast<CIocpManager*>(lpParam);
if (pThis==NULL)
{
nErr = ERROR_FUNC_PARAM_INVALID;
return nErr;
}
InterlockedIncrement(&pThis->m_lIocpWorkerCount);
DWORD dwIoSize=0;
CIocpContext* lpIocpContext=NULL;
CIocpBuffer* pOverlappedBuf=NULL;
// The port handle is read once; the loop keeps using this copy.
HANDLE hCompletionPort=pThis->m_hCompletionPort;
LPOVERLAPPED lpOverlapped=NULL;
BOOL bError=FALSE;
while (!bError)
{
lpIocpContext = NULL;
pOverlappedBuf = NULL;
// Dequeue one completion packet.
// NOTE(review): the completion key is written through an (LPDWORD) cast of a
// pointer variable - confirm this is correct for the SDK / pointer size in use.
BOOL bIoRet=GetQueuedCompletionStatus(hCompletionPort,
&dwIoSize, (LPDWORD)&lpIocpContext, &lpOverlapped, INFINITE);
// The dequeue failed.
if (!bIoRet)
{
DWORD dwIoError=GetLastError();
if (dwIoError!=WAIT_TIMEOUT)
{
nErr = ERROR_IOCPMANAGER_GETQUEUEDCOMPLETIONSTATUS;
Logging2(nErr, DOMAIN_NAME);
Logging4(ERRORLEVEL_ERROR, CErrorManager::Instance()->WSAErrorCodeToText(dwIoError),
DOMAIN_NAME, CLASS_NAME_IOCPMANAGER);
if (lpIocpContext!=NULL)
{
// When the connection's socket has been cancelled while WSASend/WSARead
// requests are still outstanding, those I/O requests are cancelled and
// ERROR_NETNAME_DELETED is returned.
// Both branches below perform the same disconnect + release; they differ
// only in the value stored in nErr.
if(dwIoError==ERROR_NETNAME_DELETED)
{
nErr = ERROR_IOCPMANAGER_NET_NAME_DELETED;
pThis->DisconnectContext(lpIocpContext);
pThis->ReleaseContext(lpIocpContext);
}
else
{
pThis->DisconnectContext(lpIocpContext);
pThis->ReleaseContext(lpIocpContext);
}
// Clear the buffer if returned.
pOverlappedBuf=NULL;
if(lpOverlapped!=NULL)
pOverlappedBuf=CONTAINING_RECORD(lpOverlapped, CIocpBuffer, m_olBuffer);
if(pOverlappedBuf!=NULL)
pThis->ReleaseBuffer(pOverlappedBuf);
// The failure belonged to one connection only: keep serving the port.
continue;
}
// GetQueuedCompletionStatus failed without a context: leave the thread.
// (bError is set, so the `continue` below ends the while loop.)
nErr = ERROR_IOCPMANAGER_IN_GETQUEUEDCOMPLETIONSTATUS;
bError = TRUE;
pOverlappedBuf=NULL;
if(lpOverlapped!=NULL)
pOverlappedBuf=CONTAINING_RECORD(lpOverlapped, CIocpBuffer, m_olBuffer);
if(pOverlappedBuf!=NULL)
pThis->ReleaseBuffer(pOverlappedBuf);
continue;
}
}
// Successful completion that carries an OVERLAPPED: dispatch it.
if(bIoRet && lpOverlapped!=NULL)
{
pOverlappedBuf=CONTAINING_RECORD(lpOverlapped, CIocpBuffer, m_olBuffer);
if(pOverlappedBuf!=NULL)
nErr = pThis->ProcessIocpMessage(lpIocpContext, pOverlappedBuf, dwIoSize);
}
// Neither a context nor a buffer was delivered: log it and end the thread.
if(lpIocpContext==NULL && pOverlappedBuf==NULL)
{
nErr = ERROR_IOCPMANAGER_CONTEXT_BUFFER_NULL;
Logging2(nErr, DOMAIN_NAME);
bError = TRUE;
}
}
InterlockedDecrement(&pThis->m_lIocpWorkerCount);
Logging2(ERROR_IOCPMANAGER_IOCP_WORKER_THREAD_STOP, DOMAIN_NAME);
return nErr;
}
/**
 * Disconnects one connection context.
 *
 * While holding the context's m_csContext lock, and only if its socket is
 * still valid, this dispatches an IOCPTASK_DIS_CONNECTION task through
 * Processing(), removes the context's pending sends and tasks, and closes
 * the socket.
 *
 * @param pIocpContext context to disconnect.
 * @return ERROR_FUNC_PARAM_INVALID if pIocpContext is NULL; the result of
 *         Processing() if the socket was open; ERROR_NONE otherwise.
 */
int CIocpManager::DisconnectContext(CIocpContext* pIocpContext)
{
    if (pIocpContext == NULL)
    {
        return ERROR_FUNC_PARAM_INVALID;
    }

    int nResult = ERROR_NONE;

    pIocpContext->m_csContext.Lock();
    // Close the socket only if it is still valid.
    if (INVALID_SOCKET != pIocpContext->m_soContext)
    {
        IOCPTASK disconnectTask;
        disconnectTask.wType = IOCPTASK_DIS_CONNECTION;
        disconnectTask.pContext = pIocpContext;
        nResult = Processing(disconnectTask);

        RemoveWaitSend(pIocpContext);
        RemoveTask(pIocpContext);
        CloseSocket(pIocpContext->m_soContext);
    }
    pIocpContext->m_csContext.Unlock();

    return nResult;
}
int CIocpManager::DisconnectContext(int nSocket)
{
int nErr=ERROR_NONE;
CIocpContext* pContext=FindContext(nSocket);
if (pContext==NULL)
{
nErr = ERROR_IOCPMANAGER_FIND_CONTEXT_NULL;
Logging2(nErr, DOMAIN_NAME);
return nErr;
}
return DisconnectContext(pContext);
}
int CIocpManager::ReleaseContext(CIocpContext* pIocpContext)
{
int nErr=ERROR_NONE;
if (pIocpContext==NULL)
{
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -