⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 iocpserver.cpp

📁 使用IOCP编写的可伸缩性的回显服务器
💻 CPP
📖 第 1 页 / 共 2 页
字号:
{
	//检查服务器是否已经启动
	if (m_bServerStarted)
	{
		return FALSE;
	}
	//保存用户参数
	m_nPort = nPort;
	m_nMaxConnections = nMaxConnections;
	m_nMaxFreeBuffers = nMaxFreeBuffer;
	m_nMaxFreeContexts = nMaxFreeContexts;
 	m_nInitialReads = nInitialReads;
	//初始化状态变量
	m_bShutDown = FALSE;
	m_bServerStarted = TRUE;
	//创建监听套接字,绑定到本地端口,进入监听模式
	m_sListen = ::WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
	SOCKADDR_IN sin;
	sin.sin_family = AF_INET;
	sin.sin_port = htons(m_nPort);
	sin.sin_addr.S_un.S_addr = INADDR_ANY;
	if (::bind(m_sListen, (SOCKADDR*)&sin, sizeof(sin)) == SOCKET_ERROR)
	{
		m_bServerStarted = FALSE;
		printf("Failed bind()\n");
		return FALSE;
	}
	::listen(m_sListen, 200);
	
	//创建完成端口对象
	m_hCompletion = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
	//加载扩展函数AcceptEx
	GUID GuidAcceptEx = WSAID_ACCEPTEX;
	DWORD dwBytes;
	::WSAIoctl(m_sListen, 
		SIO_GET_EXTENSION_FUNCTION_POINTER, 
		&GuidAcceptEx, 
		sizeof(GuidAcceptEx),
		&m_lpfnAcceptEx, 
		sizeof(m_lpfnAcceptEx), 
		&dwBytes, 
		NULL, 
		NULL);

	// 加载扩展函数GetAcceptExSockaddrs
	GUID GuidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
	::WSAIoctl(m_sListen,
		SIO_GET_EXTENSION_FUNCTION_POINTER,
		&GuidGetAcceptExSockaddrs,
		sizeof(GuidGetAcceptExSockaddrs),
		&m_lpfnGetAcceptExSockaddrs,
		sizeof(m_lpfnGetAcceptExSockaddrs),
		&dwBytes,
		NULL,
		NULL
		);

	// 将监听套节字关联到完成端口,注意,这里为它传递的CompletionKey为0
	// 为0表示系统允许的线程数量和处理器数量一样多
	::CreateIoCompletionPort((HANDLE)m_sListen, m_hCompletion, (DWORD)0, 0);
	// 注册FD_ACCEPT事件。
	// 如果投递的AcceptEx I/O不够,线程会接收到FD_ACCEPT网络事件,说明应该投递更多的AcceptEx I/O
	WSAEventSelect(m_sListen, m_hAcceptEvent, FD_ACCEPT);

	//创建监听线程
	m_hListenThread = ::CreateThread(NULL, 0 , _ListenThreadProc, this, 0, NULL);
	return TRUE;
}

void CIOCPServer::Shutdown()
{
	if (!m_bServerStarted)
	{
		return;
	}

	// 通知监听线程,马上停止服务
	m_bShutDown = TRUE;
	::SetEvent(m_hAcceptEvent);
	// 等待监听线程退出
	::WaitForSingleObject(m_hListenThread, INFINITE);
	::CloseHandle(m_hListenThread);
	m_hListenThread = NULL;
	
	m_bServerStarted = FALSE;
}

DWORD WINAPI CIOCPServer::_ListenThreadProc(LPVOID lpParam)
{
	CIOCPServer *pThis = (CIOCPServer*)lpParam;
	//先在监听套接字上投递几个AcceptI/O
	CIOCPBuffer *pBuffer;
	for (int i=0; i<pThis->m_nInitialAccepts; i++)
	{
		pBuffer = pThis->AllocateBuffer(BUFFER_SIZE);
		if (pBuffer == NULL)
		{
			return -1;
		}
		pThis->InsertPendingAccept(pBuffer);
		pThis->PostAccept(pBuffer);
	}
	//构建事件对象数组,以便在上面调用WSAWaitForMultipleEvents函数
	HANDLE hWaitEvents[2+MAX_THREAD];
	int nEventCount = 0;
	hWaitEvents[nEventCount++] = pThis->m_hAcceptEvent;
	hWaitEvents[nEventCount++] = pThis->m_hRepostEvent;
	//创建指定数量的工作线程在完成端口上处理I/O
	for (i=0; i<MAX_THREAD; i++)
	{
		hWaitEvents[nEventCount++] = ::CreateThread(NULL, 0, _WorkerThreadProc, pThis, 0, NULL);
	}
	//下面进行无限循环,处理事件对象数组上的事件
	while(TRUE)
	{
		int nIndex = ::WSAWaitForMultipleEvents(nEventCount, hWaitEvents, FALSE, 60*1000, FALSE);
		//首先检查是否要停止服务
		if (pThis->m_bShutDown || nIndex==WSA_WAIT_FAILED)
		{
			//关闭所有连接
			pThis->CloseAllConnections();
			::Sleep(0); //给I/O工作线程一个执行的机会
			//关闭监听套接字
			::closesocket(pThis->m_sListen);
			pThis->m_sListen = INVALID_SOCKET;
			::Sleep(0); //给I/O工作线程一个执行的机会

			// 通知所有I/O处理线程退出
			for (int i=2; i<MAX_THREAD+2; i++)
			{
				::PostQueuedCompletionStatus(pThis->m_hCompletion,	-1, 0, NULL);
			}
			//等待I/O处理线程退出
			::WaitForMultipleObjects(MAX_THREAD, &hWaitEvents[2], TRUE, 5*1000);
			for (i=2; i<MAX_THREAD+2; i++)
			{
				::CloseHandle(hWaitEvents[i]);
			}
			::CloseHandle(pThis->m_hCompletion);
			pThis->FreeBuffer();
			pThis->FreeContexts();
			::ExitThread(0);
		}
		//1)定时检查所有未返回的AcceptExI/O的连接建立了多上时间
		if (nIndex == WSA_WAIT_TIMEOUT)
		{
			pBuffer = pThis->m_pPendingAccepts;
			while(pBuffer != NULL)
			{
				int nSeconds;
				int nLen = sizeof(nSeconds);
				//取得连接建立的时间
				::getsockopt(pBuffer->sClient, SOL_SOCKET, SO_CONNECT_TIME, (char *)&nSeconds, \
					&nLen);

				// 如果超过2分钟客户还不发送初始数据,就让这个客户go away
				if(nSeconds != -1 && nSeconds > 2*60)
				{   
					closesocket(pBuffer->sClient);
                    pBuffer->sClient = INVALID_SOCKET;
				}
				
				pBuffer = pBuffer->pNext;
			}
		}
		else
		{
			nIndex = nIndex - WAIT_OBJECT_0;
			WSANETWORKEVENTS ne;
			int nLimit = 0;
			if (nIndex == 0)
			{
				// 2)m_hAcceptEvent事件对象受信,说明投递的Accept请求不够,需要增加
				::WSAEnumNetworkEvents(pThis->m_sListen, hWaitEvents[nIndex], &ne);
				if (ne.lNetworkEvents & FD_ACCEPT)
				{
					nLimit = 50;  //增加的个数,这里设为50个
				}
			}
			else if (nIndex == 1)
			{
				// 3)m_hRepostEvent事件对象受信,说明处理I/O的线程接受到新的客户
				nLimit = InterlockedExchange(&pThis->m_nRepostCount, 0);
			}
			else if (nIndex > 1)
			{
				// I/O服务线程退出,说明有错误发生,关闭服务器
				pThis->m_bShutDown = TRUE;
				continue;
			}
			//投递nLimit个AcceptEx I/O请求
			int i = 0;
			while(i++<nLimit && pThis->m_nPendingAcceptCount<pThis->m_nMAxAccepts)
			{
				pBuffer = pThis->AllocateBuffer(BUFFER_SIZE);
				if (pBuffer != NULL)
				{
					pThis->InsertPendingAccept(pBuffer);
					pThis->PostAccept(pBuffer);
				}
			}
		}
	}
	return 0;
}

DWORD WINAPI CIOCPServer::_WorkerThreadProc(LPVOID lpParam)
{
#ifdef _DEBUG
	::OutputDebugString("WorkerThread 启动... \n");
#endif // _DEBUG
	
	CIOCPServer *pThis = (CIOCPServer*)lpParam;
	CIOCPBuffer *pBuffer;
	DWORD dwKey;
	DWORD dwTrans;
	LPOVERLAPPED lpol;
	while(TRUE)
	{
		//在关联到此完成端口的所有套接字上等待I/O完成
		BOOL bOK = ::GetQueuedCompletionStatus(pThis->m_hCompletion, &dwTrans, \
				(LPDWORD)&dwKey, (LPOVERLAPPED*)&lpol, WSA_INFINITE);
		if(dwTrans == -1) // 用户通知退出
		{
#ifdef _DEBUG
			::OutputDebugString("WorkerThread 退出 \n");
#endif // _DEBUG
			::ExitThread(0);
		}

		pBuffer = CONTAINING_RECORD(lpol, CIOCPBuffer, ol);
		int nError = NO_ERROR;
		if(!bOK)						// 在此套节字上有错误发生
		{
			SOCKET s;
			if(pBuffer->nOperation == OP_ACCEPT)
			{
				s = pThis->m_sListen;
			}
			else
			{
				if(dwKey == 0)
				{
					break;
				}
				s = ((CIOCPContext*)dwKey)->s;
			}
			DWORD dwFlags = 0;
			if(!::WSAGetOverlappedResult(s, &pBuffer->ol, &dwTrans, FALSE, &dwFlags))
			{
				nError = ::WSAGetLastError();
			}
		}
		pThis->HandleIO(dwKey, pBuffer, dwTrans, nError);
	}
	
#ifdef _DEBUG
	::OutputDebugString("WorkerThread 退出 \n");
#endif // _DEBUG
	return 0;
}

void CIOCPServer::HandleIO(DWORD dwKey, CIOCPBuffer *pBuffer, DWORD dwTrans, int nError)
{
	CIOCPContext *pContext = (CIOCPContext *)dwKey;

#ifdef _DEBUG
			::OutputDebugString("	HandleIO... \n");
#endif // _DEBUG
	
	// 1)首先减少套节字上的未决I/O计数
	if(pContext != NULL)
	{
		::EnterCriticalSection(&pContext->Lock);
		
		if(pBuffer->nOperation == OP_READ)
			pContext->nOutstandingRecv --;
		else if(pBuffer->nOperation == OP_WRITE)
			pContext->nOutstandingSend --;
		
		::LeaveCriticalSection(&pContext->Lock);
		
		// 2)检查套节字是否已经被我们关闭
		if(pContext->bClosing) 
		{
#ifdef _DEBUG
			::OutputDebugString("	检查到套节字已经被我们关闭 \n");
#endif // _DEBUG
			if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
			{		
				ReleaseContext(pContext);
			}
			// 释放已关闭套节字的未决I/O
			ReleaseBuffer(pBuffer);	
			return;
		}
	}
	else
	{
		RemovePendingAccept(pBuffer);
	}

	// 3)检查套节字上发生的错误,如果有的话,通知用户,然后关闭套节字
	if(nError != NO_ERROR)
	{
		if(pBuffer->nOperation != OP_ACCEPT)
		{
			CloseAConnection(pContext, pBuffer, nError);
//			OnConnectionError(pContext, pBuffer, nError);
			if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
			{		
				ReleaseContext(pContext);
			}
#ifdef _DEBUG
			::OutputDebugString("	检查到客户套节字上发生错误 \n");
#endif // _DEBUG
		}
		else // 在监听套节字上发生错误,也就是监听套节字处理的客户出错了
		{
			// 客户端出错,释放I/O缓冲区
			if(pBuffer->sClient != INVALID_SOCKET)
			{
				::closesocket(pBuffer->sClient);
				pBuffer->sClient = INVALID_SOCKET;
			}
#ifdef _DEBUG
			::OutputDebugString("	检查到监听套节字上发生错误 \n");
#endif // _DEBUG
		}

		ReleaseBuffer(pBuffer);
		return;
	}


	// 开始处理
	if(pBuffer->nOperation == OP_ACCEPT)
	{
		if(dwTrans == 0)
		{
#ifdef _DEBUG
			::OutputDebugString("	监听套节字上客户端关闭 \n");
#endif // _DEBUG
			
			if(pBuffer->sClient != INVALID_SOCKET)
			{
				::closesocket(pBuffer->sClient);
				pBuffer->sClient = INVALID_SOCKET;
			}
		}
		else
		{
			// 为新接受的连接申请客户上下文对象
			CIOCPContext *pClient = AllocateContext(pBuffer->sClient);
			if(pClient != NULL)
			{
				if(AddAConnection(pClient))
				{	
					// 取得客户地址
					int nLocalLen, nRmoteLen;
					LPSOCKADDR pLocalAddr, pRemoteAddr;
					m_lpfnGetAcceptExSockaddrs(
						pBuffer->buff,
						pBuffer->nLen - ((sizeof(sockaddr_in) + 16) * 2),
						sizeof(sockaddr_in) + 16,
						sizeof(sockaddr_in) + 16,
						(SOCKADDR **)&pLocalAddr,
						&nLocalLen,
						(SOCKADDR **)&pRemoteAddr,
						&nRmoteLen);
					memcpy(&pClient->addrLocal, pLocalAddr, nLocalLen);
					memcpy(&pClient->addrRemote, pRemoteAddr, nRmoteLen);
					
					// 关联新连接到完成端口对象
					::CreateIoCompletionPort((HANDLE)pClient->s, m_hCompletion, (DWORD)pClient, 0);
					
					// 通知用户
					pBuffer->nLen = dwTrans;
					OnConnectionEstablished(pClient, pBuffer);
					
					// 向新连接投递几个Read请求,这些空间在套节字关闭或出错时释放
					for(int i=0; i<5; i++)
					{
						CIOCPBuffer *p = AllocateBuffer(BUFFER_SIZE);
						if(p != NULL)
						{
							if(!PostRecv(pClient, p))
							{
								CloseAConnection(pClient, pBuffer);
								break;
							}
						}
					}
				}
				else	// 连接数量已满,关闭连接
				{
					CloseAConnection(pClient, pBuffer);
					ReleaseContext(pClient);
				}
			}
			else
			{
				// 资源不足,关闭与客户的连接即可
				::closesocket(pBuffer->sClient);
				pBuffer->sClient = INVALID_SOCKET;
			}
		}
		
		// Accept请求完成,释放I/O缓冲区
		ReleaseBuffer(pBuffer);	

		// 通知监听线程继续再投递一个Accept请求
		::InterlockedIncrement(&m_nRepostCount);
		::SetEvent(m_hRepostEvent);
	}
	else if(pBuffer->nOperation == OP_READ)
	{
		if(dwTrans == 0)	// 对方关闭套节字
		{
			// 先关闭连接
			pBuffer->nLen = 0;
			CloseAConnection(pContext, pBuffer);

			// 再通知用户
//			pBuffer->nLen = 0;
//			OnConnectionClosing(pContext, pBuffer);	
			
			// 释放客户上下文和缓冲区对象
			if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
			{		
				ReleaseContext(pContext);
			}
			ReleaseBuffer(pBuffer);	
		}
		else
		{
			pBuffer->nLen = dwTrans;
			// 按照I/O投递的顺序读取接收到的数据
			CIOCPBuffer *p = GetNextReadBuffer(pContext, pBuffer);
			while(p != NULL)
			{
				// 通知用户
				OnReadCompleted(pContext, p);
				// 增加要读的序列号的值
				::InterlockedIncrement((LONG*)&pContext->nCurrentReadSequence);
				// 释放这个已完成的I/O
				ReleaseBuffer(p);
				p = GetNextReadBuffer(pContext, NULL);
			}

			// 继续投递一个新的接收请求
			pBuffer = AllocateBuffer(BUFFER_SIZE);
			if(pBuffer == NULL || !PostRecv(pContext, pBuffer))
			{
				CloseAConnection(pContext, pBuffer);
			}
		}
	}
	else if(pBuffer->nOperation == OP_WRITE)
	{

		if(dwTrans == 0)	// 对方关闭套节字
		{
			// 先关闭连接并通知用户
			pBuffer->nLen = 0;
			CloseAConnection(pContext, pBuffer);

			// 再通知用户
//			pBuffer->nLen = 0;
//			OnConnectionClosing(pContext, pBuffer);	

			// 释放客户上下文和缓冲区对象
			if(pContext->nOutstandingRecv == 0 && pContext->nOutstandingSend == 0)
			{		
				ReleaseContext(pContext);
			}
			ReleaseBuffer(pBuffer);	
		}
		else
		{
			// 写操作完成,通知用户
			pBuffer->nLen = dwTrans;
			OnWriteCompleted(pContext, pBuffer);
			// 释放SendText函数申请的缓冲区
			ReleaseBuffer(pBuffer);
		}
	}
}

BOOL CIOCPServer::SendText(CIOCPContext *pContext, char *pszText, int nLen)
{
	CIOCPBuffer *pBuffer = AllocateBuffer(nLen);
	if(pBuffer != NULL)
	{
		memcpy(pBuffer->buff, pszText, nLen);
		return PostSend(pContext, pBuffer);
	}
	return FALSE;
}

void CIOCPServer::OnConnectionEstablished(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
}

void CIOCPServer::OnConnectionClosing(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
}

void CIOCPServer::OnReadCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
}

void CIOCPServer::OnWriteCompleted(CIOCPContext *pContext, CIOCPBuffer *pBuffer)
{
}

void CIOCPServer::OnConnectionError(CIOCPContext *pContext, CIOCPBuffer *pBuffer, int nError)
{
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -