⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 iocpserver.cpp

📁 使用IOCP编写的可伸缩性的回显服务器
💻 CPP
📖 第 1 页 / 共 2 页
字号:
#include "IocpServer.h"
#include <stdio.h>
#pragma comment(lib, "WS2_32.lib")

// Constructs a server object in its idle state.
//
// Clears the run-time handles and flags, loads the default configuration
// values, empties the four internal lists, creates their critical sections
// and the two events, and finally starts Winsock 2.2. No socket, thread or
// completion port is created here.
CIOCPServer::CIOCPServer()
{
	// Run-time state: nothing has been started yet.
	m_bServerStarted = FALSE;
	m_bShutDown = FALSE;
	m_sListen = INVALID_SOCKET;
	m_hCompletion = NULL;
	m_hListenThread = NULL;
	m_lpfnAcceptEx = NULL;
	m_lpfnGetAcceptExSockaddrs = NULL;

	// Default configuration values.
	m_nPort = 4567;
	m_nMaxConnections = 2000;
	m_nMaxFreeContexts = 100;
	m_nMaxFreeBuffers = 200;
	m_nMaxSends = 20;
	m_nMAxAccepts = 100;
	m_nInitialReads = 4;
	m_nInitialAccepts = 10;

	// All four singly linked lists start out empty.
	m_pConnectionList = NULL;
	m_nCurrentConnection = 0;
	m_pPendingAccepts = NULL;
	m_nPendingAcceptCount = 0;
	m_pFreeContextList = NULL;
	m_nFreeContextCount = 0;
	m_pFreeBufferList = NULL;
	m_nFreeBufferCount = 0;

	// One critical section per list.
	CRITICAL_SECTION *const listLocks[] =
	{
		&m_FreeBufferListLock,
		&m_FreeContextListLock,
		&m_PendingAcceptsLock,
		&m_ConnectionListLock
	};
	for (size_t i = 0; i < sizeof(listLocks) / sizeof(listLocks[0]); ++i)
	{
		::InitializeCriticalSection(listLocks[i]);
	}

	// Events used for accept requests: unnamed, auto-reset, initially non-signalled.
	m_hAcceptEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);
	m_hRepostEvent = ::CreateEvent(NULL, FALSE, FALSE, NULL);
	m_nRepostCount = 0;

	// Initialise WS2_32.dll, requesting Winsock 2.2 (the result is not checked).
	WSADATA wsaInfo;
	::WSAStartup(MAKEWORD(2, 2), &wsaInfo);
}

// Destroys the server object.
//
// Calls Shutdown() first, then closes the listening socket and the listen
// thread handle if they are still set, closes both events, deletes the four
// list locks and finally releases Winsock.
CIOCPServer::~CIOCPServer()
{
	Shutdown();

	if (INVALID_SOCKET != m_sListen)
	{
		::closesocket(m_sListen);
	}
	if (NULL != m_hListenThread)
	{
		::CloseHandle(m_hListenThread);
	}

	// The two events are created unconditionally by the constructor.
	const HANDLE eventHandles[] = { m_hRepostEvent, m_hAcceptEvent };
	for (size_t i = 0; i < sizeof(eventHandles) / sizeof(eventHandles[0]); ++i)
	{
		::CloseHandle(eventHandles[i]);
	}

	CRITICAL_SECTION *const listLocks[] =
	{
		&m_FreeBufferListLock,
		&m_FreeContextListLock,
		&m_PendingAcceptsLock,
		&m_ConnectionListLock
	};
	for (size_t i = 0; i < sizeof(listLocks) / sizeof(listLocks[0]); ++i)
	{
		::DeleteCriticalSection(listLocks[i]);
	}

	// Balances the WSAStartup call made in the constructor.
	::WSACleanup();
}

// Hands out an I/O buffer object whose usable length is nLen.
//
// A buffer is popped from the free list when one is available; otherwise a
// zero-filled block of sizeof(CIOCPBuffer) + BUFFER_SIZE bytes is taken from
// the process heap. The data area starts directly behind the header.
//
// nLen    - requested data length; must not exceed BUFFER_SIZE.
// returns - the buffer, or NULL when nLen is too large or the heap
//           allocation fails.
CIOCPBuffer *CIOCPServer::AllocateBuffer(int nLen)
{
	// Every buffer has the same fixed capacity, so larger requests are refused.
	if (nLen > BUFFER_SIZE)
	{
		return NULL;
	}

	CIOCPBuffer *pResult;

	::EnterCriticalSection(&m_FreeBufferListLock);
	if (m_pFreeBufferList != NULL)
	{
		// Reuse the buffer at the head of the pool.
		pResult = m_pFreeBufferList;
		m_pFreeBufferList = pResult->pNext;
		pResult->pNext = NULL;
		--m_nFreeBufferCount;
	}
	else
	{
		// Pool is empty: take a fresh block (header + data) from the heap.
		pResult = static_cast<CIOCPBuffer *>(::HeapAlloc(::GetProcessHeap(),
					HEAP_ZERO_MEMORY, sizeof(CIOCPBuffer) + BUFFER_SIZE));
	}
	::LeaveCriticalSection(&m_FreeBufferListLock);

	if (pResult == NULL)
	{
		return NULL;
	}

	// Point the data pointer at the bytes that follow the header.
	pResult->buff = reinterpret_cast<char *>(pResult + 1);
	pResult->nLen = nLen;
	return pResult;
}

// Returns a buffer obtained from AllocateBuffer.
//
// While the free list holds fewer than m_nMaxFreeBuffers entries the buffer
// is zeroed and pushed onto the list for reuse; otherwise it is given back
// to the process heap.
//
// pBuffer - buffer to release; NULL is ignored.
void CIOCPServer::ReleaseBuffer(CIOCPBuffer *pBuffer)
{
	if (pBuffer == NULL)
	{
		return;
	}

	BOOL bPooled = FALSE;

	::EnterCriticalSection(&m_FreeBufferListLock);
	// Strictly "less than": the previous "<=" let the pool grow to
	// m_nMaxFreeBuffers + 1 entries.
	if (m_nFreeBufferCount < m_nMaxFreeBuffers)
	{
		// Wipe header and data so the next user starts from a clean buffer.
		memset(pBuffer, 0, sizeof(CIOCPBuffer) + BUFFER_SIZE);
		pBuffer->pNext = m_pFreeBufferList;
		m_pFreeBufferList = pBuffer;
		m_nFreeBufferCount++;
		bPooled = TRUE;
	}
	::LeaveCriticalSection(&m_FreeBufferListLock);

	// The heap call does not touch the pool, so it runs outside the lock.
	if (!bPooled)
	{
		::HeapFree(::GetProcessHeap(), 0, pBuffer);
	}
}

// Hands out a per-connection context object bound to socket s.
//
// A context is popped from the free list when one is available (its critical
// section is already initialised); otherwise a zero-filled one is allocated
// from the process heap and its critical section is initialised.
//
// s       - socket stored in the returned context.
// returns - the context, or NULL when the heap allocation fails.
CIOCPContext *CIOCPServer::AllocateContext(SOCKET s)
{
	CIOCPContext *pContext = NULL;

	::EnterCriticalSection(&m_FreeContextListLock);
	if (m_pFreeContextList == NULL)
	{
		pContext = (CIOCPContext*)::HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, \
					sizeof(CIOCPContext));
		// HeapAlloc returns NULL on failure; only touch the lock of a real object.
		if (pContext != NULL)
		{
			::InitializeCriticalSection(&pContext->Lock);
		}
	}
	else
	{
		// Reuse the pooled context at the head of the free list.
		pContext = m_pFreeContextList;
		m_pFreeContextList = m_pFreeContextList->pNext;
		pContext->pNext = NULL;
		m_nFreeContextCount--;
	}
	::LeaveCriticalSection(&m_FreeContextListLock);

	// Initialise the object's members.
	if (pContext != NULL)
	{
		pContext->s = s;
	}
	return pContext;
}

// Releases a per-connection context.
//
// Closes the context's socket if it is still open, releases every buffer
// queued on its out-of-order read list, and then either recycles the context
// into the free list (while the list holds fewer than m_nMaxFreeContexts
// entries) or destroys it.
//
// pContext - context to release; NULL is ignored.
void CIOCPServer::ReleaseContext(CIOCPContext *pContext)
{
	if (pContext == NULL)
	{
		return;
	}

	if (pContext->s != INVALID_SOCKET)
	{
		::closesocket(pContext->s);
	}

	// First release any read buffers on this socket that completed out of order.
	CIOCPBuffer *pNext;
	while (pContext->pOutOfOrderReads != NULL)
	{
		pNext = pContext->pOutOfOrderReads->pNext;
		ReleaseBuffer(pContext->pOutOfOrderReads);
		pContext->pOutOfOrderReads = pNext;
	}

	::EnterCriticalSection(&m_FreeContextListLock);

	// Strictly "less than": the previous "<=" let the pool grow to
	// m_nMaxFreeContexts + 1 entries.
	if (m_nFreeContextCount < m_nMaxFreeContexts)
	{
		// Keep the already-initialised critical section across the wipe:
		// save it, zero the whole object, then put it back at the same address.
		CRITICAL_SECTION cstmp = pContext->Lock;
		memset(pContext, 0, sizeof(CIOCPContext));
		pContext->Lock = cstmp;

		// Push the recycled context onto the head of the free list.
		pContext->pNext = m_pFreeContextList;
		m_pFreeContextList = pContext;
		m_nFreeContextCount++;
	}
	else
	{
		// Pool is full: destroy the lock and give the memory back to the heap.
		::DeleteCriticalSection(&pContext->Lock);
		::HeapFree(::GetProcessHeap(), 0, pContext);
	}
	::LeaveCriticalSection(&m_FreeContextListLock);
}

void CIOCPServer::FreeBuffer()
{
	// 遍历m_pFreeBufferList空闲列表,释放缓冲区池内存
	::EnterCriticalSection(&m_FreeBufferListLock);
	CIOCPBuffer *pFreeBuffer  = m_pFreeBufferList;
	CIOCPBuffer *pNextBuffer;
	while(pFreeBuffer != NULL)
	{
		pNextBuffer = pFreeBuffer->pNext;
		if (!::HeapFree(::GetProcessHeap(), 0 , pFreeBuffer))
		{
#ifdef _DEBUG
			::OutputDebugString(" FreeBuffers释放内存出错!");
#endif
			break;
		}
		pFreeBuffer = pNextBuffer;
	}
	m_pFreeBufferList = NULL;
	m_nFreeBufferCount = 0;

	::LeaveCriticalSection(&m_FreeBufferListLock);
}

void CIOCPServer::FreeContexts()
{
	// 遍历m_pFreeContextList空闲列表,释放缓冲区池内存
	::EnterCriticalSection(&m_FreeContextListLock);
	CIOCPContext *pFreeContext = m_pFreeContextList;
	CIOCPContext *pNextContxt;
	while(pFreeContext != NULL)
	{
		pNextContxt = pFreeContext->pNext;
		::DeleteCriticalSection(&pFreeContext->Lock);
		if (!::HeapFree(::GetProcessHeap(), 0, pFreeContext))
		{
#ifdef _DEBUG
			::OutputDebugString(" FreeContext释放内存出错!");
#endif // _DEBUG
			break;
		}
		pFreeContext = pNextContxt;
	}
	m_pFreeContextList = NULL;
	m_nFreeContextCount = 0;

	::LeaveCriticalSection(&m_FreeContextListLock);
}

// Adds a context to the head of the client connection list.
//
// pContext - context to add; NULL is rejected.
// returns  - TRUE when the context was linked in, FALSE when pContext is NULL
//            or the list already holds m_nMaxConnections entries.
BOOL CIOCPServer::AddAConnection(CIOCPContext *pContext)
{
	if (pContext == NULL)
	{
		return FALSE;
	}

	BOOL bAdded = FALSE;

	::EnterCriticalSection(&m_ConnectionListLock);
	// Strictly "less than": the previous "<=" admitted m_nMaxConnections + 1
	// connections.
	if (m_nCurrentConnection < m_nMaxConnections)
	{
		// Insert at the head of the list.
		pContext->pNext = m_pConnectionList;
		m_pConnectionList = pContext;
		m_nCurrentConnection++;

		printf("++%d\n", m_nCurrentConnection);
		bAdded = TRUE;
	}
	::LeaveCriticalSection(&m_ConnectionListLock);

	return bAdded;
}

// Closes one client connection.
//
// If pContext is found in the connection list it is unlinked, the connection
// count is decremented, and OnConnectionClosing (nError == NO_ERROR) or
// OnConnectionError (otherwise) is invoked while the list lock is still held.
// Regardless of whether it was found, the context's socket is then closed
// under the context's own lock and the context is flagged as closing.
//
// pContext - connection to close.
// pBuffer  - buffer forwarded unchanged to the notification callback.
// nError   - error code; NO_ERROR selects the "closing" notification.
void CIOCPServer::CloseAConnection(CIOCPContext *pContext, CIOCPBuffer *pBuffer, int nError /* = NO_ERROR */)
{
	::EnterCriticalSection(&m_ConnectionListLock);

	// Find the link (the list head or some node's pNext) that refers to pContext.
	CIOCPContext **ppLink = &m_pConnectionList;
	while (*ppLink != NULL && *ppLink != pContext)
	{
		ppLink = &(*ppLink)->pNext;
	}

	if (*ppLink != NULL)
	{
		// Unlink the context and tell the derived class why it is going away.
		*ppLink = pContext->pNext;
		--m_nCurrentConnection;
		printf("--%d\n", m_nCurrentConnection);

		if (nError != NO_ERROR)
		{
			OnConnectionError(pContext, pBuffer, nError);
		}
		else
		{
			OnConnectionClosing(pContext, pBuffer);
		}
	}

	::LeaveCriticalSection(&m_ConnectionListLock);

	// Then close the client socket.
	::EnterCriticalSection(&pContext->Lock);

	pContext->bClosing = TRUE;
	if (INVALID_SOCKET != pContext->s)
	{
		::closesocket(pContext->s);
		pContext->s = INVALID_SOCKET;
	}

	::LeaveCriticalSection(&pContext->Lock);
}

void CIOCPServer::CloseAllConnections()
{
	// 遍历整个连接列表,关闭所有的客户套节字
	::EnterCriticalSection(&m_ConnectionListLock);

	CIOCPContext *pContext = m_pConnectionList;
	while(pContext != NULL)
	{
		::EnterCriticalSection(&pContext->Lock);
		if (pContext->s != INVALID_SOCKET)
		{
			::closesocket(pContext->s);
			pContext->s = INVALID_SOCKET;
		}
		pContext->bClosing = TRUE;

		::LeaveCriticalSection(&pContext->Lock);
		pContext = pContext->pNext;
	}

	m_pConnectionList = NULL;
	m_nCurrentConnection = 0;

	::LeaveCriticalSection(&m_ConnectionListLock);
}

// Reports the current value of the connection counter.
//
// The counter is read without taking m_ConnectionListLock.
ULONG CIOCPServer::GetCurrentConnection()
{
	const ULONG nConnections = m_nCurrentConnection;
	return nConnections;
}

// Pushes an I/O buffer onto the head of the m_pPendingAccepts list.
//
// pBuffer - buffer to insert; NULL is rejected.
// returns - TRUE when the buffer was inserted, FALSE when pBuffer is NULL.
BOOL CIOCPServer::InsertPendingAccept(CIOCPBuffer *pBuffer)
{
	if (pBuffer == NULL)
	{
		return FALSE;
	}

	::EnterCriticalSection(&m_PendingAcceptsLock);

	// Always overwrite pNext. The old code skipped this when the list was
	// empty, so a stale pNext in pBuffer would have become the list's tail.
	pBuffer->pNext = m_pPendingAccepts;
	m_pPendingAccepts = pBuffer;
	m_nPendingAcceptCount++;

	::LeaveCriticalSection(&m_PendingAcceptsLock);
	return TRUE;
}

// Unlinks a buffer from the m_pPendingAccepts list.
//
// pBuffer - buffer to remove; NULL is rejected.
// returns - TRUE when the buffer was found and unlinked (the pending count
//           is decremented), FALSE when it is NULL or not in the list.
BOOL CIOCPServer::RemovePendingAccept(CIOCPBuffer *pBuffer)
{
	// The previous version dereferenced NULL when called with a NULL buffer.
	if (pBuffer == NULL)
	{
		return FALSE;
	}

	BOOL bResult = FALSE;

	::EnterCriticalSection(&m_PendingAcceptsLock);

	// Find the link (the list head or some node's pNext) that refers to
	// pBuffer; this handles the head and non-head cases uniformly.
	CIOCPBuffer **ppLink = &m_pPendingAccepts;
	while (*ppLink != NULL && *ppLink != pBuffer)
	{
		ppLink = &(*ppLink)->pNext;
	}

	if (*ppLink != NULL)
	{
		*ppLink = pBuffer->pNext;
		m_nPendingAcceptCount--;
		bResult = TRUE;
	}

	::LeaveCriticalSection(&m_PendingAcceptsLock);
	return bResult;
}

// Returns the read buffer that should be processed next, if it is available.
//
// When pBuffer carries the sequence number the context expects next
// (nCurrentReadSequence) it is returned directly. Otherwise pBuffer is stored
// in the context's pOutOfOrderReads list, which is kept sorted by ascending
// sequence number, and the head of that list is returned (and unlinked) if it
// carries the expected sequence number.
//
// This function takes no lock and does not advance nCurrentReadSequence.
// NOTE(review): presumably the caller holds pContext->Lock and advances the
// sequence itself - confirm against the call sites.
//
// pContext - connection whose read ordering is being tracked.
// pBuffer  - a just-completed read buffer, or NULL to only poll the list.
// returns  - the buffer with the expected sequence number, or NULL if it has
//            not arrived yet.
CIOCPBuffer *CIOCPServer::GetNextReadBuffer(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
	if (pBuffer != NULL)
	{
		// In-order completion: hand the buffer straight back.
		if (pBuffer->nSequenceNumber == pContext->nCurrentReadSequence)
		{
			return pBuffer;
		}

		// Out-of-order completion: insert the buffer before the first node
		// with a larger sequence number (or at the tail if there is none).
		CIOCPBuffer **ppLink = &pContext->pOutOfOrderReads;
		while (*ppLink != NULL && !(pBuffer->nSequenceNumber < (*ppLink)->nSequenceNumber))
		{
			ppLink = &(*ppLink)->pNext;
		}
		// Link the buffer in on both sides. The previous code assigned
		// pPre->pNext = pBuffer->pNext, which left the predecessor pointing
		// past the new buffer, so the buffer was never actually in the list.
		pBuffer->pNext = *ppLink;
		*ppLink = pBuffer;
	}

	// If the head of the list carries the expected sequence number, unlink it
	// and return it to the caller.
	CIOCPBuffer *pHead = pContext->pOutOfOrderReads;
	if (pHead != NULL && (pHead->nSequenceNumber == pContext->nCurrentReadSequence))
	{
		pContext->pOutOfOrderReads = pHead->pNext;
		return pHead;
	}
	return NULL;
}

// Posts one overlapped AcceptEx request on the listening socket.
//
// Marks the buffer as an OP_ACCEPT operation, creates the overlapped TCP
// socket that will become the client connection (stored in pBuffer->sClient)
// and issues AcceptEx with pBuffer as the receive/address buffer.
//
// pBuffer - buffer describing the request; its nLen must leave room for the
//           two (sizeof(sockaddr_in)+16)-byte address blocks AcceptEx writes.
// returns - TRUE when the request completed or is pending; FALSE when the
//           buffer is too small, the socket could not be created, or AcceptEx
//           failed. On an AcceptEx failure the client socket is closed,
//           pBuffer->sClient is reset to INVALID_SOCKET and the AcceptEx error
//           code is left in WSAGetLastError().
BOOL CIOCPServer::PostAccept(CIOCPBuffer *pBuffer)
{
	// Set the I/O type.
	pBuffer->nOperation = OP_ACCEPT;

	// The receive length passed to AcceptEx is nLen minus the two address
	// blocks; refuse buffers for which that subtraction would wrap around.
	const int nAddrBlocks = static_cast<int>((sizeof(sockaddr_in) + 16) * 2);
	if (static_cast<int>(pBuffer->nLen) < nAddrBlocks)
	{
		return FALSE;
	}

	// Create the socket that will represent the accepted connection.
	pBuffer->sClient = ::WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
	if (pBuffer->sClient == INVALID_SOCKET)
	{
		// Nothing to accept into; the WSASocket error code is left for the caller.
		return FALSE;
	}

	// Post the overlapped I/O.
	DWORD dwBytes = 0;
	BOOL b = m_lpfnAcceptEx(m_sListen, pBuffer->sClient, pBuffer->buff, \
				pBuffer->nLen-((sizeof(sockaddr_in)+16)*2), sizeof(sockaddr_in)+16, \
				sizeof(sockaddr_in)+16, &dwBytes, &pBuffer->ol);

	if (!b)
	{
		const int nErr = ::WSAGetLastError();
		if (nErr != WSA_IO_PENDING)
		{
			// The request was not queued, so the new socket would otherwise leak.
			::closesocket(pBuffer->sClient);
			pBuffer->sClient = INVALID_SOCKET;
			// Restore the AcceptEx error so callers still see the real cause.
			::WSASetLastError(nErr);
			return FALSE;
		}
	}

	return TRUE;
}

// Posts one overlapped WSARecv on the connection's socket.
//
// Marks the buffer as an OP_READ operation and, while holding the context
// lock, stamps it with the context's current read sequence number and issues
// the receive. When the receive succeeds or is pending, the context's
// outstanding-receive count and read sequence number are both incremented.
//
// pContext - connection to read from.
// pBuffer  - buffer that receives the data (up to pBuffer->nLen bytes).
// returns  - TRUE when the request completed or is pending, FALSE when
//            WSARecv failed with any error other than WSA_IO_PENDING.
BOOL CIOCPServer::PostRecv(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
	// Set the I/O type.
	pBuffer->nOperation = OP_READ;

	// Describe the receive area for Winsock.
	WSABUF wsaBuf;
	wsaBuf.buf = pBuffer->buff;
	wsaBuf.len = pBuffer->nLen;
	DWORD dwReceived = 0;
	DWORD dwRecvFlags = 0;
	BOOL bPosted = TRUE;

	::EnterCriticalSection(&pContext->Lock);

	// Stamp the buffer with its sequence number, then post the overlapped I/O.
	pBuffer->nSequenceNumber = pContext->nReadSequence;
	const int nRet = ::WSARecv(pContext->s, &wsaBuf, 1, &dwReceived, &dwRecvFlags, &pBuffer->ol, NULL);
	if (nRet != NO_ERROR && ::WSAGetLastError() != WSA_IO_PENDING)
	{
		bPosted = FALSE;
	}
	else
	{
		// Count the outstanding receive and advance the read sequence number.
		++pContext->nOutstandingRecv;
		++pContext->nReadSequence;
	}

	::LeaveCriticalSection(&pContext->Lock);
	return bPosted;
}

// Posts one overlapped WSASend on the connection's socket.
//
// The request is refused when the context already has more than m_nMaxSends
// sends outstanding, which keeps a client that only sends and never receives
// from making the server queue an unbounded number of sends. Otherwise the
// buffer is marked as an OP_WRITE operation and the send is issued; when it
// succeeds or is pending, the context's outstanding-send count is
// incremented under the context lock.
//
// pContext - connection to write to.
// pBuffer  - buffer holding pBuffer->nLen bytes to send.
// returns  - TRUE when the request completed or is pending, FALSE when the
//            send limit is exceeded or WSASend failed with any error other
//            than WSA_IO_PENDING.
BOOL CIOCPServer::PostSend(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
	// Enforce the per-connection limit on outstanding sends.
	if (pContext->nOutstandingSend > m_nMaxSends)
	{
		return FALSE;
	}

	// Set the I/O type.
	pBuffer->nOperation = OP_WRITE;

	// Describe the data to transmit for Winsock.
	WSABUF wsaBuf;
	wsaBuf.buf = pBuffer->buff;
	wsaBuf.len = pBuffer->nLen;
	DWORD dwSent = 0;
	const DWORD dwSendFlags = 0;

	// Post the overlapped I/O.
	const int nRet = ::WSASend(pContext->s, &wsaBuf, 1, &dwSent, dwSendFlags, &pBuffer->ol, NULL);
	if (nRet != NO_ERROR && ::WSAGetLastError() != WSA_IO_PENDING)
	{
		return FALSE;
	}

	// Count the outstanding send on this socket.
	::EnterCriticalSection(&pContext->Lock);
	++pContext->nOutstandingSend;
	::LeaveCriticalSection(&pContext->Lock);
	return TRUE;
}

BOOL CIOCPServer::Start(int nPort, int nMaxConnections, \
						int nMaxFreeBuffer, int nMaxFreeContexts, \
						int nInitialReads)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -