📄 iocpmanager.cpp
字号:
{
int nErr=ERROR_NONE;
CIocpContext* pContext=FindContext(nSocket);
if (pContext==NULL)
{
nErr = ERROR_IOCPMANAGER_FIND_CONTEXT_NULL;
Logging2(nErr, DOMAIN_NAME);
return nErr;
}
return SendData(pContext, pPacket, pAnsPacket, dwWaitTime);
}
int CIocpManager::SendData(CIocpContext* pContext, CIocpPacket* pPacket, CIocpPacket* pAnsPacket, DWORD dwWaitTime)
{
int nErr=ERROR_NONE;
if (pContext==NULL || pPacket==NULL)
{
nErr = ERROR_FUNC_PARAM_INVALID;
return nErr;
}
CIocpBuffer* pBuf=AllocateBuffer(IOTYPE_INIT_WRITE);
if (pBuf==NULL)
{
nErr = ERROR_IOCPMANAGER_ALLOCATE_BUFFER;
Logging2(nErr, DOMAIN_NAME);
return nErr;
}
pContext->m_csContext.Lock(); //对连接上下文对象加锁,避免在发送过程中对象被释放
if (pContext->m_soContext==INVALID_SOCKET)
{
pContext->m_csContext.Unlock();
nErr = ERROR_IOCPMANAGER_INVALID_SOCKET;
Logging2(nErr, DOMAIN_NAME);
return nErr;
}
CIocpPacket* pSendPacket=pContext->AllocatePacket(pContext->GenerateKey());
pPacket->Clone(pSendPacket);
HANDLE hWaitEvent=CreateEvent(NULL, TRUE, FALSE, NULL); //创建等待事件
GUID gdPacket=GUID_NULL;
//若GUID为空则赋值,不为空则为响应消息,不用赋值
if (pSendPacket->GetPacketHeader().m_gdHeader==GUID_NULL)
{
gdPacket = pContext->GenerateGuid(); //创建发送数据包的唯一标识
pSendPacket->GetPacketHeader().m_gdHeader = gdPacket;
}
else
{
gdPacket = pSendPacket->GetPacketHeader().m_gdHeader;
}
IOCPWAITSEND iwWaitSend;
iwWaitSend.gdWait = gdPacket;
iwWaitSend.hWaitEvent = hWaitEvent;
iwWaitSend.pContext = pContext;
iwWaitSend.pWaitPacket = pAnsPacket;
if (!AddWaitSend(iwWaitSend)) //添加同步命令到数组
{
ReleaseWaitEvent(hWaitEvent);
pContext->ReleasePacket(pSendPacket);
pContext->m_csContext.Unlock();
nErr = ERROR_IOCPMANAGER_SYNC_SEND_FAILED;
Logging2(nErr, DOMAIN_NAME);
return nErr;
}
pSendPacket->m_bSyncSend = (pAnsPacket!=NULL);
pSendPacket->InitSendData();
pBuf->SetPacket(pSendPacket);
IOCPTASK itTask;
itTask.wType = IOCPTASK_PACKET_BEGIN;
itTask.pContext = pContext;
itTask.pPacket = pSendPacket;
nErr = Processing(itTask);
nErr = PostWrite(pContext, pBuf);
pContext->m_csContext.Unlock();
if (!NOERROROCCUR(nErr)) return nErr;
if (pAnsPacket!=NULL) //同步命令等待
{
DWORD dwRet=WaitForSingleObject(hWaitEvent, dwWaitTime);
if (dwRet==WAIT_TIMEOUT) //等待超时
{
IOCPWAITSEND iwWaitSend;
if (GetWaitSend(gdPacket, iwWaitSend))
{
nErr = ERROR_IOCPMANAGER_SEND_WAIT_TIMEOUT;
if (iwWaitSend.pWaitPacket!=NULL)
iwWaitSend.pWaitPacket->GetPacketHeader().m_nErrCode = nErr;
if (iwWaitSend.hWaitEvent!=NULL)
ReleaseWaitEvent(iwWaitSend.hWaitEvent);
}
}
}
return nErr;
}
int CIocpManager::PostWrite(CIocpContext* pContext, CIocpBuffer* pBuf)
{
int nErr=ERROR_NONE;
if (pContext==NULL)
{
nErr = ERROR_FUNC_PARAM_INVALID;
return nErr;
}
if (pContext->m_soContext!=INVALID_SOCKET && pBuf!=NULL)
{
pBuf->ResetIoType(IOTYPE_INIT_WRITE);
pContext->EnterIoLoop();
SetSendSequenceNumber(pContext, pBuf);
int nLastError=0;
BOOL bRet = PostQueuedCompletionStatus(m_hCompletionPort, 0,
(DWORD)pContext, &pBuf->m_olBuffer);
if (!bRet && (nLastError=GetLastError())!=ERROR_IO_PENDING)
{
ReleaseBuffer(pBuf);
ReleaseContext(pContext);
nErr = ERROR_IOCPMANAGER_POSTQUEUEDCOMPLETIONSTATUS;
Logging4(ERRORLEVEL_ERROR, CErrorManager::Instance()->WSAErrorCodeToText(nLastError),
DOMAIN_NAME, CLASS_NAME_IOCPMANAGER);
return nErr;
}
}
else
{
ReleaseBuffer(pBuf);
nErr = ERROR_IOCPMANAGER_INVALID_SOCKET;
Logging2(nErr, DOMAIN_NAME);
}
return nErr;
}
int CIocpManager::OnInitWrite(CIocpContext* pContext, CIocpBuffer* pBuf)
{
int nErr=ERROR_NONE;
if (pContext==NULL ||
pContext->m_soContext==INVALID_SOCKET)
{
ReleaseBuffer(pBuf);
ReleaseContext(pContext);
nErr = ERROR_FUNC_PARAM_INVALID;
return nErr;
}
pBuf = GetNextSendBuffer(pContext, pBuf);
while (pBuf!=NULL)
{
//判断当前发送数据包是否取消,一个发送数据包对应一个缓冲区
CIocpPacket* pSendPacket=pBuf->GetPacket();
if (pSendPacket!=NULL)
{
IOCPWAITSEND iwWaitSend;
if (!GetWaitSend(pSendPacket->GetPacketHeader().m_gdHeader, iwWaitSend, FALSE))
{
//通知数据发送已取消
IOCPTASK itTask;
itTask.wType = IOCPTASK_PACKET_CANCEL;
itTask.pContext = pContext;
itTask.pPacket = pSendPacket;
nErr = Processing(itTask);
//向接收端发送取消命令
CIocpPacketHeader pkHeader;
pkHeader.m_nCmd = CMD_IOCP_COMMON;
pkHeader.m_wParam = CMD_IOCP_COMMON_SEND_CANCEL;
pkHeader.m_lParam = pSendPacket->m_ulKey;
CIocpPacket pkCancel;
pkCancel.SetPacketHeader(pkHeader);
nErr = SendData(pContext, &pkCancel);
ReleaseBuffer(pBuf);
pContext->ReleasePacket(pSendPacket);
IncreaseSendSequenceNumber(pContext);
pBuf = GetNextSendBuffer(pContext);
nErr = ERROR_IOCPMANAGER_SEND_USER_CANCEL;
Logging2(nErr, DOMAIN_NAME);
continue; //继续处理下一个数据包
}
}
pBuf->ResetIoType(IOTYPE_WRITE);
pBuf->SetupBuffer(IOTYPE_WRITE);
DWORD dwIoSize=0;
ULONG ulFlags=MSG_PARTIAL;
int nWSARet=0;
int nRet=WSASend(pContext->m_soContext, pBuf->GetWSABuffer(), 1,
&dwIoSize, ulFlags, &pBuf->m_olBuffer, NULL);
if (nRet==SOCKET_ERROR && (nWSARet=WSAGetLastError())!=WSA_IO_PENDING)
{
ReleaseBuffer(pBuf);
DisconnectContext(pContext);
IncreaseSendSequenceNumber(pContext);
pBuf = GetNextSendBuffer(pContext);
ReleaseContext(pContext);
nErr = ERROR_IOCPMANAGER_WSASEND_FAILED;
Logging4(ERRORLEVEL_ERROR, CErrorManager::Instance()->WSAErrorCodeToText(nWSARet),
DOMAIN_NAME, CLASS_NAME_IOCPMANAGER);
}
else
{
IncreaseSendSequenceNumber(pContext);
pBuf = GetNextSendBuffer(pContext);
}
}
return nErr;
}
int CIocpManager::OnWrite(CIocpContext* pContext, CIocpBuffer* pBuf, DWORD dwIoSize)
{
int nErr=ERROR_NONE;
if (pContext==NULL)
{
nErr = ERROR_FUNC_PARAM_INVALID;
return nErr;
}
if (pBuf!=NULL)
{
BOOL bSendOver=pBuf->SendBufferOver(dwIoSize);
if (bSendOver)
{
CIocpPacket* pPacket=pBuf->GetPacket();
pPacket->ClosePacketFile();
IOCPTASK itTask;
itTask.wType = IOCPTASK_PACKET_END;
itTask.pContext = pContext;
itTask.pPacket = pPacket;
nErr = Processing(itTask);
if (!pPacket->m_bSyncSend) //若是异步发送数据则发完后将等待数据从数组中删除
{
IOCPWAITSEND iwWaitSend;
if (GetWaitSend(pPacket->GetPacketHeader().m_gdHeader, iwWaitSend))
ReleaseWaitEvent(iwWaitSend.hWaitEvent);
}
ReleaseBuffer(pBuf);
pContext->ReleasePacket(pPacket);
}
else
{
int nTranLen=0, nDoneLen=0;
CIocpPacket* pPacket=pBuf->GetPacket();
if (pPacket!=NULL && pPacket->NeedInfoProgress(nTranLen, nDoneLen))
{
IOCPTASK itTask;
itTask.wType = IOCPTASK_PACKET_PROGRESS;
itTask.pContext = pContext;
itTask.pPacket = pPacket;
itTask.wParam = nTranLen;
itTask.lParam = nDoneLen;
nErr = Processing(itTask);
}
nErr = PostWrite(pContext, pBuf);
}
}
ReleaseContext(pContext);
return nErr;
}
int CIocpManager::Connect(CString strServerAddr, int nServerPort)
{
int nErr=ERROR_NONE;
SOCKET soSocket=WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
if (soSocket==INVALID_SOCKET)
{
nErr = ERROR_IOCPMANAGER_CREATE_SOCKET;
Logging2(nErr, DOMAIN_NAME);
return nErr;
}
SOCKADDR_IN saServer;
memset(&saServer, 0, sizeof(saServer));
saServer.sin_family = AF_INET;
saServer.sin_addr.s_addr = inet_addr(strServerAddr);
saServer.sin_port = htons(nServerPort);
int nRet=connect(soSocket, (sockaddr*)&saServer, sizeof(saServer));
if (nRet==SOCKET_ERROR && WSAGetLastError()!=WSAEWOULDBLOCK)
{
closesocket(soSocket);
nErr = ERROR_IOCPMANAGER_CONNECT_FAILED;
Logging2(nErr, DOMAIN_NAME);
return nErr;
}
return AssociateSocketWithContext(soSocket, NULL, &saServer);
}
int CIocpManager::DisconnectAllContext()
{
int nErr=ERROR_NONE;
m_csUsedContext.Lock();
for (POSITION pos=m_plUsedContext.GetHeadPosition(); pos!=NULL;)
{
CIocpContext* pContext=(CIocpContext*)m_plUsedContext.GetNext(pos);
if (pContext!=NULL)
{
nErr = DisconnectContext(pContext);
}
}
m_csUsedContext.Unlock();
return nErr;
}
int CIocpManager::StopIocpWorkerThread()
{
int nErr=ERROR_NONE;
const DWORD dwWait=100;
while (m_lIocpWorkerCount>0)
{
//通知处理线程停止工作
PostQueuedCompletionStatus(m_hCompletionPort, 0, (DWORD)NULL, NULL);
Sleep(dwWait);
}
return nErr;
}
void CIocpManager::FreeBuffer()
{
m_csFreeBuf.Lock();
for (POSITION pos=m_plFreeBuffer.GetHeadPosition(); pos!=NULL;)
{
CIocpBuffer* pBuf=(CIocpBuffer*)m_plFreeBuffer.GetNext(pos);
if (pBuf!=NULL) delete pBuf;
}
m_plFreeBuffer.RemoveAll();
m_csFreeBuf.Unlock();
m_csUsedBuf.Lock();
for (pos=m_plUsedBuffer.GetHeadPosition(); pos!=NULL;)
{
CIocpBuffer* pBuf=(CIocpBuffer*)m_plUsedBuffer.GetNext(pos);
if (pBuf!=NULL) delete pBuf;
}
m_plUsedBuffer.RemoveAll();
m_csUsedBuf.Unlock();
}
void CIocpManager::FreeContext()
{
m_csUsedContext.Lock();
for (POSITION pos=m_plUsedContext.GetHeadPosition(); pos!=NULL;)
{
CIocpContext* pContext=(CIocpContext*)m_plUsedContext.GetNext(pos);
if (pContext!=NULL)
{
if (pContext->m_soContext!=INVALID_SOCKET)
CloseSocket(pContext->m_soContext);
ReleaseBufferMap(pContext->m_mapSendBuffer);
ReleaseBufferMap(pContext->m_mapRecvBuffer);
pContext->FreePacket();
delete pContext;
}
}
m_plUsedContext.RemoveAll();
m_csUsedContext.Unlock();
m_csFreeContext.Lock();
for (pos=m_plFreeContext.GetHeadPosition(); pos!=NULL;)
{
CIocpContext* pContext=(CIocpContext*)m_plFreeContext.GetNext(pos);
if (pContext!=NULL) delete pContext;
}
m_plFreeContext.RemoveAll();
m_csFreeContext.Unlock();
}
int CIocpManager::AssociateSocketWithContext(SOCKET soSocket, CIocpBuffer* pBuf, SOCKADDR_IN* pRemoveAddr)
{
int nErr=ERROR_NONE;
if (soSocket==INVALID_SOCKET)
{
ReleaseBuffer(pBuf);
nErr = ERROR_FUNC_PARAM_INVALID;
return nErr;
}
//判断连接是否超出上限
m_csUsedContext.Lock();
BOOL bReachMaxConnection=(m_plUsedContext.GetCount()>=MAX_NUMBER_OF_CONNECTION);
m_csUsedContext.Unlock();
if (bReachMaxConnection)
{
CloseSocket(soSocket);
ReleaseBuffer(pBuf);
nErr = ERROR_IOCPMANAGER_REACH_MAX_CONNECTION;
Logging2(nErr, DOMAIN_NAME);
return nErr;
}
CIocpContext* pContext=AllocateContext();
if (pContext==NULL)
{
closesocket(soSocket);
ReleaseBuffer(pBuf);
nErr = ERROR_IOCPMANAGER_ALLOCATE_CONTEXT;
Logging2(nErr, DOMAIN_NAME);
return nErr;
}
if (pRemoveAddr!=NULL)
memcpy(&(pContext->m_saRemote), pRemoveAddr, sizeof(SOCKADDR_IN));
pContext->m_soContext = soSocket;
/*
* TCP_NODELAY BOOL=TRUE Disables the "nagle algorithm for send coalescing" which delays
* short packets in the hope that the application will send more data and allow
* it to combine them into a single one to improve network efficiency.
*/
const char chOpt = 1;
int nRet = setsockopt(pContext->m_soContext, IPPROTO_TCP, TCP_NODELAY, &chOpt, sizeof(char));
if (nRet==SOCKET_ERROR)
{
ReleaseBuffer(pBuf);
DisconnectContext(pContext);
ReleaseContext(pContext);
nErr = ERROR_IOCPMANAGER_SETSOCKOPT_FAILED;
Logging2(nErr, DOMAIN_NAME);
return nErr;
}
nErr = AssociateSocketWithCompletionPort(pContext->m_soContext,
m_hCompletionPort, (DWORD)pContext);
if (!NOERROROCCUR(nErr))
{
ReleaseBuffer(pBuf);
DisconnectContext(pContext);
ReleaseContext(pContext);
return nErr;
}
pContext->EnterIoLoop();
if (pBuf==NULL)
{
pBuf = AllocateBuffer(IOTYPE_INITIALIZE);
}
pBuf->ResetIoType(IOTYPE_INITIALIZE);
int nLastError=0;
BOOL bRet = PostQueuedCompletionStatus(m_hCompletionPort, 0,
(DWORD)pContext, &pBuf->m_olBuffer);
if (!bRet && (nLastError=GetLastError())!=ERROR_IO_PENDING)
{
ReleaseBuffer(pBuf);
DisconnectContext(pContext);
ReleaseContext(pContext);
nErr = ERROR_IOCPMANAGER_POSTQUEUEDCOMPLETIONSTATUS;
Logging4(ERRORLEVEL_ERROR, CErrorManager::Instance()->WSAErrorCodeToText(nLastError),
DOMAIN_NAME, CLASS_NAME_IOCPMANAGER);
return nErr;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -