⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 iocpmanager.cpp

📁 iocp vc例子,自己是学DELPHI
💻 CPP
📖 第 1 页 / 共 4 页
字号:
{
	int nErr=ERROR_NONE;
	
	CIocpContext* pContext=FindContext(nSocket);
	if (pContext==NULL)
	{
		nErr = ERROR_IOCPMANAGER_FIND_CONTEXT_NULL;
		Logging2(nErr, DOMAIN_NAME);
		return nErr;
	}
	
	return SendData(pContext, pPacket, pAnsPacket, dwWaitTime);
}

/**
 * Queues a copy of pPacket for sending on the connection pContext and,
 * when pAnsPacket is non-NULL, blocks until the wait event is signalled
 * or dwWaitTime elapses.
 *
 * @param pContext    Connection context to send on; must not be NULL.
 * @param pPacket     Packet to send; it is cloned, the caller keeps ownership.
 * @param pAnsPacket  Packet registered as the answer slot for this send.
 *                    Non-NULL marks the send as synchronous.
 * @param dwWaitTime  Timeout passed to WaitForSingleObject for synchronous sends.
 * @return ERROR_NONE on success, otherwise one of the ERROR_* codes set below.
 */
int CIocpManager::SendData(CIocpContext* pContext, CIocpPacket* pPacket, CIocpPacket* pAnsPacket, DWORD dwWaitTime)
{
	int nErr=ERROR_NONE;
	if (pContext==NULL || pPacket==NULL)
	{
		nErr = ERROR_FUNC_PARAM_INVALID;
		return nErr;
	}
	
	CIocpBuffer* pBuf=AllocateBuffer(IOTYPE_INIT_WRITE);
	if (pBuf==NULL)
	{
		nErr = ERROR_IOCPMANAGER_ALLOCATE_BUFFER;
		Logging2(nErr, DOMAIN_NAME);
		return nErr;
	}

	// Lock the connection context so it is not released while the send is set up.
	pContext->m_csContext.Lock();
	if (pContext->m_soContext==INVALID_SOCKET)
	{
		pContext->m_csContext.Unlock();
		// The buffer was never handed to PostWrite, so give it back here
		// (previously it was leaked on this path).
		ReleaseBuffer(pBuf);
		nErr = ERROR_IOCPMANAGER_INVALID_SOCKET;
		Logging2(nErr, DOMAIN_NAME);
		return nErr;
	}

	CIocpPacket* pSendPacket=pContext->AllocatePacket(pContext->GenerateKey());
	pPacket->Clone(pSendPacket);
	// Manual-reset, initially non-signalled event used to wait for completion.
	// NOTE(review): the CreateEvent result is not checked for NULL - confirm
	// whether a failed creation needs dedicated handling.
	HANDLE hWaitEvent=CreateEvent(NULL, TRUE, FALSE, NULL);
	GUID gdPacket=GUID_NULL;
	// An empty GUID gets a fresh identifier; a non-empty GUID means this is a
	// response message and the identifier is kept.
	if (pSendPacket->GetPacketHeader().m_gdHeader==GUID_NULL)
	{
		gdPacket = pContext->GenerateGuid();
		pSendPacket->GetPacketHeader().m_gdHeader = gdPacket;
	}
	else
	{
		gdPacket = pSendPacket->GetPacketHeader().m_gdHeader;
	}

	// Register the pending send so it can be looked up by GUID later.
	IOCPWAITSEND iwWaitSend;
	iwWaitSend.gdWait = gdPacket;
	iwWaitSend.hWaitEvent = hWaitEvent;
	iwWaitSend.pContext = pContext;
	iwWaitSend.pWaitPacket = pAnsPacket;
	if (!AddWaitSend(iwWaitSend))
	{
		ReleaseWaitEvent(hWaitEvent);
		pContext->ReleasePacket(pSendPacket);
		// No packet has been attached to the buffer yet, so it can be
		// returned on its own (previously it was leaked on this path).
		ReleaseBuffer(pBuf);
		pContext->m_csContext.Unlock();
		nErr = ERROR_IOCPMANAGER_SYNC_SEND_FAILED;
		Logging2(nErr, DOMAIN_NAME);
		return nErr;
	}

	pSendPacket->m_bSyncSend = (pAnsPacket!=NULL);
	pSendPacket->InitSendData();
	pBuf->SetPacket(pSendPacket);
	
	// Notify the task processor that a packet send is starting.
	// NOTE(review): this result is overwritten by PostWrite below.
	IOCPTASK itTask;
	itTask.wType = IOCPTASK_PACKET_BEGIN;
	itTask.pContext = pContext;
	itTask.pPacket = pSendPacket;
	nErr = Processing(itTask);
	
	nErr = PostWrite(pContext, pBuf);
	pContext->m_csContext.Unlock();
	// NOTE(review): on failure the wait-send entry and pSendPacket are left
	// as they are - confirm who is expected to clean them up.
	if (!NOERROROCCUR(nErr)) return nErr;

	if (pAnsPacket!=NULL)	// synchronous send: wait for the event
	{
		DWORD dwRet=WaitForSingleObject(hWaitEvent, dwWaitTime);
		if (dwRet==WAIT_TIMEOUT)
		{
			// Timed out: fetch the pending entry, stamp the timeout error on
			// the answer packet and release the wait event.
			IOCPWAITSEND iwTimedOut;
			if (GetWaitSend(gdPacket, iwTimedOut))
			{
				nErr = ERROR_IOCPMANAGER_SEND_WAIT_TIMEOUT;
				if (iwTimedOut.pWaitPacket!=NULL)
					iwTimedOut.pWaitPacket->GetPacketHeader().m_nErrCode = nErr;
				if (iwTimedOut.hWaitEvent!=NULL)
					ReleaseWaitEvent(iwTimedOut.hWaitEvent);
			}
		}
	}
	return nErr;
}

/**
 * Posts pBuf to the completion port as an IOTYPE_INIT_WRITE request for
 * pContext.
 *
 * When the socket is invalid or pBuf is NULL, the buffer is released and
 * ERROR_IOCPMANAGER_INVALID_SOCKET is returned. When posting fails, both the
 * buffer and the context are released.
 *
 * @param pContext  Connection context used as the completion key; must not be NULL.
 * @param pBuf      Buffer whose m_olBuffer is posted as the OVERLAPPED.
 * @return ERROR_NONE on success, otherwise an ERROR_* code.
 */
int CIocpManager::PostWrite(CIocpContext* pContext, CIocpBuffer* pBuf)
{
	int nErr=ERROR_NONE;
	if (pContext==NULL)
	{
		nErr = ERROR_FUNC_PARAM_INVALID;
		return nErr;
	}

	if (pContext->m_soContext!=INVALID_SOCKET && pBuf!=NULL)
	{
		pBuf->ResetIoType(IOTYPE_INIT_WRITE);
		pContext->EnterIoLoop();

		SetSendSequenceNumber(pContext, pBuf);
		
		int nLastError=0;
		// The completion key is pointer-sized (ULONG_PTR); casting the context
		// pointer to DWORD would truncate it on 64-bit builds.
		BOOL bRet = PostQueuedCompletionStatus(m_hCompletionPort, 0, 
			(ULONG_PTR)pContext, &pBuf->m_olBuffer);
		if (!bRet && (nLastError=GetLastError())!=ERROR_IO_PENDING)
		{
			ReleaseBuffer(pBuf);
			ReleaseContext(pContext);
			nErr = ERROR_IOCPMANAGER_POSTQUEUEDCOMPLETIONSTATUS;
			Logging4(ERRORLEVEL_ERROR, CErrorManager::Instance()->WSAErrorCodeToText(nLastError), 
				DOMAIN_NAME, CLASS_NAME_IOCPMANAGER);
			return nErr;
		}
	}
	else
	{
		// Socket already closed, or no buffer supplied: nothing can be posted.
		ReleaseBuffer(pBuf);
		nErr = ERROR_IOCPMANAGER_INVALID_SOCKET;
		Logging2(nErr, DOMAIN_NAME);
	}
	return nErr;
}

/**
 * Handles an "init write" request: starting from pBuf, takes send buffers
 * from GetNextSendBuffer and issues an overlapped WSASend for each one,
 * until GetNextSendBuffer returns NULL.
 *
 * A buffer whose packet has no matching wait-send entry is treated as
 * cancelled: a cancel task is processed, a cancel command is sent to the
 * peer, and the buffer and packet are released.
 *
 * @param pContext  Connection context; if NULL or its socket is invalid,
 *                  pBuf and pContext are released and
 *                  ERROR_FUNC_PARAM_INVALID is returned.
 * @param pBuf      Buffer passed to GetNextSendBuffer to start the loop.
 * @return The last error code assigned during the loop (ERROR_NONE if none).
 */
int CIocpManager::OnInitWrite(CIocpContext* pContext, CIocpBuffer* pBuf)
{
	int nErr=ERROR_NONE;
	if (pContext==NULL || 
		pContext->m_soContext==INVALID_SOCKET)
	{
		ReleaseBuffer(pBuf);
		ReleaseContext(pContext);
		nErr = ERROR_FUNC_PARAM_INVALID;
		return nErr;
	}

	pBuf = GetNextSendBuffer(pContext, pBuf);
	while (pBuf!=NULL)
	{
		// Check whether the current send packet was cancelled; one send
		// packet corresponds to one buffer.
		CIocpPacket* pSendPacket=pBuf->GetPacket();
		if (pSendPacket!=NULL)
		{
			IOCPWAITSEND iwWaitSend;
			// No wait-send entry for this packet's GUID means it was cancelled.
			// NOTE(review): the FALSE argument presumably means "look up
			// without removing" - confirm against GetWaitSend.
			if (!GetWaitSend(pSendPacket->GetPacketHeader().m_gdHeader, iwWaitSend, FALSE))
			{
				// Notify that the send has been cancelled.
				IOCPTASK itTask;
				itTask.wType = IOCPTASK_PACKET_CANCEL;
				itTask.pContext = pContext;
				itTask.pPacket = pSendPacket;
				nErr = Processing(itTask);
				
				// Send a cancel command to the receiving side, carrying the
				// cancelled packet's key in lParam.
				CIocpPacketHeader pkHeader;
				pkHeader.m_nCmd = CMD_IOCP_COMMON;
				pkHeader.m_wParam = CMD_IOCP_COMMON_SEND_CANCEL;
				pkHeader.m_lParam = pSendPacket->m_ulKey;
				CIocpPacket pkCancel;
				pkCancel.SetPacketHeader(pkHeader);
				nErr = SendData(pContext, &pkCancel);
				
				ReleaseBuffer(pBuf);
				pContext->ReleasePacket(pSendPacket);				
				
				// Advance the sequence number before fetching the next buffer.
				IncreaseSendSequenceNumber(pContext);
				pBuf = GetNextSendBuffer(pContext);
				
				nErr = ERROR_IOCPMANAGER_SEND_USER_CANCEL;		
				Logging2(nErr, DOMAIN_NAME);
				continue;	// move on to the next packet
			}
		}

		// Switch the buffer to the write state and issue the overlapped send.
		pBuf->ResetIoType(IOTYPE_WRITE);
		pBuf->SetupBuffer(IOTYPE_WRITE);
		DWORD dwIoSize=0;
		ULONG ulFlags=MSG_PARTIAL;		
		int nWSARet=0;
		int nRet=WSASend(pContext->m_soContext, pBuf->GetWSABuffer(), 1, 
			&dwIoSize, ulFlags, &pBuf->m_olBuffer, NULL);
		// WSA_IO_PENDING is not a failure: the send completes asynchronously.
		if (nRet==SOCKET_ERROR && (nWSARet=WSAGetLastError())!=WSA_IO_PENDING)
		{
			// NOTE(review): the packet attached to pBuf is not released on
			// this path, and the loop keeps going after DisconnectContext -
			// confirm both are intended.
			ReleaseBuffer(pBuf);
			DisconnectContext(pContext);

			IncreaseSendSequenceNumber(pContext);
			pBuf = GetNextSendBuffer(pContext);
			
			ReleaseContext(pContext);
			nErr = ERROR_IOCPMANAGER_WSASEND_FAILED;		
			Logging4(ERRORLEVEL_ERROR, CErrorManager::Instance()->WSAErrorCodeToText(nWSARet), 
				DOMAIN_NAME, CLASS_NAME_IOCPMANAGER);
		}
		else
		{
			IncreaseSendSequenceNumber(pContext);
			pBuf = GetNextSendBuffer(pContext);
		}
	}

	return nErr;
}

/**
 * Handles completion of a write of dwIoSize bytes for pBuf on pContext.
 *
 * If the buffer reports that sending is finished, the packet file is closed,
 * an IOCPTASK_PACKET_END task is processed, the wait-send entry of an
 * asynchronous packet is fetched and its event released, and the buffer and
 * packet are released. Otherwise an optional progress task is processed and
 * the buffer is re-posted through PostWrite.
 *
 * The context is released once before returning, when pContext is non-NULL.
 *
 * @param pContext  Connection context; NULL yields ERROR_FUNC_PARAM_INVALID.
 * @param pBuf      Buffer whose write completed; may be NULL.
 * @param dwIoSize  Number of bytes reported for this completion.
 * @return ERROR_NONE or the last error code produced while handling it.
 */
int CIocpManager::OnWrite(CIocpContext* pContext, CIocpBuffer* pBuf, DWORD dwIoSize)
{
	int nErr=ERROR_NONE;
	if (pContext==NULL)
	{
		nErr = ERROR_FUNC_PARAM_INVALID;
		return nErr;
	}

	if (pBuf!=NULL)
	{
		BOOL bSendOver=pBuf->SendBufferOver(dwIoSize);
		if (bSendOver)
		{
			// A buffer may have no packet attached (the progress branch below
			// already allows for that), so guard every dereference here too.
			CIocpPacket* pPacket=pBuf->GetPacket();
			if (pPacket!=NULL)
			{
				pPacket->ClosePacketFile();

				IOCPTASK itTask;
				itTask.wType = IOCPTASK_PACKET_END;
				itTask.pContext = pContext;
				itTask.pPacket = pPacket;
				nErr = Processing(itTask);

				// For an asynchronous send nobody is waiting on the event, so
				// fetch the pending entry and release its event right away.
				if (!pPacket->m_bSyncSend)
				{
					IOCPWAITSEND iwWaitSend;
					if (GetWaitSend(pPacket->GetPacketHeader().m_gdHeader, iwWaitSend))
						ReleaseWaitEvent(iwWaitSend.hWaitEvent);
				}
			}

			ReleaseBuffer(pBuf);
			if (pPacket!=NULL)
				pContext->ReleasePacket(pPacket);
		}
		else
		{
			// More data remains: report progress if the packet asks for it,
			// then queue the buffer for the next write.
			int nTranLen=0, nDoneLen=0;
			CIocpPacket* pPacket=pBuf->GetPacket();
			if (pPacket!=NULL && pPacket->NeedInfoProgress(nTranLen, nDoneLen))
			{
				IOCPTASK itTask;
				itTask.wType = IOCPTASK_PACKET_PROGRESS;
				itTask.pContext = pContext;
				itTask.pPacket = pPacket;
				itTask.wParam = nTranLen;
				itTask.lParam = nDoneLen;
				nErr = Processing(itTask);
			}

			nErr = PostWrite(pContext, pBuf);
		}
	}

	ReleaseContext(pContext);
	return nErr;
}

/**
 * Opens an overlapped TCP socket, connects it to strServerAddr:nServerPort
 * and hands it to AssociateSocketWithContext.
 *
 * @param strServerAddr  Server address in dotted-decimal IPv4 form (it is
 *                       parsed with inet_addr; host names are not resolved).
 * @param nServerPort    Server TCP port, 1..65535.
 * @return ERROR_FUNC_PARAM_INVALID for an out-of-range port,
 *         ERROR_IOCPMANAGER_CONNECT_FAILED for an unparsable address or a
 *         failed connect, ERROR_IOCPMANAGER_CREATE_SOCKET if the socket
 *         cannot be created, otherwise the result of
 *         AssociateSocketWithContext.
 */
int CIocpManager::Connect(CString strServerAddr, int nServerPort)
{
	int nErr=ERROR_NONE;

	// htons() takes a 16-bit value; reject ports it would silently truncate.
	if (nServerPort<=0 || nServerPort>65535)
	{
		nErr = ERROR_FUNC_PARAM_INVALID;
		return nErr;
	}

	// inet_addr() reports an unparsable address as INADDR_NONE; fail early
	// instead of attempting to connect to 255.255.255.255.
	unsigned long ulServerAddr=inet_addr(strServerAddr);
	if (ulServerAddr==INADDR_NONE)
	{
		nErr = ERROR_IOCPMANAGER_CONNECT_FAILED;
		Logging2(nErr, DOMAIN_NAME);
		return nErr;
	}
	
	SOCKET soSocket=WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
	if (soSocket==INVALID_SOCKET)
	{
		nErr = ERROR_IOCPMANAGER_CREATE_SOCKET;
		Logging2(nErr, DOMAIN_NAME);
		return nErr;
	}
	
	SOCKADDR_IN saServer;
	memset(&saServer, 0, sizeof(saServer));
	saServer.sin_family = AF_INET;
	saServer.sin_addr.s_addr = ulServerAddr;
	saServer.sin_port = htons((u_short)nServerPort);
	int nRet=connect(soSocket, (sockaddr*)&saServer, sizeof(saServer));
	// WSAEWOULDBLOCK is tolerated: the connection attempt is still in progress.
	if (nRet==SOCKET_ERROR && WSAGetLastError()!=WSAEWOULDBLOCK)
	{
		closesocket(soSocket);
		nErr = ERROR_IOCPMANAGER_CONNECT_FAILED;
		Logging2(nErr, DOMAIN_NAME);
		return nErr;
	}

	return AssociateSocketWithContext(soSocket, NULL, &saServer);
}

int CIocpManager::DisconnectAllContext()
{
	int nErr=ERROR_NONE;
	m_csUsedContext.Lock();
	for (POSITION pos=m_plUsedContext.GetHeadPosition(); pos!=NULL;)
	{
		CIocpContext* pContext=(CIocpContext*)m_plUsedContext.GetNext(pos);
		if (pContext!=NULL) 
		{
			nErr = DisconnectContext(pContext);
		}
	}
	m_csUsedContext.Unlock();
	return nErr;
}

int CIocpManager::StopIocpWorkerThread()
{
	int nErr=ERROR_NONE;

	const DWORD dwWait=100;
	while (m_lIocpWorkerCount>0)
	{
		//通知处理线程停止工作
		PostQueuedCompletionStatus(m_hCompletionPort, 0, (DWORD)NULL, NULL);
		Sleep(dwWait);
	}
	
	return nErr;
}

/**
 * Deletes every buffer in the free-buffer and used-buffer lists and empties
 * both lists, each under its own lock.
 */
void CIocpManager::FreeBuffer()
{
	// Declared once at function scope: the second loop used to reuse a
	// variable declared in the first for-header, which only compiles with
	// the pre-standard for-scope rule.
	POSITION pos=NULL;

	m_csFreeBuf.Lock();	
	for (pos=m_plFreeBuffer.GetHeadPosition(); pos!=NULL;)
	{
		CIocpBuffer* pBuf=(CIocpBuffer*)m_plFreeBuffer.GetNext(pos);
		if (pBuf!=NULL) delete pBuf;
	}
	m_plFreeBuffer.RemoveAll();
	m_csFreeBuf.Unlock();

	m_csUsedBuf.Lock();
	for (pos=m_plUsedBuffer.GetHeadPosition(); pos!=NULL;)
	{
		CIocpBuffer* pBuf=(CIocpBuffer*)m_plUsedBuffer.GetNext(pos);
		if (pBuf!=NULL) delete pBuf;
	}
	m_plUsedBuffer.RemoveAll();
	m_csUsedBuf.Unlock();
}

/**
 * Destroys all connection contexts.
 *
 * For each used context: closes its socket if still valid, releases its send
 * and receive buffer maps, frees its packets and deletes it. Contexts in the
 * free list are simply deleted. Both lists are emptied under their own locks.
 */
void CIocpManager::FreeContext()
{
	// Declared once at function scope: the second loop used to reuse a
	// variable declared in the first for-header, which only compiles with
	// the pre-standard for-scope rule.
	POSITION pos=NULL;

	m_csUsedContext.Lock();
	for (pos=m_plUsedContext.GetHeadPosition(); pos!=NULL;)
	{
		CIocpContext* pContext=(CIocpContext*)m_plUsedContext.GetNext(pos);
		if (pContext!=NULL) 
		{
			if (pContext->m_soContext!=INVALID_SOCKET)
				CloseSocket(pContext->m_soContext);
			ReleaseBufferMap(pContext->m_mapSendBuffer);
			ReleaseBufferMap(pContext->m_mapRecvBuffer);
			pContext->FreePacket();
			delete pContext;
		}
	}
	m_plUsedContext.RemoveAll();
	m_csUsedContext.Unlock();

	
	m_csFreeContext.Lock();
	for (pos=m_plFreeContext.GetHeadPosition(); pos!=NULL;)
	{
		CIocpContext* pContext=(CIocpContext*)m_plFreeContext.GetNext(pos);
		if (pContext!=NULL) delete pContext;
	}
	m_plFreeContext.RemoveAll();
	m_csFreeContext.Unlock();
}

int CIocpManager::AssociateSocketWithContext(SOCKET soSocket, CIocpBuffer* pBuf, SOCKADDR_IN* pRemoveAddr)
{
	int nErr=ERROR_NONE;
	if (soSocket==INVALID_SOCKET)
	{
		ReleaseBuffer(pBuf);
		nErr = ERROR_FUNC_PARAM_INVALID;
		return nErr;
	}

	//判断连接是否超出上限
	m_csUsedContext.Lock();
	BOOL bReachMaxConnection=(m_plUsedContext.GetCount()>=MAX_NUMBER_OF_CONNECTION);
	m_csUsedContext.Unlock();
	if (bReachMaxConnection)
	{
		CloseSocket(soSocket);		
		ReleaseBuffer(pBuf);
		nErr = ERROR_IOCPMANAGER_REACH_MAX_CONNECTION;
		Logging2(nErr, DOMAIN_NAME);
		return nErr;
	}

	CIocpContext* pContext=AllocateContext();
	if (pContext==NULL)
	{
		closesocket(soSocket);
		ReleaseBuffer(pBuf);
		nErr = ERROR_IOCPMANAGER_ALLOCATE_CONTEXT;
		Logging2(nErr, DOMAIN_NAME);
		return nErr;
	}

	if (pRemoveAddr!=NULL)
		memcpy(&(pContext->m_saRemote), pRemoveAddr, sizeof(SOCKADDR_IN));	
	pContext->m_soContext = soSocket;

	/* 
	* TCP_NODELAY	BOOL=TRUE Disables the "nagle algorithm for send coalescing" which delays
	* short packets in the hope that the application will send more data and allow
	* it to combine them into a single one to improve network efficiency.
	*/
	const char chOpt = 1;
	int nRet = setsockopt(pContext->m_soContext, IPPROTO_TCP, TCP_NODELAY, &chOpt, sizeof(char));
	if (nRet==SOCKET_ERROR)
	{
		ReleaseBuffer(pBuf);
		DisconnectContext(pContext);
		ReleaseContext(pContext);		
		nErr = ERROR_IOCPMANAGER_SETSOCKOPT_FAILED;
		Logging2(nErr, DOMAIN_NAME);
		return nErr;
	}

	nErr = AssociateSocketWithCompletionPort(pContext->m_soContext, 
		m_hCompletionPort, (DWORD)pContext);
	if (!NOERROROCCUR(nErr)) 
	{
		ReleaseBuffer(pBuf);
		DisconnectContext(pContext);
		ReleaseContext(pContext);
		return nErr;
	}
	
	pContext->EnterIoLoop();
	if (pBuf==NULL)
	{
		pBuf = AllocateBuffer(IOTYPE_INITIALIZE);
	}
	pBuf->ResetIoType(IOTYPE_INITIALIZE);
	int nLastError=0;
	BOOL bRet = PostQueuedCompletionStatus(m_hCompletionPort, 0, 
		(DWORD)pContext, &pBuf->m_olBuffer);
	if (!bRet && (nLastError=GetLastError())!=ERROR_IO_PENDING)
	{
		ReleaseBuffer(pBuf);
		DisconnectContext(pContext);
		ReleaseContext(pContext);
		nErr = ERROR_IOCPMANAGER_POSTQUEUEDCOMPLETIONSTATUS;
		Logging4(ERRORLEVEL_ERROR, CErrorManager::Instance()->WSAErrorCodeToText(nLastError), 
			DOMAIN_NAME, CLASS_NAME_IOCPMANAGER);
		return nErr;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -