📄 iocpmanager.cpp
字号:
}
Logging2(ERROR_IOCPMANAGER_ASSOCIATE_SOCKET_CONTEXT, DOMAIN_NAME);
return nErr;
}
int CIocpManager::OnInitialize(CIocpContext* pContext, CIocpBuffer* pBuf)
{
int nErr=ERROR_NONE;
ReleaseBuffer(pBuf);
if (pContext==NULL)
{
nErr = ERROR_FUNC_PARAM_INVALID;
return nErr;
}
pContext->m_csContext.Lock();
IOCPTASK itTask;
itTask.wType = IOCPTASK_NEW_CONNECTION;
itTask.pContext = pContext;
nErr = Processing(itTask);
pContext->m_csContext.Unlock();
int nNumberOfPendingRead=NUMBER_OF_PENDING_READ;
for (int i=0; i<nNumberOfPendingRead; i++)
{
pContext->EnterIoLoop();
nErr = PostRead(pContext);
}
ReleaseContext(pContext);
return nErr;
}
int CIocpManager::Processing(IOCPTASK& itTask)
{
int nErr=ERROR_NONE;
m_csReaderCountProcess.Lock();
m_nReaderCount ++;
if (m_nReaderCount==1) m_csWriterProcess.Lock();
m_csReaderCountProcess.Unlock();
for (POSITION pos=m_plProcess.GetHeadPosition(); pos!=NULL;)
{
CIocpProcess* pIocpProcess=(CIocpProcess*)m_plProcess.GetNext(pos);
if (pIocpProcess!=NULL)
{
nErr = pIocpProcess->Processing(itTask);
if (NOERROROCCUR(nErr)) break;
}
}
m_csReaderCountProcess.Lock();
m_nReaderCount --;
if (m_nReaderCount==0) m_csWriterProcess.Unlock();
m_csReaderCountProcess.Unlock();
return nErr;
}
int CIocpManager::AddProcess(CIocpProcess* pProcess)
{
int nErr=ERROR_NONE;
if (pProcess==NULL)
{
nErr = ERROR_FUNC_PARAM_INVALID;
return nErr;
}
m_csWriterProcess.Lock();
pProcess->SetIocpMngr(this);
m_plProcess.AddHead(pProcess);
m_csWriterProcess.Unlock();
return nErr;
}
int CIocpManager::RemoveProcess(CIocpProcess* pProcess)
{
int nErr=ERROR_NONE;
if (pProcess==NULL)
{
nErr = ERROR_FUNC_PARAM_INVALID;
return nErr;
}
m_csWriterProcess.Lock();
for (POSITION pos=m_plProcess.GetHeadPosition(); pos!=NULL;)
{
POSITION oldpos=pos;
CIocpProcess* pIocpProcess=(CIocpProcess*)m_plProcess.GetNext(pos);
if (pIocpProcess==pProcess)
{
m_plProcess.RemoveAt(oldpos);
break;
}
}
m_csWriterProcess.Unlock();
return nErr;
}
void CIocpManager::ReleaseBufferMap(CBufferMap& mapBuffer)
{
int nKey=0;
CIocpBuffer* pBuf=NULL;
for (POSITION pos=mapBuffer.GetStartPosition(); pos!=NULL;)
{
mapBuffer.GetNextAssoc(pos, nKey, pBuf);
if (pBuf!=NULL) ReleaseBuffer(pBuf);
}
mapBuffer.RemoveAll();
}
CIocpBuffer* CIocpManager::GetNextSendBuffer(CIocpContext* pContext, CIocpBuffer* pBuffer)
{
CIocpBuffer* pRet=NULL;
if (pContext==NULL) return pRet;
pContext->m_csContext.Lock();
if (pBuffer!=NULL)
{
//判断缓冲区顺序是否正确
int nSequenceNumber=pBuffer->m_nSequenceNumber;
if (nSequenceNumber==pContext->m_nCurrentSendSequenceNumber)
{
pRet = pBuffer;
pContext->m_csContext.Unlock();
return pRet;
}
//检查当前顺序号是否重复
CIocpBuffer* pBufferExist=NULL;
pContext->m_mapSendBuffer.Lookup(nSequenceNumber, pBufferExist);
if (pBufferExist!=NULL)
{
pContext->m_csContext.Unlock();
Logging2(ERROR_IOCPMANAGER_MAP_BUFFER_EXIST, DOMAIN_NAME);
return pRet;
}
//保存缓冲区
pContext->m_mapSendBuffer[nSequenceNumber] = pBuffer;
}
//按顺序返回缓冲区
pContext->m_mapSendBuffer.Lookup(pContext->m_nCurrentSendSequenceNumber, pRet);
if (pRet!=NULL)
pContext->m_mapSendBuffer.RemoveKey(pContext->m_nCurrentSendSequenceNumber);
pContext->m_csContext.Unlock();
return pRet;
}
void CIocpManager::SetSendSequenceNumber(CIocpContext* pContext, CIocpBuffer* pBuffer)
{
if (pContext!=NULL && pBuffer!=NULL)
{
pContext->m_csContext.Lock();
pBuffer->m_nSequenceNumber = pContext->m_nSendSequenceNumber;
pContext->m_nSendSequenceNumber = (pContext->m_nSendSequenceNumber+1) % MAXSEQUENCENUMBER;
pContext->m_csContext.Unlock();
}
}
void CIocpManager::IncreaseSendSequenceNumber(CIocpContext* pContext)
{
if (pContext!=NULL)
{
pContext->m_csContext.Lock();
pContext->m_nCurrentSendSequenceNumber = (pContext->m_nCurrentSendSequenceNumber+1) % MAXSEQUENCENUMBER;
pContext->m_csContext.Unlock();
}
}
CIocpBuffer* CIocpManager::GetNextRecvBuffer(CIocpContext* pContext, CIocpBuffer* pBuffer)
{
CIocpBuffer* pRet=NULL;
if (pContext==NULL) return pRet;
pContext->m_csContext.Lock();
if (pBuffer!=NULL)
{
int nSequenceNumber=pBuffer->m_nSequenceNumber;
if (nSequenceNumber==pContext->m_nCurrentRecvSequenceNumber)
{
pRet = pBuffer;
pContext->m_csContext.Unlock();
return pRet;
}
CIocpBuffer* pBufferExist=NULL;
pContext->m_mapRecvBuffer.Lookup(nSequenceNumber, pBufferExist);
if (pBufferExist!=NULL)
{
pContext->m_csContext.Unlock();
Logging2(ERROR_IOCPMANAGER_MAP_BUFFER_EXIST, DOMAIN_NAME);
return pRet;
}
pContext->m_mapRecvBuffer[nSequenceNumber] = pBuffer;
}
pContext->m_mapRecvBuffer.Lookup(pContext->m_nCurrentRecvSequenceNumber, pRet);
if (pRet!=NULL)
pContext->m_mapRecvBuffer.RemoveKey(pContext->m_nCurrentRecvSequenceNumber);
pContext->m_csContext.Unlock();
return pRet;
}
void CIocpManager::IncreaseRecvSequenceNumber(CIocpContext* pContext)
{
if (pContext!=NULL)
{
pContext->m_csContext.Lock();
pContext->m_nCurrentRecvSequenceNumber = (pContext->m_nCurrentRecvSequenceNumber+1) % MAXSEQUENCENUMBER;
pContext->m_csContext.Unlock();
}
}
BOOL CIocpManager::AddTask(IOCPTASK& itTask)
{
BOOL bRet=FALSE;
m_csTask.Lock();
if (m_listTask.GetCount()<MAX_NUMBER_OF_TASK_LIST)
{
m_listTask.AddTail(itTask);
bRet = TRUE;
}
m_csTask.Unlock();
return bRet;
}
BOOL CIocpManager::GetTask(IOCPTASK& itTask)
{
BOOL bRet=FALSE;
m_csTask.Lock();
if (!m_listTask.IsEmpty())
{
itTask = m_listTask.RemoveHead();
bRet = TRUE;
}
m_csTask.Unlock();
return bRet;
}
void CIocpManager::RemoveTask(CIocpContext* pContext)
{
m_csTask.Lock();
for (POSITION pos=m_listTask.GetHeadPosition(); pos!=NULL;)
{
POSITION oldpos=pos;
IOCPTASK itTask=m_listTask.GetNext(pos);
if (itTask.pContext!=pContext && pContext!=NULL)
continue;
if (itTask.pContext!=NULL)
{
itTask.pContext->ReleasePacket(itTask.pPacket);
ReleaseContext(itTask.pContext);
m_listTask.RemoveAt(oldpos);
}
}
m_csTask.Unlock();
}
int CIocpManager::CreateTaskProcessThread()
{
int nErr=ERROR_NONE;
m_hStopEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
ASSERT(m_hStopEvent!=NULL);
int nNumberOfTaskProcessThread=NUMBER_OF_TASK_PROCESS_THREAD;
DWORD dwThreadId=0;
for(int i=0; i<nNumberOfTaskProcessThread;i++)
{
HANDLE hThread=CreateThread(NULL, 0, CIocpManager::TaskProcessThread,
(LPVOID)this, THREAD_PRIORITY_NORMAL, &dwThreadId);
if (hThread==NULL)
{
nErr = ERROR_IOCPMANAGER_TASK_PROCESS_THREAD_CREATE;
Logging2(nErr, DOMAIN_NAME);
break;
}
CloseHandle(hThread);
}
return nErr;
}
DWORD CIocpManager::TaskProcessThread(LPVOID lpParam)
{
int nErr=ERROR_NONE;
CIocpManager* pThis=reinterpret_cast<CIocpManager*>(lpParam);
if (pThis==NULL)
{
nErr = ERROR_FUNC_PARAM_INVALID;
return nErr;
}
InterlockedIncrement(&pThis->m_lTaskProcessCount);
IOCPTASK itTask;
while (1)
{
if (WaitForSingleObject(pThis->m_hStopEvent, 100) == WAIT_OBJECT_0)
break;
if (pThis->GetTask(itTask))
{
CIocpContext* pContext=itTask.pContext;
pThis->Processing(itTask);
if (pContext!=NULL)
{
pContext->ReleasePacket(itTask.pPacket);
pThis->ReleaseContext(pContext);
}
}
}
InterlockedDecrement(&pThis->m_lTaskProcessCount);
Logging2(ERROR_IOCPMANAGER_TASK_PROCESS_THREAD_STOP, DOMAIN_NAME);
return nErr;
}
int CIocpManager::StopTaskProcessThread()
{
int nErr=ERROR_NONE;
SetEvent(m_hStopEvent);
while (m_lTaskProcessCount>0)
{
Sleep(100);
}
CloseHandle(m_hStopEvent);
m_hStopEvent = NULL;
//删除剩余任务
RemoveTask();
return nErr;
}
BOOL CIocpManager::AddWaitSend(IOCPWAITSEND& iwWaitSend)
{
BOOL bRet=FALSE;
m_csWaitSend.Lock();
if (m_aWaitSend.GetSize()<MAX_NUMBER_OF_WAIT_SEND_LIST)
{
m_aWaitSend.Add(iwWaitSend);
bRet = TRUE;
}
m_csWaitSend.Unlock();
return bRet;
}
void CIocpManager::ReleaseWaitEvent(HANDLE& hWaitEvent)
{
if (hWaitEvent!=NULL)
{
SetEvent(hWaitEvent);
CloseHandle(hWaitEvent);
hWaitEvent = NULL;
}
}
BOOL CIocpManager::GetWaitSend(GUID& gdPacket, IOCPWAITSEND& iwWaitSend, BOOL bRemove)
{
BOOL bRet=FALSE;
m_csWaitSend.Lock();
for (int i=0; i<m_aWaitSend.GetSize(); i++)
{
IOCPWAITSEND iws=m_aWaitSend[i];
if (iws.gdWait==gdPacket)
{
iwWaitSend = iws;
bRet = TRUE;
if (bRemove) m_aWaitSend.RemoveAt(i);
break;
}
}
m_csWaitSend.Unlock();
return bRet;
}
void CIocpManager::RemoveWaitSend(CIocpContext* pContext)
{
m_csWaitSend.Lock();
for (int i=0; i<m_aWaitSend.GetSize(); i++)
{
IOCPWAITSEND iws=m_aWaitSend[i];
if (iws.pContext!=pContext && pContext!=NULL)
continue;
if (iws.pWaitPacket!=NULL)
iws.pWaitPacket->GetPacketHeader().m_nErrCode = ERROR_IOCPMANAGER_DISCONNECT_CONTEXT;
if (iws.hWaitEvent!=NULL)
ReleaseWaitEvent(iws.hWaitEvent);
}
m_aWaitSend.RemoveAll();
m_csWaitSend.Unlock();
}
int CIocpManager::CancelSendData(int nSocket, ULONG ulKey)
{
int nErr=ERROR_NONE;
CIocpContext* pContext=FindContext(nSocket);
if (pContext==NULL)
{
nErr = ERROR_IOCPMANAGER_FIND_CONTEXT_NULL;
Logging2(nErr, DOMAIN_NAME);
return nErr;
}
return CancelSendData(pContext, ulKey);
}
int CIocpManager::CancelSendData(CIocpContext* pContext, ULONG ulKey)
{
int nErr=ERROR_NONE;
if (pContext==NULL)
{
nErr = ERROR_FUNC_PARAM_INVALID;
return nErr;
}
pContext->m_csContext.Lock();
CIocpPacket* pPacket=pContext->FindPacket(ulKey, PACKETTYPE_WRITE); //查找发送数据包
if (pPacket!=NULL)
{
IOCPWAITSEND iwWaitSend;
if (GetWaitSend(pPacket->GetPacketHeader().m_gdHeader, iwWaitSend))
{
if (iwWaitSend.pWaitPacket!=NULL) //设置出错信息
iwWaitSend.pWaitPacket->GetPacketHeader().m_nErrCode = ERROR_IOCPMANAGER_SEND_USER_CANCEL;
if (iwWaitSend.hWaitEvent!=NULL) //清除同步命令的等待事件
ReleaseWaitEvent(iwWaitSend.hWaitEvent);
}
}
else
{
CIocpPacket* pPacket=pContext->FindPacket(ulKey, PACKETTYPE_READ); //查找接收数据包
if (pPacket!=NULL)
{
//接收端取消则向发送端发送取消命令
CIocpPacketHeader pkHeader;
pkHeader.m_nCmd = CMD_IOCP_COMMON;
pkHeader.m_wParam = CMD_IOCP_COMMON_RECV_CANCEL;
pkHeader.m_lParam = pPacket->m_ulKey;
CIocpPacket pkCancel;
pkCancel.SetPacketHeader(pkHeader);
nErr = SendData(pContext, &pkCancel);
}
}
pContext->m_csContext.Unlock();
return nErr;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -