📄 iocpmanager.cpp
字号:
nErr = ERROR_FUNC_PARAM_INVALID;
return nErr;
}
int nNumberofPendingIO=pIocpContext->ExitIoLoop();
if (nNumberofPendingIO<=0)
{
//从使用缓冲区数组中删除
pIocpContext->m_csContext.Lock();
m_csUsedContext.Lock();
POSITION pos=pIocpContext->GetPosition();
if (pos!=NULL) m_plUsedContext.RemoveAt(pos);
m_csUsedContext.Unlock();
pIocpContext->m_bufPackage.ResetBuffer();
pIocpContext->m_nSendSequenceNumber = pIocpContext->m_nCurrentSendSequenceNumber = 0;
pIocpContext->m_nRecvSequenceNumber = pIocpContext->m_nCurrentRecvSequenceNumber = 0;
ReleaseBufferMap(pIocpContext->m_mapSendBuffer);
ReleaseBufferMap(pIocpContext->m_mapRecvBuffer);
pIocpContext->FreePacket();
pIocpContext->m_pIocpMngr = NULL;
pIocpContext->m_csContext.Unlock();
//添加到空闲缓冲区数组中
m_csFreeContext.Lock();
if (m_plFreeContext.GetCount()<MAX_NUMBER_OF_FREE_CONTEXT_LIST)
{
pos = m_plFreeContext.AddHead(pIocpContext);
pIocpContext->SetPosition(NULL);
}
else
{
if (pIocpContext!=NULL) delete pIocpContext;
}
m_csFreeContext.Unlock();
Logging2(ERROR_IOCPMANAGER_DISCONNECT_CONTEXT, DOMAIN_NAME);
}
return nErr;
}
// Hands an I/O buffer back to the manager.
//
// The buffer is unlinked from the in-use list (using the list position the
// buffer itself remembers) and is then either parked on the free list for
// later reuse or deleted when the free list is already at its cap.
//
// pBuf    buffer to release; NULL is rejected.
// return  ERROR_FUNC_PARAM_INVALID when pBuf is NULL, otherwise ERROR_NONE.
int CIocpManager::ReleaseBuffer(CIocpBuffer* pBuf)
{
    if (pBuf==NULL)
    {
        return ERROR_FUNC_PARAM_INVALID;
    }

    // Unlink from the in-use list.
    m_csUsedBuf.Lock();
    POSITION pos=pBuf->GetPosition();
    if (pos!=NULL) m_plUsedBuffer.RemoveAt(pos);
    m_csUsedBuf.Unlock();

    // Recycle or destroy. The cap applies to the FREE list, and that is the
    // list protected by m_csFreeBuf; the previous code tested the in-use list
    // here, which both used the wrong count and read it without its lock.
    m_csFreeBuf.Lock();
    if (m_plFreeBuffer.GetCount()<MAX_NUMBER_OF_FREE_BUFFER_LIST)
    {
        m_plFreeBuffer.AddHead(pBuf);
        pBuf->SetPosition(NULL);    // no longer a member of the in-use list
    }
    else
    {
        delete pBuf;
    }
    m_csFreeBuf.Unlock();

    return ERROR_NONE;
}
// Routes one dequeued completion packet to the handler that matches the
// I/O type recorded in the overlapped buffer.
//
// pIocpContext    context the completion belongs to (forwarded as-is; the
//                 accept handler does not receive it).
// pOverlappedBuf  buffer carrying the I/O type; NULL is rejected.
// dwIoSize        byte count, forwarded to the read/write handlers only.
// return          ERROR_FUNC_PARAM_INVALID for a NULL buffer, otherwise the
//                 handler's result. Unknown I/O types release the buffer and
//                 return that call's result.
int CIocpManager::ProcessIocpMessage(CIocpContext* pIocpContext, CIocpBuffer* pOverlappedBuf, DWORD dwIoSize)
{
    if (pOverlappedBuf==NULL)
        return ERROR_FUNC_PARAM_INVALID;

    switch (pOverlappedBuf->GetIoType())
    {
    case IOTYPE_INITIALIZE:
        return OnInitialize(pIocpContext, pOverlappedBuf);

    case IOTYPE_ACCEPTEX:
        return OnAcceptEx(pOverlappedBuf);

    case IOTYPE_INIT_READ:
        return OnInitRead(pIocpContext, pOverlappedBuf);

    case IOTYPE_READ:
        return OnRead(pIocpContext, pOverlappedBuf, dwIoSize);

    case IOTYPE_INIT_WRITE:
        return OnInitWrite(pIocpContext, pOverlappedBuf);

    case IOTYPE_WRITE:
        return OnWrite(pIocpContext, pOverlappedBuf, dwIoSize);

    default:
        // Unrecognised request: nothing to do except recycle the buffer.
        return ReleaseBuffer(pOverlappedBuf);
    }
}
// Obtains an I/O buffer prepared for the given I/O type.
//
// A buffer is taken from the free list when one is available, otherwise a
// new one is created. The buffer is reset, tagged with nIoType, and linked
// into the in-use list; the list position is stored in the buffer.
//
// nIoType  I/O type passed to ResetIoType().
// return   the prepared buffer, or NULL if no buffer could be obtained.
CIocpBuffer* CIocpManager::AllocateBuffer(int nIoType)
{
    // Prefer a recycled buffer.
    CIocpBuffer* pBuffer = NULL;
    m_csFreeBuf.Lock();
    if (!m_plFreeBuffer.IsEmpty())
        pBuffer = (CIocpBuffer*)m_plFreeBuffer.RemoveHead();
    m_csFreeBuf.Unlock();

    // Nothing to recycle: create a fresh one.
    if (pBuffer==NULL)
        pBuffer = new CIocpBuffer();

    if (pBuffer==NULL)
        return NULL;

    // Bring the buffer to a known state before handing it out.
    pBuffer->ResetBuffer();
    pBuffer->ResetIoType(nIoType);
    pBuffer->m_nSequenceNumber = 0;
    pBuffer->m_nSendLen = 0;

    // Track it as in use and let it remember where it sits in the list.
    m_csUsedBuf.Lock();
    POSITION posInUse = m_plUsedBuffer.AddHead(pBuffer);
    pBuffer->SetPosition(posInUse);
    m_csUsedBuf.Unlock();

    return pBuffer;
}
// Obtains a connection context in its initial state and registers it in the
// in-use context list.
//
// A context is taken from the free list when one is available, otherwise a
// new one is created. Its socket is set to INVALID_SOCKET, the pending-I/O
// counter and sequence numbers are zeroed, the remote address is cleared, and
// any leftover send/receive buffers and packets are released.
//
// Locking: the context is fully initialised under its own lock BEFORE it is
// linked into the in-use list, and the two locks are never held together.
// The previous code nested m_csUsedContext -> m_csContext, which is the
// reverse of the order used when a context is released, and exposed a
// half-initialised context to anything walking the in-use list.
//
// return  the prepared context, or NULL if none could be obtained.
CIocpContext* CIocpManager::AllocateContext()
{
    CIocpContext* pRet=NULL;

    // Prefer a recycled context.
    m_csFreeContext.Lock();
    if (!m_plFreeContext.IsEmpty())
    {
        pRet = (CIocpContext*)m_plFreeContext.RemoveHead();
    }
    m_csFreeContext.Unlock();

    if (pRet==NULL)
    {
        pRet = new CIocpContext();
    }
    if (pRet==NULL)
    {
        return NULL;
    }

    // Step 1: initialise while the context is still private to this thread.
    pRet->m_csContext.Lock();
    pRet->m_pIocpMngr = this;
    pRet->m_soContext = INVALID_SOCKET;
    pRet->m_nNumberOfPengingIo = 0;
    memset(&(pRet->m_saRemote), 0, sizeof(SOCKADDR_IN));
    pRet->m_bufPackage.ResetBuffer();
    pRet->m_nSendSequenceNumber = pRet->m_nCurrentSendSequenceNumber = 0;
    pRet->m_nRecvSequenceNumber = pRet->m_nCurrentRecvSequenceNumber = 0;
    ReleaseBufferMap(pRet->m_mapSendBuffer);
    ReleaseBufferMap(pRet->m_mapRecvBuffer);
    pRet->FreePacket();
    pRet->m_csContext.Unlock();

    // Step 2: publish it. The stored position is written while the list lock
    // is held, which is the same lock under which it is later read back.
    m_csUsedContext.Lock();
    POSITION pos=m_plUsedContext.AddHead(pRet);
    pRet->SetPosition(pos);
    m_csUsedContext.Unlock();

    return pRet;
}
// Completion handler for an AcceptEx request.
//
// Decrements the outstanding-AcceptEx counter, applies
// SO_UPDATE_ACCEPT_CONTEXT to the accepted socket using the listening socket,
// extracts the local/remote addresses from the accept buffer, associates the
// socket with a context, and finally posts another AcceptEx.
//
// pBuf    buffer used for the AcceptEx call; carries the accepted socket.
// return  ERROR_FUNC_PARAM_INVALID when pBuf is NULL or holds no socket,
//         ERROR_IOCPMANAGER_SETSOCKOPT_FAILED when setsockopt fails,
//         otherwise the result of AssociateSocketWithContext() or, if that
//         succeeded, of PostAcceptEx().
//
// NOTE(review): on the failure paths no replacement AcceptEx is posted and
// the accepted socket is not closed here. Confirm that other code handles
// both.
int CIocpManager::OnAcceptEx(CIocpBuffer* pBuf)
{
    InterlockedDecrement(&m_nNumberOfAcceptEx);

    if (!(pBuf!=NULL && pBuf->m_soSocket!=INVALID_SOCKET))
    {
        ReleaseBuffer(pBuf);
        return ERROR_FUNC_PARAM_INVALID;
    }

    // Let the accepted socket inherit the listening socket's properties.
    const int nOptResult = setsockopt(pBuf->m_soSocket, SOL_SOCKET,
        SO_UPDATE_ACCEPT_CONTEXT, (char*)&m_soListen, sizeof(m_soListen));
    if (nOptResult==SOCKET_ERROR)
    {
        ReleaseBuffer(pBuf);
        Logging2(ERROR_IOCPMANAGER_SETSOCKOPT_FAILED, DOMAIN_NAME);
        return ERROR_IOCPMANAGER_SETSOCKOPT_FAILED;
    }

    // Retrieve the local and remote address information.
    SOCKADDR_IN* pAddrLocal = NULL;
    SOCKADDR_IN* pAddrRemote = NULL;
    int nAddrLen = sizeof(SOCKADDR);
    m_lpGetAcceptExSockAddrs(pBuf->GetBuffer(), 0,
        sizeof(SOCKADDR_IN)+16, sizeof(SOCKADDR_IN)+16,
        (SOCKADDR**)&pAddrLocal, &nAddrLen,
        (SOCKADDR**)&pAddrRemote, &nAddrLen);

    int nErr = AssociateSocketWithContext(pBuf->m_soSocket, pBuf, pAddrRemote);
    if (NOERROROCCUR(nErr))
    {
        // Keep the number of outstanding AcceptEx requests topped up.
        nErr = PostAcceptEx();
    }
    return nErr;
}
// Queues an IOTYPE_INIT_READ request for a context on the completion port.
//
// pContext  context to read from; NULL is rejected.
// pBuf      buffer to carry the request; when NULL a buffer is allocated.
// return    ERROR_NONE on success;
//           ERROR_FUNC_PARAM_INVALID when pContext is NULL;
//           ERROR_IOCPMANAGER_INVALID_SOCKET when the context has no socket
//             (buffer and context are released);
//           ERROR_IOCPMANAGER_ALLOCATE_BUFFER when no buffer could be
//             obtained (context is released);
//           ERROR_IOCPMANAGER_POSTQUEUEDCOMPLETIONSTATUS when posting fails
//             (buffer released, context disconnected and released).
int CIocpManager::PostRead(CIocpContext* pContext, CIocpBuffer* pBuf)
{
    int nErr=ERROR_NONE;
    if (pContext==NULL)
    {
        nErr = ERROR_FUNC_PARAM_INVALID;
        return nErr;
    }
    if (pContext->m_soContext==INVALID_SOCKET)
    {
        ReleaseBuffer(pBuf);
        ReleaseContext(pContext);
        nErr = ERROR_IOCPMANAGER_INVALID_SOCKET;
        Logging2(nErr, DOMAIN_NAME);
        return nErr;
    }
    if (pBuf==NULL)
    {
        pBuf = AllocateBuffer(IOTYPE_INIT_READ);
        if (pBuf==NULL)
        {
            // AllocateBuffer can return NULL; handle it the same way the
            // init-read handler does instead of dereferencing it below.
            ReleaseContext(pContext);
            nErr = ERROR_IOCPMANAGER_ALLOCATE_BUFFER;
            Logging2(nErr, DOMAIN_NAME);
            return nErr;
        }
    }
    pBuf->ResetIoType(IOTYPE_INIT_READ);

    // NOTE(review): the completion key is passed through a DWORD cast, which
    // would truncate the pointer in a 64-bit build — confirm the target.
    int nLastError=0;
    BOOL bRet = PostQueuedCompletionStatus(m_hCompletionPort, 0,
        (DWORD)pContext, &pBuf->m_olBuffer);
    if (!bRet && (nLastError=GetLastError())!=ERROR_IO_PENDING)
    {
        ReleaseBuffer(pBuf);
        DisconnectContext(pContext);
        ReleaseContext(pContext);
        nErr = ERROR_IOCPMANAGER_POSTQUEUEDCOMPLETIONSTATUS;
        Logging4(ERRORLEVEL_ERROR, CErrorManager::Instance()->WSAErrorCodeToText(nLastError),
            DOMAIN_NAME, CLASS_NAME_IOCPMANAGER);
    }
    return nErr;
}
// Handler for IOTYPE_INIT_READ: issues the overlapped WSARecv for a context.
//
// The buffer is switched to IOTYPE_READ, stamped with the context's current
// receive sequence number, and passed to WSARecv. When the receive is
// accepted (completed or pending) the context's receive sequence number is
// advanced modulo MAXSEQUENCENUMBER. The context lock is held from the
// stamping until the sequence number has been updated or failure is known.
//
// pContext  context to read from; NULL is rejected.
// pBuf      buffer to receive into; when NULL a buffer is allocated.
// return    ERROR_NONE on success;
//           ERROR_FUNC_PARAM_INVALID when pContext is NULL;
//           ERROR_IOCPMANAGER_ALLOCATE_BUFFER when no buffer is available
//             (context is released);
//           ERROR_IOCPMANAGER_WSARECV_FAILED when WSARecv fails with anything
//             other than WSA_IO_PENDING (buffer released, context
//             disconnected and released).
int CIocpManager::OnInitRead(CIocpContext* pContext, CIocpBuffer* pBuf)
{
    if (pContext==NULL)
        return ERROR_FUNC_PARAM_INVALID;

    if (pBuf==NULL && (pBuf = AllocateBuffer(IOTYPE_READ))==NULL)
    {
        ReleaseContext(pContext);
        Logging2(ERROR_IOCPMANAGER_ALLOCATE_BUFFER, DOMAIN_NAME);
        return ERROR_IOCPMANAGER_ALLOCATE_BUFFER;
    }

    pBuf->ResetIoType(IOTYPE_READ);
    pBuf->SetupBuffer(IOTYPE_READ);

    DWORD dwReceived = 0;
    ULONG ulRecvFlags = MSG_PARTIAL;
    int nWsaError = 0;

    pContext->m_csContext.Lock();
    pBuf->m_nSequenceNumber = pContext->m_nRecvSequenceNumber;

    const int nRecvResult = WSARecv(pContext->m_soContext, pBuf->GetWSABuffer(), 1,
        &dwReceived, &ulRecvFlags, &pBuf->m_olBuffer, NULL);

    // A pending overlapped receive is not a failure.
    const bool bHardFailure =
        (nRecvResult==SOCKET_ERROR) && ((nWsaError = WSAGetLastError())!=WSA_IO_PENDING);

    if (!bHardFailure)
    {
        pContext->m_nRecvSequenceNumber =
            (pContext->m_nRecvSequenceNumber+1) % MAXSEQUENCENUMBER;
    }
    pContext->m_csContext.Unlock();

    if (!bHardFailure)
        return ERROR_NONE;

    // Tear-down happens only after the context lock has been dropped.
    ReleaseBuffer(pBuf);
    DisconnectContext(pContext);
    ReleaseContext(pContext);
    Logging4(ERRORLEVEL_ERROR, CErrorManager::Instance()->WSAErrorCodeToText(nWsaError),
        DOMAIN_NAME, CLASS_NAME_IOCPMANAGER);
    return ERROR_IOCPMANAGER_WSARECV_FAILED;
}
// Completion handler for a finished receive.
//
// Records the received length in the buffer, then repeatedly asks
// GetNextRecvBuffer() for the next buffer to handle and passes each one to
// ProcessBuffer(), advancing the receive sequence number after each.
// Afterwards a new read is posted for the context.
//
// pContext  context the data arrived on; NULL is rejected.
// pBuf      buffer that completed.
// dwIoSize  number of bytes received.
// return    ERROR_FUNC_PARAM_INVALID when pContext is NULL, or when pBuf is
//           NULL or dwIoSize is 0 (in that case the buffer is released and
//           the context disconnected and released); otherwise the result of
//           PostRead(). Results of ProcessBuffer() are not propagated.
int CIocpManager::OnRead(CIocpContext* pContext, CIocpBuffer* pBuf, DWORD dwIoSize)
{
    if (pContext==NULL)
        return ERROR_FUNC_PARAM_INVALID;

    if (pBuf==NULL || dwIoSize==0)
    {
        ReleaseBuffer(pBuf);
        DisconnectContext(pContext);
        ReleaseContext(pContext);
        return ERROR_FUNC_PARAM_INVALID;
    }

    pBuf->SetBufferLen(dwIoSize);

    // Consume buffers in sequence order.
    for (CIocpBuffer* pNext = GetNextRecvBuffer(pContext, pBuf);
         pNext!=NULL;
         pNext = GetNextRecvBuffer(pContext))
    {
        ProcessBuffer(pContext, pNext);
        IncreaseRecvSequenceNumber(pContext);
    }

    return PostRead(pContext);
}
// Feeds one received buffer into packet handling for a context.
//
// Flow, as implemented below:
//  1. If the context's temporary buffer (m_bufPackage) already holds data,
//     the new data is merged into it. When the merged length is still below
//     MAXBUFLEN the function releases pBuf and returns; otherwise the
//     temporary buffer is topped up to MAXBUFLEN and becomes the buffer to
//     process.
//  2. When the buffer to process holds at least MAXBUFLEN bytes, the packet
//     keyed by GetKey()+1 is looked up (or allocated, raising
//     IOCPTASK_PACKET_BEGIN) and the buffer is handed to it. A packet that
//     reports itself invalid causes the context to be disconnected.
//     A ready packet raises IOCPTASK_PACKET_END and is queued as
//     IOCPTASK_PACKET_PROCESS; otherwise IOCPTASK_PACKET_PROGRESS may be
//     raised.
//  3. Bytes still left in pBuf are appended to the temporary buffer.
// pBuf is released on every path before returning.
//
// pContext  context the data belongs to (asserted non-NULL).
// pBuf      buffer holding the received data (asserted non-NULL).
// return    ERROR_IOCPMANAGER_INVALID_PACKET for an invalid packet, otherwise
//           the last value assigned to nErr by the calls below.
int CIocpManager::ProcessBuffer(CIocpContext* pContext, CIocpBuffer* pBuf)
{
ASSERT(pContext!=NULL && pBuf!=NULL);
int nErr=ERROR_NONE;
CIocpBuffer* pProcessBuffer=pBuf;
// Case: the temporary buffer already contains data
int nBufferLen=pBuf->GetBufferLen();
int nPackageLen=pContext->m_bufPackage.GetBufferLen();
if (nPackageLen>0)
{
int nNeedLen=(MAXBUFLEN-nPackageLen);
if (nBufferLen<nNeedLen)
{
// Total length is still short of MAXBUFLEN: copy the data into the temporary buffer and return
pContext->m_bufPackage.AddBufferData(pBuf->GetBuffer(), nBufferLen);
ReleaseBuffer(pBuf);
return nErr;
}
else
{
// Top up the temporary buffer and process it instead of pBuf
pContext->m_bufPackage.AddBufferData(pBuf->GetBuffer(), nNeedLen);
pBuf->FlushBuffer(nNeedLen);
pProcessBuffer = &(pContext->m_bufPackage);
}
}
if (pProcessBuffer!=NULL && pProcessBuffer->GetBufferLen()>=MAXBUFLEN)
{
// The buffer holds enough data; process it
ULONG ulKey=pProcessBuffer->GetKey()+1; // per the original note: keeps receive-side keys even so they differ from send-packet keys
CIocpPacket* pPacket=pContext->FindPacket(ulKey);
if (pPacket==NULL)
{
// NOTE(review): the result of AllocatePacket() is used without a NULL check — confirm it cannot fail
pPacket = pContext->AllocatePacket(ulKey, PACKETTYPE_READ);
pPacket->SetDefaultPath(m_strDefaultPath); // default directory for the received packet's temporary file
IOCPTASK itTask;
itTask.wType = IOCPTASK_PACKET_BEGIN;
itTask.pContext = pContext;
itTask.pPacket = pPacket;
nErr = Processing(itTask);
}
nErr = pProcessBuffer->ProcessBuffer(pPacket);
if (!pPacket->IsPacketValid()) // invalid packet
{
ReleaseBuffer(pBuf);
pContext->ReleasePacket(pPacket);
DisconnectContext(pContext);
nErr = ERROR_IOCPMANAGER_INVALID_PACKET;
Logging2(nErr, DOMAIN_NAME);
return nErr;
}
if (NOERROROCCUR(nErr))
{
if (pPacket->IsReady())
{
pPacket->ClosePacketFile();
IOCPTASK itTask;
itTask.wType = IOCPTASK_PACKET_END;
itTask.pContext = pContext;
itTask.pPacket = pPacket;
nErr = Processing(itTask);
// Queue the packet as a task; EnterIoLoop() is called first and
// ReleaseContext() is called if queuing fails
pContext->EnterIoLoop();
IOCPTASK itTaskPacket;
itTaskPacket.wType = IOCPTASK_PACKET_PROCESS;
itTaskPacket.pContext = pContext;
itTaskPacket.pPacket = pPacket;
if (!AddTask(itTaskPacket))
{
pContext->ReleasePacket(pPacket);
ReleaseContext(pContext);
}
}
else
{
int nTranLen=0, nDoneLen=0;
if (pPacket->NeedInfoProgress(nTranLen, nDoneLen))
{
IOCPTASK itTask;
itTask.wType = IOCPTASK_PACKET_PROGRESS;
itTask.pContext = pContext;
itTask.pPacket = pPacket;
itTask.wParam = nTranLen;
itTask.lParam = nDoneLen;
nErr = Processing(itTask);
}
}
}
}
if (pBuf->GetBufferLen()>0) // surplus data remains: append it to the temporary buffer
pContext->m_bufPackage.AddBufferData(pBuf->GetBuffer(), pBuf->GetBufferLen());
ReleaseBuffer(pBuf);
return nErr;
}
// Looks up the in-use context whose socket equals nSocket.
//
// The in-use list is walked from the head while m_csUsedContext is held;
// the first match wins.
//
// nSocket  socket handle to search for (compared as SOCKET).
// return   the matching context, or NULL when none is found.
CIocpContext* CIocpManager::FindContext(int nSocket)
{
    const SOCKET soWanted = (SOCKET)nSocket;
    CIocpContext* pFound = NULL;

    m_csUsedContext.Lock();
    POSITION posCursor = m_plUsedContext.GetHeadPosition();
    while (posCursor!=NULL && pFound==NULL)
    {
        CIocpContext* pCandidate = (CIocpContext*)m_plUsedContext.GetNext(posCursor);
        if (pCandidate!=NULL && pCandidate->m_soContext==soWanted)
            pFound = pCandidate;
    }
    m_csUsedContext.Unlock();

    return pFound;
}
int CIocpManager::SendData(int nSocket, CIocpPacket* pPacket, CIocpPacket* pAnsPacket, DWORD dwWaitTime)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -