⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 iocpmanager.cpp

📁 iocp vc例子,自己是学DELPHI
💻 CPP
📖 第 1 页 / 共 4 页
字号:
// IocpManager.cpp: implementation of the CIocpManager class.
//
//////////////////////////////////////////////////////////////////////

#include "stdafx.h"
#include "IocpManager.h"

#ifdef _DEBUG
#undef THIS_FILE
static char THIS_FILE[]=__FILE__;
#define new DEBUG_NEW
#endif

//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////

CIocpManager::CIocpManager()
{
	m_hCompletionPort = NULL;
	m_soListen = INVALID_SOCKET;
	m_nNumberOfAcceptEx = 0;
	m_lIocpWorkerCount = 0;
	m_lpAcceptEx = NULL;
	m_lpGetAcceptExSockAddrs = NULL;
	m_bIocpStarted = FALSE;
	m_nListenPortNumber = 0;
	m_lTaskProcessCount = 0;
	m_hStopEvent = NULL;
	m_nReaderCount = 0;
	
	InitConfig();
	
	m_itemIocp.GetAttr(XML_IOCPMANAGER_DEFAULT_PATH, m_strDefaultPath);
	if (m_strDefaultPath.IsEmpty())
		m_strDefaultPath = CPublicFunc::GetFullTempPath();
	
	WSADATA wsaData;
	m_nWSAStartupRet = WSAStartup(MAKEWORD(2,2), &wsaData);
}

CIocpManager::~CIocpManager()
{
	Stop();
	
	if (NOERROROCCUR(m_nWSAStartupRet))
		WSACleanup();	
}


int CIocpManager::InitConfig()
{
	CString strErrorXml=_T("ZNetConfig.xml");
	CString strXmlPath=CPublicFunc::GetFullPathOfFile(strErrorXml);
	CXmlManager XmlMngr;
	int nErr=XmlMngr.Load(strXmlPath);
	if (NOERROROCCUR(nErr))
		nErr=XmlMngr.GetItem(_T("IocpConfig"), m_itemIocp);
	return nErr;
}

void CIocpManager::CloseSocket(SOCKET& soSocket)
{
	if (soSocket==INVALID_SOCKET) return;
	//如果要关闭连接,将套接字上的linger值设为0
	LINGER lingerStruct;
	lingerStruct.l_onoff = 1;
	lingerStruct.l_linger = 0;
	setsockopt(soSocket, SOL_SOCKET, SO_LINGER,
		(char *)&lingerStruct, sizeof(lingerStruct));

	CancelIo((HANDLE)soSocket);		
	closesocket(soSocket);
	soSocket = INVALID_SOCKET;
}

int CIocpManager::Start(int nPortNumber)
{
	int nErr=ERROR_NONE;

	if (m_bIocpStarted)
	{
		nErr = ERROR_IOCPMANAGER_IOCP_STARTED;
		Logging2(nErr, DOMAIN_NAME);
		return nErr;
	}

	if (!NOERROROCCUR(m_nWSAStartupRet))
	{
		nErr = ERROR_IOCPMANAGER_WSASTARTUP;
		Logging2(nErr, DOMAIN_NAME);
		return nErr;
	}

	nErr = CreateCompletionPort();
	if (!NOERROROCCUR(nErr)) return nErr;		
	
	nErr = CreateIocpWorkerThread();
	if (!NOERROROCCUR(nErr)) return nErr;

	nErr = CreateTaskProcessThread();
	if (!NOERROROCCUR(nErr)) return nErr;

	nErr = CreateListenPort(nPortNumber);
	if (!NOERROROCCUR(nErr)) return nErr;

	m_bIocpStarted = TRUE;
	Logging2(ERROR_IOCPMANAGER_START_SUCCESS, DOMAIN_NAME);	

	return nErr;
}

int CIocpManager::Stop()
{
	int nErr=ERROR_NONE;

	if (!m_bIocpStarted)
	{
		nErr = ERROR_IOCPMANAGER_IOCP_NOT_STARTED;
		Logging2(nErr, DOMAIN_NAME);
		return nErr;
	}

	nErr = DisconnectAllContext();
	
	nErr = StopTaskProcessThread();

	nErr = StopIocpWorkerThread();
	
	if (m_hCompletionPort!=NULL)
	{
		CloseHandle(m_hCompletionPort);
		m_hCompletionPort = NULL;
	}

	if (m_soListen!=INVALID_SOCKET)
	{
		closesocket(m_soListen);
		m_soListen = INVALID_SOCKET;
	}
	
	FreeContext();
	FreeBuffer();

	m_bIocpStarted = FALSE;
	Logging2(ERROR_IOCPMANAGER_STOP_SUCCESS, DOMAIN_NAME);

	return nErr;
}

int CIocpManager::CreateCompletionPort()
{
	int nErr=ERROR_NONE;
	
	int nNumberOfConcurrentPerCPU=NUMBER_OF_CONCURRENT_PER_CPU;
	//获得CPU个数
	SYSTEM_INFO sysInfo;
	WSADATA wsaData;	
	ZeroMemory(&sysInfo,sizeof(SYSTEM_INFO));
	ZeroMemory(&wsaData,sizeof(WSADATA));	
	GetSystemInfo(&sysInfo);	
	int nNumberOfConcurrent = sysInfo.dwNumberOfProcessors * nNumberOfConcurrentPerCPU;
	
	m_hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, nNumberOfConcurrent);
	if (m_hCompletionPort==NULL)
	{
		nErr = ERROR_IOCPMANAGER_CREATE_IO_COMPLETION_PORT;
		Logging2(nErr, DOMAIN_NAME);
	}
	
	return nErr;
}

int CIocpManager::CreateListenPort(int nListenPort)
{
	int nErr=ERROR_NONE;
	m_nListenPortNumber = nListenPort;
	if (m_nListenPortNumber<=0) return nErr;	//若是客户端不需要创建监听端口

	m_soListen = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_IP, NULL, 0, WSA_FLAG_OVERLAPPED);
	if (m_soListen==INVALID_SOCKET)
	{
		nErr = ERROR_IOCPMANAGER_CREATE_SOCKET;
		Logging2(nErr, DOMAIN_NAME);
		return nErr;
	}

	nErr = AssociateSocketWithCompletionPort(m_soListen, m_hCompletionPort, NULL);
	if (!NOERROROCCUR(nErr)) return nErr;
	
	SOCKADDR_IN	saServer;
	saServer.sin_port = htons(m_nListenPortNumber);
	saServer.sin_family = AF_INET;
	saServer.sin_addr.s_addr = INADDR_ANY;
	
	int nRet = bind(m_soListen, (LPSOCKADDR)&saServer, sizeof(struct sockaddr));	
	if ( nRet == SOCKET_ERROR )
	{
		nErr = ERROR_IOCPMANAGER_BIND_LISTEN_SOCKET;
		Logging2(nErr, DOMAIN_NAME);
		return nErr;
	}
	
	int nMaxListenPengingConnection=MAX_LISTEN_PENDING_CONNECTION;	
	nRet=listen(m_soListen, nMaxListenPengingConnection);
	if ( nRet == SOCKET_ERROR )
	{
		nErr = ERROR_IOCPMANAGER_LISTEN_LISTEN_SOCKET;
		Logging2(nErr, DOMAIN_NAME);
		return nErr;
	}	

	//获得AcceptEx函数指针
	DWORD dwRet=0;
	static GUID guidAcceptEx=WSAID_ACCEPTEX;
	nRet=WSAIoctl(m_soListen, SIO_GET_EXTENSION_FUNCTION_POINTER, 
		&guidAcceptEx, sizeof(guidAcceptEx), 
		&m_lpAcceptEx, sizeof(m_lpAcceptEx), 
		&dwRet, NULL, NULL);
	if (nRet==SOCKET_ERROR)
	{
		nErr = ERROR_IOCPMANAGER_WSAIOCTL_ACCEPTEX;
		Logging2(nErr, DOMAIN_NAME);
		return nErr;
	}

	static GUID guidGetAcceptExSockAddrs=WSAID_GETACCEPTEXSOCKADDRS;
	nRet=WSAIoctl(m_soListen, SIO_GET_EXTENSION_FUNCTION_POINTER, 
		&guidGetAcceptExSockAddrs, sizeof(guidGetAcceptExSockAddrs), 
		&m_lpGetAcceptExSockAddrs, sizeof(m_lpGetAcceptExSockAddrs), 
		&dwRet, NULL, NULL);
	if (nRet==SOCKET_ERROR)
	{
		nErr = ERROR_IOCPMANAGER_WSAIOCTL_GETACCEPTEXSOCKADDRS;
		Logging2(nErr, DOMAIN_NAME);
		return nErr;
	}

	return CreateAcceptEx();
}

int CIocpManager::CreateAcceptEx()
{
	int nErr=ERROR_NONE;
	int nNumberOfAcceptEx=NUMBER_OF_ACCEPTEX;
	for (int i=0; i<nNumberOfAcceptEx; i++)
	{
		nErr = PostAcceptEx();		
	}

	return nErr;
}

int CIocpManager::PostAcceptEx()
{
	int nErr=ERROR_NONE;
	
	SOCKET soSocket=WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
	if (soSocket==INVALID_SOCKET)
	{
		nErr = ERROR_IOCPMANAGER_CREATE_SOCKET;
		return nErr;
	}

	CIocpBuffer* pBuf=AllocateBuffer(IOTYPE_ACCEPTEX);
	if (pBuf==NULL)
	{
		closesocket(soSocket);
		nErr = ERROR_IOCPMANAGER_ALLOCATE_BUFFER;	
		Logging2(nErr, DOMAIN_NAME);
		return nErr;
	}	

	pBuf->m_soSocket = soSocket;
	int nWSARet=0;
	DWORD dwBytes=0;
	BOOL bRet=m_lpAcceptEx(m_soListen, soSocket, pBuf->GetBuffer(), 0, 
		sizeof(SOCKADDR_IN)+16, sizeof(SOCKADDR_IN)+16, &dwBytes, &pBuf->m_olBuffer);
	if (!bRet && (nWSARet=WSAGetLastError())!=WSA_IO_PENDING)
	{
		closesocket(soSocket);
		ReleaseBuffer(pBuf);
		nErr = ERROR_IOCPMANAGER_ACCEPTEX_FAILED;
		Logging4(ERRORLEVEL_ERROR, CErrorManager::Instance()->WSAErrorCodeToText(nWSARet), 
			DOMAIN_NAME, CLASS_NAME_IOCPMANAGER);
	}
	else
	{	
		InterlockedIncrement(&m_nNumberOfAcceptEx);
	}

	return nErr;
}

int CIocpManager::AssociateSocketWithCompletionPort(SOCKET socket, HANDLE hCompletionPort, DWORD dwCompletionKey)
{
	int nErr=ERROR_NONE;
	HANDLE hRet=CreateIoCompletionPort((HANDLE) socket, hCompletionPort, dwCompletionKey, 0);
	if (hRet!=hCompletionPort)
	{
		nErr = ERROR_IOCPMANAGER_ASSOCIATE_COMPLETION_PORT;
		Logging2(nErr, DOMAIN_NAME);
	}
	return nErr;
}

int CIocpManager::CreateIocpWorkerThread()
{
	int nErr=ERROR_NONE;
	int nNumberOfIocpWorkerThread=NUMBER_OF_IOCP_WORKER_THREAD;
	DWORD dwThreadId=0;
	for(int i=0; i<nNumberOfIocpWorkerThread;i++)
	{
		HANDLE hThread=CreateThread(NULL, 0, CIocpManager::IocpWorkerThread, 
			(LPVOID)this, THREAD_PRIORITY_NORMAL, &dwThreadId);
		if (hThread==NULL)
		{
			nErr = ERROR_IOCPMANAGER_IOCP_WORKER_THREAD_CREATE;
			Logging2(nErr, DOMAIN_NAME);
			break;
		}
		CloseHandle(hThread);
	}
	return nErr;
}

DWORD CIocpManager::IocpWorkerThread(LPVOID lpParam)
{
	int nErr=ERROR_NONE;
	CIocpManager* pThis=reinterpret_cast<CIocpManager*>(lpParam);
	if (pThis==NULL)
	{
		nErr = ERROR_FUNC_PARAM_INVALID;
		return nErr;
	}

	InterlockedIncrement(&pThis->m_lIocpWorkerCount);

	DWORD dwIoSize=0;
	CIocpContext* lpIocpContext=NULL;
	CIocpBuffer* pOverlappedBuf=NULL;
	HANDLE hCompletionPort=pThis->m_hCompletionPort;
	LPOVERLAPPED lpOverlapped=NULL;
	BOOL bError=FALSE;

	while (!bError)
	{
		lpIocpContext = NULL;
		pOverlappedBuf = NULL;
		//获取一个请求
		BOOL bIoRet=GetQueuedCompletionStatus(hCompletionPort, 
			&dwIoSize, (LPDWORD)&lpIocpContext, &lpOverlapped, INFINITE);

		//获取请求失败
		if (!bIoRet)
		{
			DWORD dwIoError=GetLastError();
			if (dwIoError!=WAIT_TIMEOUT)
			{
				nErr = ERROR_IOCPMANAGER_GETQUEUEDCOMPLETIONSTATUS;
				Logging2(nErr, DOMAIN_NAME);
				Logging4(ERRORLEVEL_ERROR, CErrorManager::Instance()->WSAErrorCodeToText(dwIoError), 
					DOMAIN_NAME, CLASS_NAME_IOCPMANAGER);

				if (lpIocpContext!=NULL)
				{
					//当连接套接字被取消并且有WSASend/WSARead请求未完成,
					//则I/O请求取消并且返回ERROR_NETNAME_DELETED
					if(dwIoError==ERROR_NETNAME_DELETED) 
					{
						nErr = ERROR_IOCPMANAGER_NET_NAME_DELETED;
						pThis->DisconnectContext(lpIocpContext); 
						pThis->ReleaseContext(lpIocpContext);
					}
					else
					{
						pThis->DisconnectContext(lpIocpContext); 
						pThis->ReleaseContext(lpIocpContext);
					}
					// Clear the buffer if returned. 
					pOverlappedBuf=NULL;
					if(lpOverlapped!=NULL)
						pOverlappedBuf=CONTAINING_RECORD(lpOverlapped, CIocpBuffer, m_olBuffer);
					if(pOverlappedBuf!=NULL)
						pThis->ReleaseBuffer(pOverlappedBuf);			
					continue;
				}
				// GetQueuedCompletionStatus发生错误退出线程
				nErr = ERROR_IOCPMANAGER_IN_GETQUEUEDCOMPLETIONSTATUS;
				bError = TRUE;

				pOverlappedBuf=NULL;
				if(lpOverlapped!=NULL)
					pOverlappedBuf=CONTAINING_RECORD(lpOverlapped, CIocpBuffer, m_olBuffer);
				if(pOverlappedBuf!=NULL)
					pThis->ReleaseBuffer(pOverlappedBuf);
				continue;
			}			
		}
		
		if(bIoRet && lpOverlapped!=NULL)  
		{
			pOverlappedBuf=CONTAINING_RECORD(lpOverlapped, CIocpBuffer, m_olBuffer);
			if(pOverlappedBuf!=NULL)
				nErr = pThis->ProcessIocpMessage(lpIocpContext, pOverlappedBuf, dwIoSize);
		}	
		
		if(lpIocpContext==NULL && pOverlappedBuf==NULL)
		{
			nErr = ERROR_IOCPMANAGER_CONTEXT_BUFFER_NULL;
			Logging2(nErr, DOMAIN_NAME);
			bError = TRUE;
		}
	}

	InterlockedDecrement(&pThis->m_lIocpWorkerCount);
	
	Logging2(ERROR_IOCPMANAGER_IOCP_WORKER_THREAD_STOP, DOMAIN_NAME);
	
	return nErr;
}

int CIocpManager::DisconnectContext(CIocpContext* pIocpContext)
{
	int nErr=ERROR_NONE;
	if (pIocpContext==NULL)
	{
		nErr = ERROR_FUNC_PARAM_INVALID;
		return nErr;
	}

	//如果套接字有效则关闭
	pIocpContext->m_csContext.Lock();
	if (pIocpContext->m_soContext!=INVALID_SOCKET)
	{	
		IOCPTASK itTask;	
		itTask.wType = IOCPTASK_DIS_CONNECTION;
		itTask.pContext = pIocpContext;
		nErr = Processing(itTask);

		RemoveWaitSend(pIocpContext);
		RemoveTask(pIocpContext);
		CloseSocket(pIocpContext->m_soContext);
	}
	pIocpContext->m_csContext.Unlock();

	return nErr;
}

int CIocpManager::DisconnectContext(int nSocket)
{
	int nErr=ERROR_NONE;
	
	CIocpContext* pContext=FindContext(nSocket);
	if (pContext==NULL)
	{
		nErr = ERROR_IOCPMANAGER_FIND_CONTEXT_NULL;
		Logging2(nErr, DOMAIN_NAME);
		return nErr;
	}
	
	return DisconnectContext(pContext);
}

int CIocpManager::ReleaseContext(CIocpContext* pIocpContext)
{
	int nErr=ERROR_NONE;

	if (pIocpContext==NULL)
	{

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -