⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 iocpmanager.cpp

📁 iocp vc例子,自己是学DELPHI
💻 CPP
📖 第 1 页 / 共 4 页
字号:
		nErr = ERROR_FUNC_PARAM_INVALID;
		return nErr;
	}

	int nNumberofPendingIO=pIocpContext->ExitIoLoop();
	if (nNumberofPendingIO<=0)
	{
		//从使用缓冲区数组中删除
		pIocpContext->m_csContext.Lock();

		m_csUsedContext.Lock();
		POSITION pos=pIocpContext->GetPosition();
		if (pos!=NULL) m_plUsedContext.RemoveAt(pos);
		m_csUsedContext.Unlock();		
		
		pIocpContext->m_bufPackage.ResetBuffer();
		pIocpContext->m_nSendSequenceNumber = pIocpContext->m_nCurrentSendSequenceNumber = 0;
		pIocpContext->m_nRecvSequenceNumber = pIocpContext->m_nCurrentRecvSequenceNumber = 0;
		ReleaseBufferMap(pIocpContext->m_mapSendBuffer);
		ReleaseBufferMap(pIocpContext->m_mapRecvBuffer);
		pIocpContext->FreePacket();	
		pIocpContext->m_pIocpMngr = NULL;
		pIocpContext->m_csContext.Unlock();
		
		//添加到空闲缓冲区数组中
		m_csFreeContext.Lock();
		if (m_plFreeContext.GetCount()<MAX_NUMBER_OF_FREE_CONTEXT_LIST)
		{		
			pos = m_plFreeContext.AddHead(pIocpContext);
			pIocpContext->SetPosition(NULL);
		}
		else
		{
			if (pIocpContext!=NULL) delete pIocpContext;
		}
		m_csFreeContext.Unlock();

		Logging2(ERROR_IOCPMANAGER_DISCONNECT_CONTEXT, DOMAIN_NAME);		
	}
	return nErr;
}

// Hands a buffer back to the manager.
//
// The buffer is first unlinked from the in-use list (when it carries a list
// position), then either recycled into the free list or destroyed when the
// free list has already reached MAX_NUMBER_OF_FREE_BUFFER_LIST entries.
//
// pBuf    buffer to release; NULL is rejected.
// Returns ERROR_FUNC_PARAM_INVALID when pBuf is NULL, otherwise ERROR_NONE.
int CIocpManager::ReleaseBuffer(CIocpBuffer* pBuf)
{
	if (pBuf==NULL)
	{
		return ERROR_FUNC_PARAM_INVALID;
	}

	// Remove the buffer from the in-use list.
	m_csUsedBuf.Lock();
	POSITION pos=pBuf->GetPosition();
	if (pos!=NULL) m_plUsedBuffer.RemoveAt(pos);
	m_csUsedBuf.Unlock();

	// Recycle into the free list while it has room, otherwise destroy.
	// The capacity check must look at the FREE list: that is the list being
	// grown here and the one m_csFreeBuf protects (the in-use list is guarded
	// by m_csUsedBuf and says nothing about how many buffers are pooled).
	m_csFreeBuf.Lock();
	if (m_plFreeBuffer.GetCount()<MAX_NUMBER_OF_FREE_BUFFER_LIST)
	{
		m_plFreeBuffer.AddHead(pBuf);
		// A pooled buffer is on no in-use list, so it carries no position.
		pBuf->SetPosition(NULL);
	}
	else
	{
		delete pBuf;
	}
	m_csFreeBuf.Unlock();

	return ERROR_NONE;
}

// Dispatches one dequeued completion to the handler that matches the I/O
// type recorded in the overlapped buffer.
//
// pIocpContext    context associated with the completion (forwarded as-is).
// pOverlappedBuf  buffer that carried the operation; NULL is rejected.
// dwIoSize        byte count reported for the completion (read/write only).
// Returns the handler's result, or ERROR_FUNC_PARAM_INVALID for a NULL buffer.
int CIocpManager::ProcessIocpMessage(CIocpContext* pIocpContext, CIocpBuffer* pOverlappedBuf, DWORD dwIoSize)
{
	if (NULL==pOverlappedBuf)
		return ERROR_FUNC_PARAM_INVALID;

	switch (pOverlappedBuf->GetIoType())
	{
	case IOTYPE_INITIALIZE:
		return OnInitialize(pIocpContext, pOverlappedBuf);

	case IOTYPE_ACCEPTEX:
		return OnAcceptEx(pOverlappedBuf);

	case IOTYPE_INIT_READ:
		return OnInitRead(pIocpContext, pOverlappedBuf);

	case IOTYPE_READ:
		return OnRead(pIocpContext, pOverlappedBuf, dwIoSize);

	case IOTYPE_INIT_WRITE:
		return OnInitWrite(pIocpContext, pOverlappedBuf);

	case IOTYPE_WRITE:
		return OnWrite(pIocpContext, pOverlappedBuf, dwIoSize);

	default:
		break;
	}

	// Unrecognised I/O type: nothing to handle, just give the buffer back.
	return ReleaseBuffer(pOverlappedBuf);
}

// Produces a buffer prepared for the given I/O type.
//
// A pooled buffer from the free list is preferred; a new one is created only
// when the pool is empty. The buffer is reset, tagged with nIoType and
// registered at the head of the in-use list before being returned.
//
// nIoType  I/O type to stamp on the buffer.
// Returns the prepared buffer, or NULL if none could be obtained.
CIocpBuffer* CIocpManager::AllocateBuffer(int nIoType)
{
	CIocpBuffer* pBuffer = NULL;

	// Step 1: try to recycle from the free list.
	m_csFreeBuf.Lock();
	if (!m_plFreeBuffer.IsEmpty())
		pBuffer = (CIocpBuffer*)m_plFreeBuffer.RemoveHead();
	m_csFreeBuf.Unlock();

	// Step 2: nothing pooled, create a fresh buffer.
	if (NULL==pBuffer)
		pBuffer = new CIocpBuffer();

	if (NULL==pBuffer)
		return NULL;

	// Step 3: bring the buffer to a clean state for this operation.
	pBuffer->ResetBuffer();
	pBuffer->ResetIoType(nIoType);
	pBuffer->m_nSequenceNumber = 0;
	pBuffer->m_nSendLen = 0;

	// Step 4: track it on the in-use list and remember where it sits.
	m_csUsedBuf.Lock();
	POSITION posInUse = m_plUsedBuffer.AddHead(pBuffer);
	pBuffer->SetPosition(posInUse);
	m_csUsedBuf.Unlock();

	return pBuffer;
}

// Produces a connection context in its initial state.
//
// A pooled context from the free list is preferred; otherwise a new one is
// created. The context is put on the in-use list and all of its per-connection
// state is cleared while both the in-use list lock and the context's own lock
// are held.
//
// Returns the initialised context, or NULL if none could be obtained.
CIocpContext* CIocpManager::AllocateContext()
{
	CIocpContext* pContext = NULL;

	// Prefer a recycled context.
	m_csFreeContext.Lock();
	if (!m_plFreeContext.IsEmpty())
		pContext = (CIocpContext*)m_plFreeContext.RemoveHead();
	m_csFreeContext.Unlock();

	if (NULL==pContext)
		pContext = new CIocpContext();

	if (NULL==pContext)
		return NULL;

	// Lock order: in-use list first, then the context itself.
	m_csUsedContext.Lock();
	POSITION posInUse = m_plUsedContext.AddHead(pContext);

	pContext->m_csContext.Lock();

	// Ownership and list bookkeeping.
	pContext->m_pIocpMngr = this;
	pContext->SetPosition(posInUse);

	// Connection state: no socket, no pending I/O, no peer address.
	pContext->m_soContext = INVALID_SOCKET;
	pContext->m_nNumberOfPengingIo = 0;
	memset(&(pContext->m_saRemote), 0, sizeof(SOCKADDR_IN));

	// Protocol state: empty package buffer, sequence counters at zero.
	pContext->m_bufPackage.ResetBuffer();
	pContext->m_nCurrentSendSequenceNumber = 0;
	pContext->m_nSendSequenceNumber = pContext->m_nCurrentSendSequenceNumber;
	pContext->m_nCurrentRecvSequenceNumber = 0;
	pContext->m_nRecvSequenceNumber = pContext->m_nCurrentRecvSequenceNumber;

	// Drop anything left over from a previous use of this context.
	ReleaseBufferMap(pContext->m_mapSendBuffer);
	ReleaseBufferMap(pContext->m_mapRecvBuffer);
	pContext->FreePacket();

	pContext->m_csContext.Unlock();
	m_csUsedContext.Unlock();

	return pContext;
}

// Handles a completed AcceptEx request.
//
// Decrements the outstanding-AcceptEx counter, updates the accepted socket
// with SO_UPDATE_ACCEPT_CONTEXT, extracts the peer address from the accept
// buffer, binds the socket to a context and finally posts a replacement
// AcceptEx request.
//
// pBuf  buffer that carried the AcceptEx (holds the accepted socket).
// Returns ERROR_FUNC_PARAM_INVALID for a NULL buffer or invalid socket,
//         ERROR_IOCPMANAGER_SETSOCKOPT_FAILED if setsockopt fails,
//         the error from AssociateSocketWithContext if that fails,
//         otherwise the result of PostAcceptEx().
//
// NOTE(review): on the early-return failure paths no replacement AcceptEx is
// posted although the counter has already been decremented - confirm that the
// pool of pending accepts is replenished elsewhere.
int CIocpManager::OnAcceptEx(CIocpBuffer* pBuf)
{
	// This completion consumed one outstanding AcceptEx request.
	InterlockedDecrement(&m_nNumberOfAcceptEx);

	const bool bUsable = (pBuf!=NULL && pBuf->m_soSocket!=INVALID_SOCKET);
	if (!bUsable)
	{
		ReleaseBuffer(pBuf);
		return ERROR_FUNC_PARAM_INVALID;
	}

	// Let the accepted socket inherit the properties of the listening socket.
	if (SOCKET_ERROR==setsockopt(pBuf->m_soSocket, SOL_SOCKET,
		SO_UPDATE_ACCEPT_CONTEXT, (char*)&m_soListen, sizeof(m_soListen)))
	{
		ReleaseBuffer(pBuf);
		int nSockOptErr = ERROR_IOCPMANAGER_SETSOCKOPT_FAILED;
		Logging2(nSockOptErr, DOMAIN_NAME);
		return nSockOptErr;
	}

	// Retrieve the local and remote addresses stored in the accept buffer.
	SOCKADDR_IN* pAddrLocal = NULL;
	SOCKADDR_IN* pAddrRemote = NULL;
	int nLocalLen = sizeof(SOCKADDR);
	int nRemoteLen = sizeof(SOCKADDR);
	m_lpGetAcceptExSockAddrs(pBuf->GetBuffer(), 0,
		sizeof(SOCKADDR_IN)+16, sizeof(SOCKADDR_IN)+16,
		(SOCKADDR**)&pAddrLocal, &nLocalLen,
		(SOCKADDR**)&pAddrRemote, &nRemoteLen);

	int nAssociateErr = AssociateSocketWithContext(pBuf->m_soSocket, pBuf, pAddrRemote);
	if (!NOERROROCCUR(nAssociateErr))
		return nAssociateErr;

	// Keep the number of outstanding AcceptEx requests topped up.
	return PostAcceptEx();
}

// Queues an IOTYPE_INIT_READ request for a context on the completion port.
//
// pContext  context to read from; NULL is rejected.
// pBuf      buffer to carry the request; when NULL a buffer is allocated here.
//
// Returns ERROR_NONE on success, or
//   ERROR_FUNC_PARAM_INVALID                     - pContext is NULL;
//   ERROR_IOCPMANAGER_INVALID_SOCKET             - the context has no socket
//                                                  (buffer and context released);
//   ERROR_IOCPMANAGER_ALLOCATE_BUFFER            - no buffer could be obtained
//                                                  (context released);
//   ERROR_IOCPMANAGER_POSTQUEUEDCOMPLETIONSTATUS - posting failed (buffer
//                                                  released, context
//                                                  disconnected and released).
int CIocpManager::PostRead(CIocpContext* pContext, CIocpBuffer* pBuf)
{
	int nErr=ERROR_NONE;
	if (pContext==NULL)
	{
		nErr = ERROR_FUNC_PARAM_INVALID;
		return nErr;
	}

	if (pContext->m_soContext==INVALID_SOCKET)
	{
		ReleaseBuffer(pBuf);
		ReleaseContext(pContext);
		nErr = ERROR_IOCPMANAGER_INVALID_SOCKET;
		Logging2(nErr, DOMAIN_NAME);
		return nErr;
	}

	if (pBuf==NULL)
	{
		pBuf=AllocateBuffer(IOTYPE_INIT_READ);
		if (pBuf==NULL)
		{
			// AllocateBuffer can come back empty-handed; without this check
			// the ResetIoType call below would dereference NULL. Handled the
			// same way OnInitRead handles its allocation failure.
			ReleaseContext(pContext);
			nErr = ERROR_IOCPMANAGER_ALLOCATE_BUFFER;
			Logging2(nErr, DOMAIN_NAME);
			return nErr;
		}
	}
	pBuf->ResetIoType(IOTYPE_INIT_READ);

	// The context pointer travels as the completion key.
	// NOTE(review): the (DWORD) cast would truncate the pointer in a 64-bit
	// build; the documented key type is ULONG_PTR - confirm target platform.
	int nLastError=0;
	BOOL bRet = PostQueuedCompletionStatus(m_hCompletionPort, 0,
		(DWORD)pContext, &pBuf->m_olBuffer);
	if (!bRet && (nLastError=GetLastError())!=ERROR_IO_PENDING)
	{
		ReleaseBuffer(pBuf);
		DisconnectContext(pContext);
		ReleaseContext(pContext);
		nErr = ERROR_IOCPMANAGER_POSTQUEUEDCOMPLETIONSTATUS;
		Logging4(ERRORLEVEL_ERROR, CErrorManager::Instance()->WSAErrorCodeToText(nLastError),
			DOMAIN_NAME, CLASS_NAME_IOCPMANAGER);
	}

	return nErr;
}

// Handles an IOTYPE_INIT_READ completion by issuing the actual overlapped
// WSARecv on the context's socket.
//
// The buffer is stamped with the context's current receive sequence number;
// that number is advanced (modulo MAXSEQUENCENUMBER) only when the receive was
// issued successfully. Both steps happen under the context lock.
//
// pContext  context to read from; NULL is rejected.
// pBuf      buffer to receive into; when NULL a buffer is allocated here.
//
// Returns ERROR_NONE on success, or
//   ERROR_FUNC_PARAM_INVALID         - pContext is NULL;
//   ERROR_IOCPMANAGER_ALLOCATE_BUFFER - no buffer available (context released);
//   ERROR_IOCPMANAGER_WSARECV_FAILED  - WSARecv failed with something other
//                                       than WSA_IO_PENDING (buffer released,
//                                       context disconnected and released).
int CIocpManager::OnInitRead(CIocpContext* pContext, CIocpBuffer* pBuf)
{
	if (NULL==pContext)
		return ERROR_FUNC_PARAM_INVALID;

	if (NULL==pBuf)
	{
		pBuf = AllocateBuffer(IOTYPE_READ);
		if (NULL==pBuf)
		{
			ReleaseContext(pContext);
			int nAllocErr = ERROR_IOCPMANAGER_ALLOCATE_BUFFER;
			Logging2(nAllocErr, DOMAIN_NAME);
			return nAllocErr;
		}
	}

	// Turn the buffer into a read buffer.
	pBuf->ResetIoType(IOTYPE_READ);
	pBuf->SetupBuffer(IOTYPE_READ);

	pContext->m_csContext.Lock();
	pBuf->m_nSequenceNumber = pContext->m_nRecvSequenceNumber;

	DWORD dwReceived = 0;
	ULONG ulRecvFlags = MSG_PARTIAL;
	UINT nRecvRet = WSARecv(pContext->m_soContext, pBuf->GetWSABuffer(), 1,
		&dwReceived, &ulRecvFlags, &pBuf->m_olBuffer, NULL);

	// A pending overlapped operation is not a failure.
	int nWsaErr = 0;
	bool bFailed = false;
	if (nRecvRet==SOCKET_ERROR)
	{
		nWsaErr = WSAGetLastError();
		bFailed = (nWsaErr!=WSA_IO_PENDING);
	}

	if (!bFailed)
	{
		// Receive is in flight: the next one gets the following number.
		pContext->m_nRecvSequenceNumber = (pContext->m_nRecvSequenceNumber+1) % MAXSEQUENCENUMBER;
		pContext->m_csContext.Unlock();
		return ERROR_NONE;
	}

	// Hard failure: drop the lock before tearing the connection down.
	pContext->m_csContext.Unlock();
	ReleaseBuffer(pBuf);
	DisconnectContext(pContext);
	ReleaseContext(pContext);
	Logging4(ERRORLEVEL_ERROR, CErrorManager::Instance()->WSAErrorCodeToText(nWsaErr),
		DOMAIN_NAME, CLASS_NAME_IOCPMANAGER);
	return ERROR_IOCPMANAGER_WSARECV_FAILED;
}

// Handles a completed read.
//
// Records the received length on the buffer, then processes every buffer that
// GetNextRecvBuffer hands back, advancing the context's receive sequence
// number after each one, and finally posts the next read.
//
// pContext  context the data arrived on; NULL is rejected.
// pBuf      buffer that received the data.
// dwIoSize  number of bytes received.
//
// Returns ERROR_FUNC_PARAM_INVALID when pContext is NULL, or when pBuf is
// NULL / dwIoSize is 0 (in which case the buffer is released and the context
// is disconnected and released); otherwise the result of PostRead().
// Results of the individual ProcessBuffer calls are not propagated.
int CIocpManager::OnRead(CIocpContext* pContext, CIocpBuffer* pBuf, DWORD dwIoSize)
{
	if (NULL==pContext)
		return ERROR_FUNC_PARAM_INVALID;

	const bool bHasData = (pBuf!=NULL && dwIoSize!=0);
	if (!bHasData)
	{
		ReleaseBuffer(pBuf);
		DisconnectContext(pContext);
		ReleaseContext(pContext);
		return ERROR_FUNC_PARAM_INVALID;
	}

	pBuf->SetBufferLen(dwIoSize);

	// Consume buffers in the order GetNextRecvBuffer yields them.
	for (CIocpBuffer* pReady = GetNextRecvBuffer(pContext, pBuf);
		pReady!=NULL;
		pReady = GetNextRecvBuffer(pContext))
	{
		ProcessBuffer(pContext, pReady);
		IncreaseRecvSequenceNumber(pContext);
	}

	return PostRead(pContext);
}

// Feeds the data of one received buffer into packet assembly for a context.
//
// Data is accumulated in the context's package buffer (m_bufPackage) until
// MAXBUFLEN bytes are available; a full unit is then handed to the packet
// found (or newly allocated) for the buffer's key. Tasks are raised for
// packet begin, progress and end, and a completed packet is queued via
// AddTask. pBuf is always released before returning.
//
// pContext  context the data belongs to (must not be NULL - asserted).
// pBuf      received buffer (must not be NULL - asserted).
//
// Returns ERROR_IOCPMANAGER_INVALID_PACKET when the packet is reported
// invalid (the packet is released and the context disconnected); otherwise
// the result of the last ProcessBuffer/Processing call made, or ERROR_NONE.
int CIocpManager::ProcessBuffer(CIocpContext* pContext, CIocpBuffer* pBuf)
{
	ASSERT(pContext!=NULL && pBuf!=NULL);

	int nErr=ERROR_NONE;
	
	// By default the incoming buffer itself is the one to process.
	CIocpBuffer* pProcessBuffer=pBuf;
	// If the package (staging) buffer already holds data
	int nBufferLen=pBuf->GetBufferLen();
	int nPackageLen=pContext->m_bufPackage.GetBufferLen();
	if (nPackageLen>0)
	{
		// Bytes still missing to bring the package buffer up to MAXBUFLEN.
		int nNeedLen=(MAXBUFLEN-nPackageLen);
		if (nBufferLen<nNeedLen)	
		{
			// Combined length is still below MAXBUFLEN: copy the data into the package buffer and return
			pContext->m_bufPackage.AddBufferData(pBuf->GetBuffer(), nBufferLen);
			ReleaseBuffer(pBuf);
			return nErr;
		}
		else
		{
			// Top the package buffer up with nNeedLen bytes, drop those bytes
			// from pBuf and process the package buffer instead
			pContext->m_bufPackage.AddBufferData(pBuf->GetBuffer(), nNeedLen);
			pBuf->FlushBuffer(nNeedLen);
			pProcessBuffer = &(pContext->m_bufPackage);
		}
	}

	if (pProcessBuffer!=NULL && pProcessBuffer->GetBufferLen()>=MAXBUFLEN)
	{
		// The buffer holds enough data; process it
		ULONG ulKey=pProcessBuffer->GetKey()+1;	// original author's note: keeps the receive-side identifier even, to tell it apart from send packets
		CIocpPacket* pPacket=pContext->FindPacket(ulKey);
		if (pPacket==NULL)
		{
			// First data for this key: create the packet and announce it.
			// NOTE(review): the AllocatePacket result is used without a NULL check.
			pPacket = pContext->AllocatePacket(ulKey, PACKETTYPE_READ);
			pPacket->SetDefaultPath(m_strDefaultPath);	// set the default directory for the received packet's temporary file

			IOCPTASK itTask;
			itTask.wType = IOCPTASK_PACKET_BEGIN;
			itTask.pContext = pContext;
			itTask.pPacket = pPacket;
			nErr = Processing(itTask);
		}
		// Overwrites any result from the PACKET_BEGIN task above.
		nErr = pProcessBuffer->ProcessBuffer(pPacket);

		if (!pPacket->IsPacketValid())	// invalid packet
		{
			ReleaseBuffer(pBuf);
			pContext->ReleasePacket(pPacket);
			DisconnectContext(pContext);
			nErr = ERROR_IOCPMANAGER_INVALID_PACKET;
			Logging2(nErr, DOMAIN_NAME);
			return nErr;
		}

		if (NOERROROCCUR(nErr))
		{
			if (pPacket->IsReady())
			{
				// Packet complete: close its file and announce the end.
				pPacket->ClosePacketFile();
				
				IOCPTASK itTask;	
				itTask.wType = IOCPTASK_PACKET_END;
				itTask.pContext = pContext;
				itTask.pPacket = pPacket;
				nErr = Processing(itTask);

				// Queue the packet-processing task; EnterIoLoop is called
				// first and undone through ReleaseContext if queuing fails
				pContext->EnterIoLoop();
				IOCPTASK itTaskPacket;
				itTaskPacket.wType = IOCPTASK_PACKET_PROCESS;
				itTaskPacket.pContext = pContext;
				itTaskPacket.pPacket = pPacket;
				if (!AddTask(itTaskPacket))
				{
					pContext->ReleasePacket(pPacket);
					ReleaseContext(pContext);
				}
			}
			else
			{
				// Packet still incomplete: report progress when the packet asks for it.
				int nTranLen=0, nDoneLen=0;
				if (pPacket->NeedInfoProgress(nTranLen, nDoneLen))
				{
					IOCPTASK itTask;
					itTask.wType = IOCPTASK_PACKET_PROGRESS;
					itTask.pContext = pContext;
					itTask.pPacket = pPacket;
					itTask.wParam = nTranLen;
					itTask.lParam = nDoneLen;
					nErr = Processing(itTask);
				}
			}
		}
	}

	if (pBuf->GetBufferLen()>0)	// leftover data is appended to the package buffer
		pContext->m_bufPackage.AddBufferData(pBuf->GetBuffer(), pBuf->GetBufferLen());

	ReleaseBuffer(pBuf);
	
	return nErr;
}

// Looks up the in-use context that owns a given socket.
//
// The in-use list is walked from the head under m_csUsedContext; the first
// non-NULL entry whose m_soContext equals the socket wins.
//
// nSocket  socket handle to search for.
// Returns the matching context, or NULL when no in-use context has it.
CIocpContext* CIocpManager::FindContext(int nSocket)
{
	const SOCKET soWanted = (SOCKET)nSocket;
	CIocpContext* pFound = NULL;

	m_csUsedContext.Lock();
	POSITION pos = m_plUsedContext.GetHeadPosition();
	while (pos!=NULL && pFound==NULL)
	{
		CIocpContext* pCandidate = (CIocpContext*)m_plUsedContext.GetNext(pos);
		if (pCandidate==NULL)
			continue;
		if (pCandidate->m_soContext==soWanted)
			pFound = pCandidate;
	}
	m_csUsedContext.Unlock();

	return pFound;
}

int CIocpManager::SendData(int nSocket, CIocpPacket* pPacket, CIocpPacket* pAnsPacket, DWORD dwWaitTime)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -