📄 iocpserver.cpp
字号:
#include "IocpServer.h"
#include <stdio.h>
#pragma comment(lib, "WS2_32.lib")
// Constructs the server object in its idle state: empty pools and lists,
// default tuning limits, no sockets/handles, and Winsock 2.2 loaded.
CIOCPServer::CIOCPServer()
{
    // Free-object pools and the bookkeeping lists all start empty.
    m_pFreeBufferList = NULL;
    m_nFreeBufferCount = 0;
    m_pFreeContextList = NULL;
    m_nFreeContextCount = 0;
    m_pPendingAccepts = NULL;
    m_nPendingAcceptCount = 0;
    m_pConnectionList = NULL;
    m_nCurrentConnection = 0;

    // One critical section per list.
    ::InitializeCriticalSection(&m_FreeBufferListLock);
    ::InitializeCriticalSection(&m_FreeContextListLock);
    ::InitializeCriticalSection(&m_PendingAcceptsLock);
    ::InitializeCriticalSection(&m_ConnectionListLock);

    // Auto-reset, initially non-signalled events used for accept requests.
    m_hAcceptEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);
    m_hRepostEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);
    m_nRepostCount = 0;

    // Default configuration values.
    m_nPort = 4567;
    m_nInitialAccepts = 10;
    m_nInitialReads = 4;
    m_nMAxAccepts = 100;
    m_nMaxSends = 20;
    m_nMaxFreeBuffers = 200;
    m_nMaxFreeContexts = 100;
    m_nMaxConnections = 2000;

    // Nothing is running yet.
    m_hListenThread = NULL;
    m_hCompletion = NULL;
    m_sListen = INVALID_SOCKET;
    m_lpfnAcceptEx = NULL;
    m_lpfnGetAcceptExSockaddrs = NULL;
    m_bShutDown = FALSE;
    m_bServerStarted = FALSE;

    // Load WS2_32.dll, requesting Winsock version 2.2.
    WSADATA wsaData;
    ::WSAStartup(MAKEWORD(2, 2), &wsaData);
}
// Stops the server via Shutdown(), then releases the listening socket, the
// listen-thread handle, both events, the four list locks and finally the
// Winsock reference taken in the constructor.
CIOCPServer::~CIOCPServer()
{
    Shutdown();

    if (INVALID_SOCKET != m_sListen)
    {
        ::closesocket(m_sListen);
    }

    if (NULL != m_hListenThread)
    {
        ::CloseHandle(m_hListenThread);
    }

    ::CloseHandle(m_hRepostEvent);
    ::CloseHandle(m_hAcceptEvent);

    ::DeleteCriticalSection(&m_FreeBufferListLock);
    ::DeleteCriticalSection(&m_FreeContextListLock);
    ::DeleteCriticalSection(&m_PendingAcceptsLock);
    ::DeleteCriticalSection(&m_ConnectionListLock);

    ::WSACleanup();
}
// Hands out an I/O buffer object whose usable length is nLen bytes.
//
// A buffer is taken from the free pool when one is available; otherwise a
// new zero-filled block of sizeof(CIOCPBuffer) + BUFFER_SIZE bytes is
// allocated from the process heap, with the data area placed directly after
// the header.
//
// Returns NULL when nLen exceeds BUFFER_SIZE or the heap allocation fails.
CIOCPBuffer *CIOCPServer::AllocateBuffer(int nLen)
{
    // A single buffer can never hold more than BUFFER_SIZE bytes.
    if (nLen > BUFFER_SIZE)
    {
        return NULL;
    }

    CIOCPBuffer *pResult = NULL;

    ::EnterCriticalSection(&m_FreeBufferListLock);
    if (m_pFreeBufferList != NULL)
    {
        // Pop the head of the free pool.
        pResult = m_pFreeBufferList;
        m_pFreeBufferList = pResult->pNext;
        pResult->pNext = NULL;
        --m_nFreeBufferCount;
    }
    else
    {
        // Pool is empty: get fresh, zeroed memory from the process heap.
        pResult = static_cast<CIOCPBuffer *>(
            ::HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(CIOCPBuffer) + BUFFER_SIZE));
    }
    ::LeaveCriticalSection(&m_FreeBufferListLock);

    if (pResult == NULL)
    {
        return NULL;
    }

    // The payload lives immediately behind the header structure.
    pResult->buff = (char *)(pResult + 1);
    pResult->nLen = nLen;
    return pResult;
}
// Returns a buffer obtained from AllocateBuffer.
//
// While the free pool holds fewer than m_nMaxFreeBuffers entries the buffer
// is zeroed (header and data area) and pushed onto the pool; otherwise its
// memory is given back to the process heap. A NULL pBuffer is ignored.
//
// Fixes relative to the previous version:
//  - the pool-size test used `<=`, which let the pool grow to
//    m_nMaxFreeBuffers + 1 entries;
//  - a NULL pointer was passed straight to memset/HeapFree.
void CIOCPServer::ReleaseBuffer(CIOCPBuffer *pBuffer)
{
    if (pBuffer == NULL)
    {
        return;
    }

    ::EnterCriticalSection(&m_FreeBufferListLock);
    if (m_nFreeBufferCount < m_nMaxFreeBuffers)
    {
        // Wipe the whole block so a recycled buffer looks freshly allocated.
        memset(pBuffer, 0, sizeof(CIOCPBuffer) + BUFFER_SIZE);
        pBuffer->pNext = m_pFreeBufferList;
        m_pFreeBufferList = pBuffer;
        m_nFreeBufferCount++;
    }
    else
    {
        // Pool is full: release the memory instead of caching it.
        ::HeapFree(::GetProcessHeap(), 0, pBuffer);
    }
    ::LeaveCriticalSection(&m_FreeBufferListLock);
}
// Hands out a per-connection context object bound to socket s.
//
// A context is taken from the free pool when one is available (its critical
// section is already initialised); otherwise a zero-filled CIOCPContext is
// allocated from the process heap and its Lock is initialised.
//
// Returns NULL when the heap allocation fails.
//
// Fix relative to the previous version: InitializeCriticalSection was called
// on pContext->Lock even when HeapAlloc had returned NULL, dereferencing a
// NULL pointer under memory pressure.
CIOCPContext *CIOCPServer::AllocateContext(SOCKET s)
{
    CIOCPContext *pContext = NULL;

    ::EnterCriticalSection(&m_FreeContextListLock);
    if (m_pFreeContextList == NULL)
    {
        // Pool is empty: allocate a fresh, zeroed context.
        pContext = (CIOCPContext *)::HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY,
                                               sizeof(CIOCPContext));
        if (pContext != NULL)
        {
            ::InitializeCriticalSection(&pContext->Lock);
        }
    }
    else
    {
        // Pop the head of the free pool.
        pContext = m_pFreeContextList;
        m_pFreeContextList = m_pFreeContextList->pNext;
        pContext->pNext = NULL;
        m_nFreeContextCount--;
    }
    ::LeaveCriticalSection(&m_FreeContextListLock);

    // Bind the context to the caller's socket.
    if (pContext != NULL)
    {
        pContext->s = s;
    }
    return pContext;
}
// Returns a context obtained from AllocateContext.
//
// Closes the context's socket if it is still open, releases every buffer
// queued on its out-of-order read list, and then either recycles the context
// into the free pool (zeroed, with a usable Lock) or frees it when the pool
// already holds m_nMaxFreeContexts entries. A NULL pContext is ignored.
//
// Fixes relative to the previous version:
//  - the Lock was preserved across the memset by copying the
//    CRITICAL_SECTION into a temporary and back; critical section objects
//    must not be copied, so the Lock is now deleted and re-initialised
//    in place instead;
//  - the pool-size test used `<=`, which let the pool grow to
//    m_nMaxFreeContexts + 1 entries;
//  - a NULL pointer was dereferenced.
void CIOCPServer::ReleaseContext(CIOCPContext *pContext)
{
    if (pContext == NULL)
    {
        return;
    }

    if (pContext->s != INVALID_SOCKET)
    {
        ::closesocket(pContext->s);
    }

    // Release any reads that completed out of order and were never consumed.
    // The next pointer is read first because ReleaseBuffer wipes the buffer.
    while (pContext->pOutOfOrderReads != NULL)
    {
        CIOCPBuffer *pNext = pContext->pOutOfOrderReads->pNext;
        ReleaseBuffer(pContext->pOutOfOrderReads);
        pContext->pOutOfOrderReads = pNext;
    }

    ::EnterCriticalSection(&m_FreeContextListLock);
    if (m_nFreeContextCount < m_nMaxFreeContexts)
    {
        // Reset the context to all zeroes while keeping a valid Lock:
        // tear the critical section down, wipe, and build it again.
        ::DeleteCriticalSection(&pContext->Lock);
        memset(pContext, 0, sizeof(CIOCPContext));
        ::InitializeCriticalSection(&pContext->Lock);

        // Push onto the head of the free pool.
        pContext->pNext = m_pFreeContextList;
        m_pFreeContextList = pContext;
        m_nFreeContextCount++;
    }
    else
    {
        ::DeleteCriticalSection(&pContext->Lock);
        ::HeapFree(::GetProcessHeap(), 0, pContext);
    }
    ::LeaveCriticalSection(&m_FreeContextListLock);
}
void CIOCPServer::FreeBuffer()
{
// 遍历m_pFreeBufferList空闲列表,释放缓冲区池内存
::EnterCriticalSection(&m_FreeBufferListLock);
CIOCPBuffer *pFreeBuffer = m_pFreeBufferList;
CIOCPBuffer *pNextBuffer;
while(pFreeBuffer != NULL)
{
pNextBuffer = pFreeBuffer->pNext;
if (!::HeapFree(::GetProcessHeap(), 0 , pFreeBuffer))
{
#ifdef _DEBUG
::OutputDebugString(" FreeBuffers释放内存出错!");
#endif
break;
}
pFreeBuffer = pNextBuffer;
}
m_pFreeBufferList = NULL;
m_nFreeBufferCount = 0;
::LeaveCriticalSection(&m_FreeBufferListLock);
}
void CIOCPServer::FreeContexts()
{
// 遍历m_pFreeContextList空闲列表,释放缓冲区池内存
::EnterCriticalSection(&m_FreeContextListLock);
CIOCPContext *pFreeContext = m_pFreeContextList;
CIOCPContext *pNextContxt;
while(pFreeContext != NULL)
{
pNextContxt = pFreeContext->pNext;
::DeleteCriticalSection(&pFreeContext->Lock);
if (!::HeapFree(::GetProcessHeap(), 0, pFreeContext))
{
#ifdef _DEBUG
::OutputDebugString(" FreeContext释放内存出错!");
#endif // _DEBUG
break;
}
pFreeContext = pNextContxt;
}
m_pFreeContextList = NULL;
m_nFreeContextCount = 0;
::LeaveCriticalSection(&m_FreeContextListLock);
}
// Registers a client context in the active-connection list.
//
// The context is pushed onto the head of m_pConnectionList and the
// connection counter is incremented (and printed). Returns TRUE on success,
// FALSE when m_nMaxConnections connections are already registered.
//
// Fix relative to the previous version: the limit test used `<=`, which
// admitted m_nMaxConnections + 1 simultaneous connections.
BOOL CIOCPServer::AddAConnection(CIOCPContext *pContext)
{
    BOOL bAdded = FALSE;

    ::EnterCriticalSection(&m_ConnectionListLock);
    if (m_nCurrentConnection < m_nMaxConnections)
    {
        // Insert at the head of the list.
        pContext->pNext = m_pConnectionList;
        m_pConnectionList = pContext;
        m_nCurrentConnection++;
        printf("++%d\n", m_nCurrentConnection);
        bAdded = TRUE;
    }
    ::LeaveCriticalSection(&m_ConnectionListLock);

    return bAdded;
}
// Closes one client connection.
//
// If pContext is found in the active-connection list it is unlinked, the
// connection counter is decremented (and printed) and, still under the list
// lock, OnConnectionClosing is invoked when nError is NO_ERROR or
// OnConnectionError otherwise. Whether or not the context was in the list,
// its socket is then closed under the context's own lock and the context is
// flagged as closing.
void CIOCPServer::CloseAConnection(CIOCPContext *pContext, CIOCPBuffer *pBuffer, int nError /* = NO_ERROR */)
{
    ::EnterCriticalSection(&m_ConnectionListLock);

    // Walk the chain of "next" links until one of them points at pContext
    // (or the list ends). Treating the list head as just another link lets
    // the head and interior cases share one code path.
    CIOCPContext **ppLink = &m_pConnectionList;
    while (*ppLink != NULL && *ppLink != pContext)
    {
        ppLink = &(*ppLink)->pNext;
    }

    if (*ppLink != NULL)
    {
        *ppLink = pContext->pNext;
        m_nCurrentConnection--;
        printf("--%d\n", m_nCurrentConnection);

        if (nError != NO_ERROR)
        {
            OnConnectionError(pContext, pBuffer, nError);
        }
        else
        {
            OnConnectionClosing(pContext, pBuffer);
        }
    }

    ::LeaveCriticalSection(&m_ConnectionListLock);

    // Now shut the client socket itself.
    ::EnterCriticalSection(&pContext->Lock);
    if (INVALID_SOCKET != pContext->s)
    {
        ::closesocket(pContext->s);
        pContext->s = INVALID_SOCKET;
    }
    pContext->bClosing = TRUE;
    ::LeaveCriticalSection(&pContext->Lock);
}
void CIOCPServer::CloseAllConnections()
{
// 遍历整个连接列表,关闭所有的客户套节字
::EnterCriticalSection(&m_ConnectionListLock);
CIOCPContext *pContext = m_pConnectionList;
while(pContext != NULL)
{
::EnterCriticalSection(&pContext->Lock);
if (pContext->s != INVALID_SOCKET)
{
::closesocket(pContext->s);
pContext->s = INVALID_SOCKET;
}
pContext->bClosing = TRUE;
::LeaveCriticalSection(&pContext->Lock);
pContext = pContext->pNext;
}
m_pConnectionList = NULL;
m_nCurrentConnection = 0;
::LeaveCriticalSection(&m_ConnectionListLock);
}
// Reports the current value of the active-connection counter. The counter
// is read without taking the connection-list lock.
ULONG CIOCPServer::GetCurrentConnection()
{
    const ULONG nActive = m_nCurrentConnection;
    return nActive;
}
// Pushes an I/O buffer onto the head of the pending-accept list and bumps
// the pending-accept counter.
//
// Returns TRUE on success, FALSE when pBuffer is NULL.
//
// Fixes relative to the previous version:
//  - when the list was empty, pBuffer->pNext was left untouched, so a buffer
//    carrying a stale link would have dragged foreign nodes into the list;
//    the link is now always overwritten with the old head (NULL for an
//    empty list);
//  - a NULL pBuffer was either dereferenced or counted as an entry.
BOOL CIOCPServer::InsertPendingAccept(CIOCPBuffer *pBuffer)
{
    if (pBuffer == NULL)
    {
        return FALSE;
    }

    ::EnterCriticalSection(&m_PendingAcceptsLock);
    pBuffer->pNext = m_pPendingAccepts;
    m_pPendingAccepts = pBuffer;
    m_nPendingAcceptCount++;
    ::LeaveCriticalSection(&m_PendingAcceptsLock);

    return TRUE;
}
// Unlinks pBuffer from the pending-accept list.
//
// Returns TRUE (and decrements the pending-accept counter) when the buffer
// was found and removed, FALSE when it is not in the list.
BOOL CIOCPServer::RemovePendingAccept(CIOCPBuffer *pBuffer)
{
    BOOL bRemoved = FALSE;

    ::EnterCriticalSection(&m_PendingAcceptsLock);

    // Walk the chain of "next" links until one of them points at pBuffer
    // (or the list ends); the list head is handled like any other link.
    CIOCPBuffer **ppLink = &m_pPendingAccepts;
    while (*ppLink != NULL && *ppLink != pBuffer)
    {
        ppLink = &(*ppLink)->pNext;
    }

    if (*ppLink != NULL)
    {
        *ppLink = pBuffer->pNext;
        m_nPendingAcceptCount--;
        bRemoved = TRUE;
    }

    ::LeaveCriticalSection(&m_PendingAcceptsLock);
    return bRemoved;
}
// Returns the buffer that should be delivered next for this connection, or
// NULL if the next-in-sequence read has not completed yet.
//
// pBuffer (may be NULL) is a read buffer that has just completed:
//  - if its sequence number equals pContext->nCurrentReadSequence it is
//    returned immediately;
//  - otherwise it is stored in pContext->pOutOfOrderReads, which is kept
//    sorted by ascending sequence number.
// After that (or when pBuffer is NULL) the head of the out-of-order list is
// examined: if it carries the expected sequence number it is unlinked and
// returned.
//
// This function does not advance nCurrentReadSequence and takes no lock
// itself. NOTE(review): presumably the caller holds pContext->Lock and
// bumps the sequence after consuming the buffer - confirm against callers.
//
// Fix relative to the previous version: when inserting after an existing
// node the code executed `pPre->pNext = pBuffer->pNext`, which re-assigned
// the predecessor's link to its old value, so the new buffer was never
// linked into the list and was silently lost.
CIOCPBuffer *CIOCPServer::GetNextReadBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
    if (pBuffer != NULL)
    {
        // In-order completion: hand it straight back.
        if (pBuffer->nSequenceNumber == pContext->nCurrentReadSequence)
        {
            return pBuffer;
        }

        // Out-of-order completion: find the first queued buffer with a
        // larger sequence number and insert in front of it.
        CIOCPBuffer *pPre = NULL;
        CIOCPBuffer *pCur = pContext->pOutOfOrderReads;
        while (pCur != NULL && !(pBuffer->nSequenceNumber < pCur->nSequenceNumber))
        {
            pPre = pCur;
            pCur = pCur->pNext;
        }

        pBuffer->pNext = pCur;
        if (pPre == NULL)
        {
            // New head of the list.
            pContext->pOutOfOrderReads = pBuffer;
        }
        else
        {
            // Interior or tail insertion: the predecessor must point at the
            // new buffer.
            pPre->pNext = pBuffer;
        }
    }

    // If the smallest queued sequence number is the expected one, pop it.
    CIOCPBuffer *pHead = pContext->pOutOfOrderReads;
    if (pHead != NULL && (pHead->nSequenceNumber == pContext->nCurrentReadSequence))
    {
        pContext->pOutOfOrderReads = pHead->pNext;
        return pHead;
    }
    return NULL;
}
// Posts one overlapped AcceptEx on the listening socket using pBuffer.
//
// A new overlapped TCP socket is created for the future client and stored in
// pBuffer->sClient. The buffer's data area is split into a receive area
// followed by two address slots of sizeof(sockaddr_in) + 16 bytes each.
//
// Returns TRUE when the accept completed immediately or is pending, FALSE
// otherwise. On FALSE, pBuffer->sClient is INVALID_SOCKET (unless the
// arguments were rejected before a socket was created) and WSAGetLastError()
// reports the cause.
//
// Fixes relative to the previous version:
//  - the result of WSASocket was not checked, so an INVALID_SOCKET was
//    passed on to AcceptEx;
//  - when AcceptEx failed, the freshly created accept socket was leaked;
//  - a buffer shorter than the two address slots made the receive-length
//    expression wrap around to a huge unsigned value;
//  - a NULL pBuffer or an unresolved AcceptEx pointer was dereferenced.
BOOL CIOCPServer::PostAccept(CIOCPBuffer *pBuffer)
{
    // Space AcceptEx needs for each of the local and remote addresses.
    const DWORD cbAddress = sizeof(sockaddr_in) + 16;

    if (pBuffer == NULL || m_lpfnAcceptEx == NULL ||
        pBuffer->nLen < static_cast<int>(cbAddress * 2))
    {
        ::WSASetLastError(WSAEINVAL);
        return FALSE;
    }

    // Tag the buffer with the I/O type.
    pBuffer->nOperation = OP_ACCEPT;

    // Socket that will represent the accepted client.
    pBuffer->sClient = ::WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (pBuffer->sClient == INVALID_SOCKET)
    {
        return FALSE;
    }

    // Post the overlapped accept.
    DWORD dwBytes = 0;
    BOOL b = m_lpfnAcceptEx(m_sListen, pBuffer->sClient, pBuffer->buff,
                            pBuffer->nLen - (cbAddress * 2), cbAddress,
                            cbAddress, &dwBytes, &pBuffer->ol);
    if (!b)
    {
        const int nError = ::WSAGetLastError();
        if (nError != WSA_IO_PENDING)
        {
            // Do not leak the accept socket; keep the original error code
            // visible to the caller.
            ::closesocket(pBuffer->sClient);
            pBuffer->sClient = INVALID_SOCKET;
            ::WSASetLastError(nError);
            return FALSE;
        }
    }
    return TRUE;
}
// Posts one overlapped WSARecv on the connection's socket using pBuffer.
//
// Under the context lock the buffer is stamped with the connection's current
// read sequence number and the receive is issued. When the call succeeds or
// is pending, the outstanding-receive counter and the read sequence number
// are both incremented and TRUE is returned; any other error yields FALSE
// with the counters untouched.
BOOL CIOCPServer::PostRecv(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
    // Tag the buffer with the I/O type.
    pBuffer->nOperation = OP_READ;

    ::EnterCriticalSection(&pContext->Lock);

    // Stamp the buffer so completions can be re-ordered later.
    pBuffer->nSequenceNumber = pContext->nReadSequence;

    WSABUF wsaBuf;
    wsaBuf.buf = pBuffer->buff;
    wsaBuf.len = pBuffer->nLen;

    DWORD cbReceived;
    DWORD fRecv = 0;
    const int nResult = ::WSARecv(pContext->s, &wsaBuf, 1, &cbReceived, &fRecv, &pBuffer->ol, NULL);

    // An immediate success and a pending operation both count as "posted".
    const BOOL bPosted =
        (nResult == NO_ERROR || ::WSAGetLastError() == WSA_IO_PENDING) ? TRUE : FALSE;
    if (bPosted)
    {
        pContext->nOutstandingRecv++;
        pContext->nReadSequence++;
    }

    ::LeaveCriticalSection(&pContext->Lock);
    return bPosted;
}
// Posts one overlapped WSASend on the connection's socket using pBuffer.
//
// The request is refused (FALSE) when the connection already has more than
// m_nMaxSends sends outstanding; this throttles peers that never read what
// the server sends them. When the send succeeds or is pending, the
// outstanding-send counter is incremented under the context lock and TRUE
// is returned; any other error yields FALSE.
BOOL CIOCPServer::PostSend(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
    // Throttle: too many sends already in flight on this connection.
    if (pContext->nOutstandingSend > m_nMaxSends)
    {
        return FALSE;
    }

    // Tag the buffer with the I/O type.
    pBuffer->nOperation = OP_WRITE;

    WSABUF wsaBuf;
    wsaBuf.buf = pBuffer->buff;
    wsaBuf.len = pBuffer->nLen;

    DWORD cbSent;
    DWORD fSend = 0;
    const int nResult = ::WSASend(pContext->s, &wsaBuf, 1, &cbSent, fSend, &pBuffer->ol, NULL);
    if (nResult != NO_ERROR && ::WSAGetLastError() != WSA_IO_PENDING)
    {
        return FALSE;
    }

    // The send is in flight: account for it on the connection.
    ::EnterCriticalSection(&pContext->Lock);
    pContext->nOutstandingSend++;
    ::LeaveCriticalSection(&pContext->Lock);

    return TRUE;
}
BOOL CIOCPServer::Start(int nPort, int nMaxConnections, \
int nMaxFreeBuffer, int nMaxFreeContexts, \
int nInitialReads)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -