⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 iocpmanager.cpp

📁 iocp vc例子,自己是学DELPHI
💻 CPP
📖 第 1 页 / 共 4 页
字号:
	}

	Logging2(ERROR_IOCPMANAGER_ASSOCIATE_SOCKET_CONTEXT, DOMAIN_NAME);
	return nErr;
}

int CIocpManager::OnInitialize(CIocpContext* pContext, CIocpBuffer* pBuf)
{
	int nErr=ERROR_NONE;

	ReleaseBuffer(pBuf);

	if (pContext==NULL)
	{		
		nErr = ERROR_FUNC_PARAM_INVALID;
		return nErr;
	}

	pContext->m_csContext.Lock();
	IOCPTASK itTask;
	itTask.wType = IOCPTASK_NEW_CONNECTION;
	itTask.pContext = pContext;
	nErr = Processing(itTask);
	pContext->m_csContext.Unlock();

	int nNumberOfPendingRead=NUMBER_OF_PENDING_READ;
	for (int i=0; i<nNumberOfPendingRead; i++)
	{
		pContext->EnterIoLoop();
		nErr = PostRead(pContext);
	}

	ReleaseContext(pContext);

	return nErr;
}

int CIocpManager::Processing(IOCPTASK& itTask)
{
	int nErr=ERROR_NONE;

	m_csReaderCountProcess.Lock();
	m_nReaderCount ++;
	if (m_nReaderCount==1) m_csWriterProcess.Lock();
	m_csReaderCountProcess.Unlock();

	for (POSITION pos=m_plProcess.GetHeadPosition(); pos!=NULL;)
	{
		CIocpProcess* pIocpProcess=(CIocpProcess*)m_plProcess.GetNext(pos);
		if (pIocpProcess!=NULL)
		{
			nErr = pIocpProcess->Processing(itTask);
			if (NOERROROCCUR(nErr)) break;
		}
	}

	m_csReaderCountProcess.Lock();
	m_nReaderCount --;
	if (m_nReaderCount==0) m_csWriterProcess.Unlock();
	m_csReaderCountProcess.Unlock();

	return nErr;
}

int CIocpManager::AddProcess(CIocpProcess* pProcess)
{
	int nErr=ERROR_NONE;
	if (pProcess==NULL)
	{
		nErr = ERROR_FUNC_PARAM_INVALID;
		return nErr;
	}

	m_csWriterProcess.Lock();
	pProcess->SetIocpMngr(this);
	m_plProcess.AddHead(pProcess);
	m_csWriterProcess.Unlock();

	return nErr;
}

int CIocpManager::RemoveProcess(CIocpProcess* pProcess)
{
	int nErr=ERROR_NONE;
	if (pProcess==NULL)
	{
		nErr = ERROR_FUNC_PARAM_INVALID;
		return nErr;
	}

	m_csWriterProcess.Lock();
	for (POSITION pos=m_plProcess.GetHeadPosition(); pos!=NULL;)
	{
		POSITION oldpos=pos;
		CIocpProcess* pIocpProcess=(CIocpProcess*)m_plProcess.GetNext(pos);
		if (pIocpProcess==pProcess)
		{
			m_plProcess.RemoveAt(oldpos);
			break;
		}
	}
	m_csWriterProcess.Unlock();

	return nErr;
}

void CIocpManager::ReleaseBufferMap(CBufferMap& mapBuffer)
{
	int nKey=0;
	CIocpBuffer* pBuf=NULL;
	for (POSITION pos=mapBuffer.GetStartPosition(); pos!=NULL;)
	{
		mapBuffer.GetNextAssoc(pos, nKey, pBuf);
		if (pBuf!=NULL) ReleaseBuffer(pBuf);
	}
	mapBuffer.RemoveAll();
}

CIocpBuffer* CIocpManager::GetNextSendBuffer(CIocpContext* pContext, CIocpBuffer* pBuffer)
{
	CIocpBuffer* pRet=NULL;
	if (pContext==NULL) return pRet;

	pContext->m_csContext.Lock();	
	if (pBuffer!=NULL)
	{
		//判断缓冲区顺序是否正确
		int nSequenceNumber=pBuffer->m_nSequenceNumber;
		if (nSequenceNumber==pContext->m_nCurrentSendSequenceNumber)
		{
			pRet = pBuffer;
			pContext->m_csContext.Unlock();
			return pRet;
		}

		//检查当前顺序号是否重复
		CIocpBuffer* pBufferExist=NULL;
		pContext->m_mapSendBuffer.Lookup(nSequenceNumber, pBufferExist);
		if (pBufferExist!=NULL)
		{
			pContext->m_csContext.Unlock();
			Logging2(ERROR_IOCPMANAGER_MAP_BUFFER_EXIST, DOMAIN_NAME);
			return pRet;
		}

		//保存缓冲区
		pContext->m_mapSendBuffer[nSequenceNumber] = pBuffer;
	}

	//按顺序返回缓冲区
	pContext->m_mapSendBuffer.Lookup(pContext->m_nCurrentSendSequenceNumber, pRet);
	if (pRet!=NULL)
		pContext->m_mapSendBuffer.RemoveKey(pContext->m_nCurrentSendSequenceNumber);
	pContext->m_csContext.Unlock();

	return pRet;
}

void CIocpManager::SetSendSequenceNumber(CIocpContext* pContext, CIocpBuffer* pBuffer)
{
	if (pContext!=NULL && pBuffer!=NULL)
	{
		pContext->m_csContext.Lock();
		pBuffer->m_nSequenceNumber = pContext->m_nSendSequenceNumber;
		pContext->m_nSendSequenceNumber = (pContext->m_nSendSequenceNumber+1) % MAXSEQUENCENUMBER;
		pContext->m_csContext.Unlock();
	}
}

void CIocpManager::IncreaseSendSequenceNumber(CIocpContext* pContext)
{
	if (pContext!=NULL)
	{
		pContext->m_csContext.Lock();
		pContext->m_nCurrentSendSequenceNumber = (pContext->m_nCurrentSendSequenceNumber+1) % MAXSEQUENCENUMBER;
		pContext->m_csContext.Unlock();
	}
}

CIocpBuffer* CIocpManager::GetNextRecvBuffer(CIocpContext* pContext, CIocpBuffer* pBuffer)
{
	CIocpBuffer* pRet=NULL;
	if (pContext==NULL) return pRet;

	pContext->m_csContext.Lock();

	if (pBuffer!=NULL)
	{
		int nSequenceNumber=pBuffer->m_nSequenceNumber;
		if (nSequenceNumber==pContext->m_nCurrentRecvSequenceNumber)
		{
			pRet = pBuffer;
			pContext->m_csContext.Unlock();
			return pRet;
		}

		CIocpBuffer* pBufferExist=NULL;
		pContext->m_mapRecvBuffer.Lookup(nSequenceNumber, pBufferExist);
		if (pBufferExist!=NULL)
		{
			pContext->m_csContext.Unlock();
			Logging2(ERROR_IOCPMANAGER_MAP_BUFFER_EXIST, DOMAIN_NAME);
			return pRet;
		}

		pContext->m_mapRecvBuffer[nSequenceNumber] = pBuffer;
	}

	pContext->m_mapRecvBuffer.Lookup(pContext->m_nCurrentRecvSequenceNumber, pRet);
	if (pRet!=NULL)
		pContext->m_mapRecvBuffer.RemoveKey(pContext->m_nCurrentRecvSequenceNumber);
	pContext->m_csContext.Unlock();

	return pRet;
}

void CIocpManager::IncreaseRecvSequenceNumber(CIocpContext* pContext)
{
	if (pContext!=NULL)
	{
		pContext->m_csContext.Lock();
		pContext->m_nCurrentRecvSequenceNumber = (pContext->m_nCurrentRecvSequenceNumber+1) % MAXSEQUENCENUMBER;
		pContext->m_csContext.Unlock();
	}	
}

BOOL CIocpManager::AddTask(IOCPTASK& itTask)
{
	BOOL bRet=FALSE;
	m_csTask.Lock();
	if (m_listTask.GetCount()<MAX_NUMBER_OF_TASK_LIST)
	{
		m_listTask.AddTail(itTask);
		bRet = TRUE;
	}
	m_csTask.Unlock();
	return bRet;
}

BOOL CIocpManager::GetTask(IOCPTASK& itTask)
{
	BOOL bRet=FALSE;
	m_csTask.Lock();
	if (!m_listTask.IsEmpty())
	{
		itTask = m_listTask.RemoveHead();
		bRet = TRUE;
	}
	m_csTask.Unlock();
	return bRet;
}

void CIocpManager::RemoveTask(CIocpContext* pContext)
{
	m_csTask.Lock();
	for (POSITION pos=m_listTask.GetHeadPosition(); pos!=NULL;)
	{
		POSITION oldpos=pos;
		IOCPTASK itTask=m_listTask.GetNext(pos);
		if (itTask.pContext!=pContext && pContext!=NULL)
			continue;
		
		if (itTask.pContext!=NULL)
		{			
			itTask.pContext->ReleasePacket(itTask.pPacket);
			ReleaseContext(itTask.pContext);
			m_listTask.RemoveAt(oldpos);
		}	
	}
	m_csTask.Unlock();
}

int CIocpManager::CreateTaskProcessThread()
{
	int nErr=ERROR_NONE;

	m_hStopEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
	ASSERT(m_hStopEvent!=NULL);

	int nNumberOfTaskProcessThread=NUMBER_OF_TASK_PROCESS_THREAD;
	DWORD dwThreadId=0;
	for(int i=0; i<nNumberOfTaskProcessThread;i++)
	{
		HANDLE hThread=CreateThread(NULL, 0, CIocpManager::TaskProcessThread, 
			(LPVOID)this, THREAD_PRIORITY_NORMAL, &dwThreadId);
		if (hThread==NULL)
		{
			nErr = ERROR_IOCPMANAGER_TASK_PROCESS_THREAD_CREATE;
			Logging2(nErr, DOMAIN_NAME);
			break;
		}
		CloseHandle(hThread);
	}
	return nErr;
}

DWORD CIocpManager::TaskProcessThread(LPVOID lpParam)
{
	int nErr=ERROR_NONE;
	CIocpManager* pThis=reinterpret_cast<CIocpManager*>(lpParam);
	if (pThis==NULL)
	{
		nErr = ERROR_FUNC_PARAM_INVALID;
		return nErr;
	}
	
	InterlockedIncrement(&pThis->m_lTaskProcessCount);
	
	IOCPTASK itTask;
	while (1)
	{
		if (WaitForSingleObject(pThis->m_hStopEvent, 100) == WAIT_OBJECT_0)
            break;

		if (pThis->GetTask(itTask))
		{
			CIocpContext* pContext=itTask.pContext;

			pThis->Processing(itTask);

			if (pContext!=NULL)
			{
				pContext->ReleasePacket(itTask.pPacket);
				pThis->ReleaseContext(pContext);
			}
		}
	}
	
	InterlockedDecrement(&pThis->m_lTaskProcessCount);
	
	Logging2(ERROR_IOCPMANAGER_TASK_PROCESS_THREAD_STOP, DOMAIN_NAME);
	
	return nErr;
}

int CIocpManager::StopTaskProcessThread()
{
	int nErr=ERROR_NONE;
	
	SetEvent(m_hStopEvent);
	while (m_lTaskProcessCount>0)
	{
		Sleep(100);
	}
	CloseHandle(m_hStopEvent);
	m_hStopEvent = NULL;

	//删除剩余任务
	RemoveTask();

	return nErr;
}

BOOL CIocpManager::AddWaitSend(IOCPWAITSEND& iwWaitSend)
{
	BOOL bRet=FALSE;
	m_csWaitSend.Lock();
	if (m_aWaitSend.GetSize()<MAX_NUMBER_OF_WAIT_SEND_LIST)
	{				
		m_aWaitSend.Add(iwWaitSend);
		bRet = TRUE;
	}
	m_csWaitSend.Unlock();
	return bRet;
}

void CIocpManager::ReleaseWaitEvent(HANDLE& hWaitEvent)
{
	if (hWaitEvent!=NULL)
	{
		SetEvent(hWaitEvent);
		CloseHandle(hWaitEvent);
		hWaitEvent = NULL;
	}
}

BOOL CIocpManager::GetWaitSend(GUID& gdPacket, IOCPWAITSEND& iwWaitSend, BOOL bRemove)
{
	BOOL bRet=FALSE;
	m_csWaitSend.Lock();
	for (int i=0; i<m_aWaitSend.GetSize(); i++)
	{
		IOCPWAITSEND iws=m_aWaitSend[i];
		if (iws.gdWait==gdPacket)
		{
			iwWaitSend = iws;
			bRet = TRUE;
			if (bRemove) m_aWaitSend.RemoveAt(i);
			break;
		}
	}
	m_csWaitSend.Unlock();
	return bRet;
}

void CIocpManager::RemoveWaitSend(CIocpContext* pContext)
{
	m_csWaitSend.Lock();
	for (int i=0; i<m_aWaitSend.GetSize(); i++)
	{
		IOCPWAITSEND iws=m_aWaitSend[i];
		if (iws.pContext!=pContext && pContext!=NULL) 
			continue;

		if (iws.pWaitPacket!=NULL)
			iws.pWaitPacket->GetPacketHeader().m_nErrCode = ERROR_IOCPMANAGER_DISCONNECT_CONTEXT;
		if (iws.hWaitEvent!=NULL)
			ReleaseWaitEvent(iws.hWaitEvent);
	}
	m_aWaitSend.RemoveAll();
	m_csWaitSend.Unlock();
}

int CIocpManager::CancelSendData(int nSocket, ULONG ulKey)
{
	int nErr=ERROR_NONE;
	
	CIocpContext* pContext=FindContext(nSocket);
	if (pContext==NULL)
	{
		nErr = ERROR_IOCPMANAGER_FIND_CONTEXT_NULL;
		Logging2(nErr, DOMAIN_NAME);
		return nErr;
	}
	
	return CancelSendData(pContext, ulKey);
}

int CIocpManager::CancelSendData(CIocpContext* pContext, ULONG ulKey)
{
	int nErr=ERROR_NONE;
	if (pContext==NULL)
	{
		nErr = ERROR_FUNC_PARAM_INVALID;
		return nErr;
	}

	pContext->m_csContext.Lock();
	CIocpPacket* pPacket=pContext->FindPacket(ulKey, PACKETTYPE_WRITE);	//查找发送数据包
	if (pPacket!=NULL)
	{
		IOCPWAITSEND iwWaitSend;
		if (GetWaitSend(pPacket->GetPacketHeader().m_gdHeader, iwWaitSend))
		{
			if (iwWaitSend.pWaitPacket!=NULL)	//设置出错信息
				iwWaitSend.pWaitPacket->GetPacketHeader().m_nErrCode = ERROR_IOCPMANAGER_SEND_USER_CANCEL;
			if (iwWaitSend.hWaitEvent!=NULL) 	//清除同步命令的等待事件
				ReleaseWaitEvent(iwWaitSend.hWaitEvent);
		}
	}
	else
	{
		CIocpPacket* pPacket=pContext->FindPacket(ulKey, PACKETTYPE_READ);	//查找接收数据包
		if (pPacket!=NULL)
		{
			//接收端取消则向发送端发送取消命令
			CIocpPacketHeader pkHeader;
			pkHeader.m_nCmd = CMD_IOCP_COMMON;
			pkHeader.m_wParam = CMD_IOCP_COMMON_RECV_CANCEL;
			pkHeader.m_lParam = pPacket->m_ulKey;
			CIocpPacket pkCancel;
			pkCancel.SetPacketHeader(pkHeader);
			nErr = SendData(pContext, &pkCancel);
		}
	}
	pContext->m_csContext.Unlock();

	return nErr;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -