📄 iocpserver.cpp
字号:
bStayInPool = FALSE;
// }
bError = true;
}
}
}
//////////////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////////////////
if (!bError)
{
if(bIORet && NULL != pOverlapPlus && NULL != lpClientContext)
{
bEnterRead = pThis->ProcessIOMessage(pOverlapPlus->m_ioType, lpClientContext, dwIoSize);
}
}
if(! bError && bEnterRead)
{
// issue a read request
OVERLAPPEDPLUS * pOverlap = new OVERLAPPEDPLUS(IORead);
ULONG ulFlags = 0;
UINT nRetVal = WSARecv(lpClientContext->m_Socket,
&lpClientContext->m_wsaInBuffer,
1,
&dwIoSize,
&ulFlags,
&pOverlap->m_ol,
NULL);
if ( nRetVal == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING)
{
DWORD dwErr = WSAGetLastError();
pThis->RemoveStaleClient( lpClientContext, FALSE );
}
}
if(pOverlapPlus)
delete pOverlapPlus; // from previous call
}
InterlockedDecrement(&pThis->m_nWorkerCnt);
InterlockedDecrement(&pThis->m_nCurrentThreads);
InterlockedDecrement(&pThis->m_nBusyThreads);
return 0;
}
////////////////////////////////////////////////////////////////////////////////
//
// FUNCTION: CIOCPServer::Stop
//
// DESCRIPTION: Signal the listener to quit his thread
//
// INPUTS:
//
// NOTES:
//
// MODIFICATIONS:
//
// Name Date Version Comments
// N T ALMOND 06042001 1.0 Origin
//
////////////////////////////////////////////////////////////////////////////////
void CIOCPServer::Stop()
{
::SetEvent(m_hKillEvent);
WaitForSingleObject(m_hThread, INFINITE);
CloseHandle(m_hThread);
CloseHandle(m_hKillEvent);
}
////////////////////////////////////////////////////////////////////////////////
//
// FUNCTION: CIOCPServer::GetHostName
//
// DESCRIPTION: Get the peer address (dotted-decimal IP string) of the connected client
//
// INPUTS:
//
// NOTES:
//
// MODIFICATIONS:
//
// Name Date Version Comments
// N T ALMOND 06042001 1.0 Origin
//
////////////////////////////////////////////////////////////////////////////////
// Returns the remote peer's IPv4 address of `socket` as a dotted-decimal
// string, or an empty string when getpeername() fails (for example because
// the socket is invalid or not connected).
CString CIOCPServer::GetHostName(SOCKET socket)
{
    sockaddr_in sockAddr;
    memset(&sockAddr, 0, sizeof(sockAddr));
    int nSockAddrLen = sizeof(sockAddr);

    // getpeername() returns 0 on success and SOCKET_ERROR on failure. The
    // result is an int status, not a SOCKET, so it must be compared against
    // SOCKET_ERROR rather than INVALID_SOCKET.
    const int nResult = getpeername(socket, (SOCKADDR*)&sockAddr, &nSockAddrLen);
    if (nResult == SOCKET_ERROR)
        return "";

    return inet_ntoa(sockAddr.sin_addr);
}
////////////////////////////////////////////////////////////////////////////////
//
// FUNCTION: CIOCPServer::Send
//
// DESCRIPTION: Posts a Write + Data to IO CompletionPort for transfer
//
// INPUTS:
//
// NOTES:
//
// MODIFICATIONS:
//
// Name Date Version Comments
// N T ALMOND 06042001 1.0 Origin
// Ulf Hedlund 09062001 Changes for OVERLAPPEDPLUS
////////////////////////////////////////////////////////////////////////////////
void CIOCPServer::Send(const CString& strClient, CString strData)
{
ClientContext* pContext = FindClient(strClient);
if (pContext == NULL)
return;
int nBufLen = strData.GetLength();
// 4 byte header [Size of Entire Packet]
pContext->m_WriteBuffer.Write((PBYTE) &nBufLen, sizeof(nBufLen));
pContext->m_WriteBuffer.Write((PBYTE) strData.GetBuffer(nBufLen), nBufLen);
// Wait for Data Ready signal to become available
// WaitForSingleObject(pContext->m_hWriteComplete, INFINITE);
// Prepare Packet
int nSize = pContext->m_WriteBuffer.GetBufferLen();
// pContext->m_wsaOutBuffer.buf = (CHAR*) new BYTE[nSize];
// pContext->m_wsaOutBuffer.len = nSize;
OVERLAPPEDPLUS * pOverlap = new OVERLAPPEDPLUS(IOWrite);
PostQueuedCompletionStatus(m_hCompletionPort, 0, (DWORD) pContext, &pOverlap->m_ol);
pContext->m_nMsgOut++;
}
// Appends nSize raw bytes from pbyData to the context's write buffer and
// wakes a worker by posting an IOWrite packet to the completion port.
// Unlike the CString overload, no length prefix is added here.
//
//   pContext - target client context; the call is a no-op when it is NULL.
//   pbyData  - bytes to queue.
//   nSize    - number of bytes at pbyData.
void CIOCPServer::Send(ClientContext* pContext, PBYTE pbyData, const long nSize)
{
    if (pContext == NULL)
        return;

    pContext->m_WriteBuffer.Write((PBYTE) pbyData, nSize);

    // No scratch buffer is allocated for m_wsaOutBuffer here:
    // OnClientWriting() points m_wsaOutBuffer at m_WriteBuffer right before
    // calling WSASend, so a buffer allocated at this point was never sent
    // from and never freed (one leak per call).

    // The completion key carries the context pointer, so it must be
    // pointer-sized: a DWORD cast would truncate it on 64-bit builds.
    OVERLAPPEDPLUS* pOverlap = new OVERLAPPEDPLUS(IOWrite);
    if (!PostQueuedCompletionStatus(m_hCompletionPort, 0, (ULONG_PTR) pContext, &pOverlap->m_ol))
    {
        // Nothing was queued, so nobody else will free this structure.
        delete pOverlap;
    }

    pContext->m_nMsgOut++;
}
////////////////////////////////////////////////////////////////////////////////
//
// FUNCTION: CIOCPServer::OnClientInitializing
//
// DESCRIPTION: Called when client is initializing
//
// INPUTS:
//
// NOTES:
//
// MODIFICATIONS:
//
// Name Date Version Comments
// N T ALMOND 06042001 1.0 Origin
// Ulf Hedlund 09062001 Changes for OVERLAPPEDPLUS
////////////////////////////////////////////////////////////////////////////////
// Hook invoked while a client is initializing. Neither argument is used.
// Always returns true, which asks the caller to issue a read afterwards.
bool CIOCPServer::OnClientInitializing(ClientContext* pContext, DWORD dwIoSize)
{
    // Intentionally empty for now. This would be the place to call Send()
    // with a greeting message, for instance.
    const bool bIssueRead = true;
    return bIssueRead;
}
////////////////////////////////////////////////////////////////////////////////
//
// FUNCTION: CIOCPServer::OnClientReading
//
// DESCRIPTION: Called when client is reading
//
// INPUTS:
//
// NOTES:
//
// MODIFICATIONS:
//
// Name Date Version Comments
// N T ALMOND 06042001 1.0 Origin
// Ulf Hedlund 09062001 Changes for OVERLAPPEDPLUS
// Igor Janjetovic 12122001 Fixed Echo drop problem
////////////////////////////////////////////////////////////////////////////////
// Handles a completed read: appends the dwIoSize bytes found in
// pContext->m_byInBuffer to the context's read buffer and dispatches every
// complete packet it now contains to m_pSmppLibTest->parsePacket().
//
// Framing as implemented here: each packet starts with a 4-byte length
// (decoded by readInt) that counts the whole packet, length field included.
//
// Returns false when dwIoSize is 0 (the client is removed), true otherwise.
bool CIOCPServer::OnClientReading(ClientContext* pContext, DWORD dwIoSize)
{
    CLock cs(CIOCPServer::m_cs, "OnClientReading");

    if (dwIoSize == 0)
    {
        RemoveStaleClient( pContext, FALSE );
        return false;
    }

    // Accumulate the new bytes. The buffer may now hold a partial packet,
    // exactly one packet, or several packets followed by a partial one.
    pContext->m_ReadBuffer.Write(pContext->m_byInBuffer, dwIoSize);

    while (pContext->m_ReadBuffer.GetBufferLen() > HDR_SIZE)
    {
        BYTE hdr_pk[4];

        // Peek at the length field without consuming it.
        CopyMemory(hdr_pk, pContext->m_ReadBuffer.GetBuffer(), sizeof(int));
        const int nSize = readInt(hdr_pk);
        const int nLenFieldBytes = static_cast<int>(sizeof(hdr_pk));

        // The length comes straight off the wire. A value smaller than the
        // length field itself (including negative values) would make the body
        // size below negative and turn the allocation into new BYTE[<0>].
        // Such a value is treated like an incomplete packet: stop parsing.
        // NOTE(review): a peer sending such a length stalls its own stream
        // (as a length of 0 always did); consider dropping the client instead.
        if (nSize < nLenFieldBytes || pContext->m_ReadBuffer.GetBufferLen() < nSize)
            break;

        // Consume the length field, then the body.
        pContext->m_ReadBuffer.Read((PBYTE) hdr_pk, sizeof(int));

        const int nBodyBytes = nSize - nLenFieldBytes;
        PBYTE pData = new BYTE[nBodyBytes];
        pContext->m_ReadBuffer.Read(pData, nBodyBytes);

        // A complete message is available; hand it over for processing.
        TRACE("Got a message in OnClientReading and going to process\n");

        // NOTE(review): nSize (which includes the 4-byte length field) is
        // passed although pData holds nSize - 4 bytes; confirm that
        // parsePacket expects the full packet length here.
        m_pSmppLibTest->parsePacket(pContext, pData, nSize);

        // Allocated with new[], so it must be released with delete[].
        delete [] pData;
    }
    return true;
}
////////////////////////////////////////////////////////////////////////////////
//
// FUNCTION: CIOCPServer::OnClientWriting
//
// DESCRIPTION: Called when client is writing
//
// INPUTS:
//
// NOTES:
//
// MODIFICATIONS:
//
// Name Date Version Comments
// N T ALMOND 06042001 1.0 Origin
// Ulf Hedlund 09062001 Changes for OVERLAPPEDPLUS
// John Dresher 30022002 Changes for OVERLAPPEDPLUS
////////////////////////////////////////////////////////////////////////////////
// Handles a write notification: drops the dwIoSize bytes that were just
// written from the front of the context's write buffer and, if data remains,
// starts an overlapped WSASend for the rest.
//
// Returns true when the write buffer is fully drained (caller should issue a
// new read), false when another send was started or the client was removed.
bool CIOCPServer::OnClientWriting(ClientContext* pContext, DWORD dwIoSize)
{
    ULONG ulFlags = MSG_PARTIAL;

    // Format directly in TRACE: no intermediate buffer that is later used as
    // a format string, and %lu matches the unsigned long argument.
    TRACE("OnClientWriting, dwIoSize is : %lu", static_cast<unsigned long>(dwIoSize));

    // if ( dwIoSize == 0 )
    //     return true;

    // Remove what has already gone out.
    pContext->m_WriteBuffer.Delete(dwIoSize);

    if (pContext->m_WriteBuffer.GetBufferLen() == 0)
    {
        pContext->m_WriteBuffer.ClearBuffer();
        // Write complete
        // SetEvent(pContext->m_hWriteComplete);
        return true; // issue new read after this one
    }

    // Data is still pending: send straight from the write buffer.
    OVERLAPPEDPLUS* pOverlap = new OVERLAPPEDPLUS(IOWrite);
    pContext->m_wsaOutBuffer.buf = (char*) pContext->m_WriteBuffer.GetBuffer();
    pContext->m_wsaOutBuffer.len = pContext->m_WriteBuffer.GetBufferLen();

    // Use a separate byte counter so WSASend cannot overwrite the length of
    // the very WSABUF it was handed.
    DWORD dwBytesSent = 0;
    const int nRetVal = WSASend(pContext->m_Socket,
                                &pContext->m_wsaOutBuffer,
                                1,
                                &dwBytesSent,
                                ulFlags,
                                &pOverlap->m_ol,
                                NULL);

    // Capture the error code before any other call can disturb it.
    const int nLastError = (nRetVal == SOCKET_ERROR) ? WSAGetLastError() : 0;

    TRACE("Writing\n");

    if (nRetVal == SOCKET_ERROR && nLastError != WSA_IO_PENDING)
    {
        // The send was never started, so no completion packet will arrive to
        // hand this structure back for deletion; free it here.
        delete pOverlap;
        RemoveStaleClient( pContext, FALSE );
    }

    return false; // a write is in flight (or the client is gone): no new read
}
////////////////////////////////////////////////////////////////////////////////
//
// FUNCTION: CIOCPServer::CloseCompletionPort
//
// DESCRIPTION: Close down the IO Complete Port, queue and associated client context structs
// which in turn will close the sockets...
//
//
// INPUTS:
//
// NOTES:
//
// MODIFICATIONS:
//
// Name Date Version Comments
// N T ALMOND 06042001 1.0 Origin
//
////////////////////////////////////////////////////////////////////////////////
void CIOCPServer::CloseCompletionPort()
{
while (m_nWorkerCnt)
{
PostQueuedCompletionStatus(m_hCompletionPort, 0, (DWORD) NULL, NULL);
Sleep(1000);
}
// Close the CompletionPort and stop any more requests
CloseHandle(m_hCompletionPort);
CString strHost;
ClientContext* pContext = NULL;
do
{
POSITION pos = m_listContexts.GetStartPosition();
if (pos)
{
m_listContexts.GetNextAssoc(pos, strHost, pContext);
RemoveStaleClient(pContext, FALSE);
}
}
while (!m_listContexts.IsEmpty());
POSITION pos = m_listContexts.GetStartPosition();
while (pos)
{
m_listContexts.GetNextAssoc(pos, strHost, pContext);
delete pContext;
}
}
// Associates `socket` with the existing completion port `hCompletionPort`,
// using `dwCompletionKey` as the per-socket completion key.
// Returns TRUE when CreateIoCompletionPort hands back that same port handle,
// FALSE otherwise.
BOOL CIOCPServer::AssociateSocketWithCompletionPort(SOCKET socket, HANDLE hCompletionPort, DWORD dwCompletionKey)
{
    const HANDLE hAssociated =
        CreateIoCompletionPort((HANDLE) socket, hCompletionPort, dwCompletionKey, 0);

    if (hAssociated == hCompletionPort)
        return TRUE;

    return FALSE;
}
////////////////////////////////////////////////////////////////////////////////
//
// FUNCTION: CIOCPServer::RemoveStaleClient
//
// DESCRIPTION: Client has died on us, close socket and remove context from our list
//
// INPUTS:
//
// NOTES:
//
// MODIFICATIONS:
//
// Name Date Version Comments
// N T ALMOND 06042001 1.0 Origin
//
////////////////////////////////////////////////////////////////////////////////
// Closes a client's socket and retires its context to the free pool.
//
//   pContext  - context whose socket identifies the client. The peer address
//               of that socket is the key used to find the entry in
//               m_listContexts.
//   bGraceful - FALSE requests an abortive close (SO_LINGER on, timeout 0);
//               TRUE leaves the socket's linger settings untouched.
//
// Does nothing when the peer address cannot be resolved or when no entry is
// registered under it.
void CIOCPServer::RemoveStaleClient(ClientContext* pContext, BOOL bGraceful)
{
    CLock cs(m_cs, "RemoveStaleClient");
    TRACE("CIOCPServer::RemoveStaleClient\n");

    CString strHost = GetHostName(pContext->m_Socket);
    if (strHost.IsEmpty())
        return;

    // Abortive close requested: enable linger with a zero timeout.
    if ( !bGraceful )
    {
        LINGER lingerStruct;
        lingerStruct.l_onoff = 1;
        lingerStruct.l_linger = 0;
        setsockopt( pContext->m_Socket, SOL_SOCKET, SO_LINGER,
                    (char *)&lingerStruct, sizeof(lingerStruct) );
    }

    // From here on pContext refers to whatever the map holds for this host.
    if (!m_listContexts.Lookup(strHost, pContext))
        return;

    // Cancel pending I/O and close the socket handle; the close is abortive
    // or graceful depending on the linger setting applied above.
    CancelIo((HANDLE) pContext->m_Socket);
    closesocket( pContext->m_Socket );
    pContext->m_Socket = INVALID_SOCKET;

    // NOTE(review): this polls the context pointer reinterpreted as an
    // OVERLAPPED. Whether that is meaningful depends on ClientContext's
    // layout, which is declared elsewhere -- verify.
    for (;;)
    {
        if (HasOverlappedIoCompleted((LPOVERLAPPED)pContext))
            break;
        Sleep(0);
    }

    MoveToFreePool(strHost);
}
void CIOCPServer::Shutdown()
{
if (m_bInit == false)
return;
m_bInit = false;
m_bTimeToKill = true;
// Stop the listener
Stop();
closesocket(m_socListen);
WSACloseEvent(m_hEvent);
CloseCompletionPort();
DeleteCriticalSection(&m_cs);
WSACleanup();
while (!m_listFreePool.IsEmpty())
delete m_listFreePool.RemoveTail();
}
////////////////////////////////////////////////////////////////////////////////
//
// FUNCTION: CIOCPServer::MoveToFreePool
//
// DESCRIPTION: Moves a used/stale context to the free pool for reuse
//
// INPUTS:
//
// NOTES:
//
// MODIFICATIONS:
//
// Name Date Version Comments
// N T ALMOND 06042001 1.0 Origin
//
////////////////////////////////////////////////////////////////////////////////
// Retires the context registered under strKey: clears its read and write
// buffers, appends it to m_listFreePool and removes the key from
// m_listContexts. Does nothing if the key is not present.
void CIOCPServer::MoveToFreePool(CString& strKey)
{
    ClientContext* pRetired = NULL;

    if (!m_listContexts.Lookup(strKey, pRetired))
        return;

    // Drop any buffered data before the context is parked for reuse.
    pRetired->m_ReadBuffer.ClearBuffer();
    pRetired->m_WriteBuffer.ClearBuffer();

    m_listFreePool.AddTail(pRetired);
    m_listContexts.RemoveKey(strKey);
}
////////////////////////////////////////////////////////////////////////////////
//
// FUNCTION: CIOCPServer::AllocateContext
//
// DESCRIPTION: Checks the free pool, otherwise allocates a new context
//
// INPUTS:
//
// NOTES:
//
// MODIFICATIONS:
//
// Name Date Version Comments
// N T ALMOND 06042001 1.0 Origin
//
////////////////////////////////////////////////////////////////////////////////
// Hands out a zero-filled client context: the head of the free pool is
// reused when one is available, otherwise a new ClientContext is allocated.
// The pool is accessed while holding m_cs.
ClientContext* CIOCPServer::AllocateContext()
{
    ClientContext* pContext = NULL;

    CLock cs(CIOCPServer::m_cs, "AllocateContext");

    if (m_listFreePool.IsEmpty())
        pContext = new ClientContext;
    else
        pContext = m_listFreePool.RemoveHead();

    ASSERT(pContext);

    // NOTE(review): this zero-fills the whole object, member objects such as
    // m_ReadBuffer / m_WriteBuffer included -- confirm that ClientContext is
    // safe to wipe this way.
    ZeroMemory(pContext, sizeof(ClientContext));

    return pContext;
}
// Gracefully removes pContext, but only if that exact pointer is currently
// registered in m_listContexts; otherwise nothing happens.
void CIOCPServer::ResetConnection(ClientContext* pContext)
{
    CString strKey;
    ClientContext* pCandidate = NULL;

    CLock cs(CIOCPServer::m_cs, "ResetConnection");

    for (POSITION pos = m_listContexts.GetStartPosition(); pos != NULL; )
    {
        m_listContexts.GetNextAssoc(pos, strKey, pCandidate);

        if (pCandidate != pContext)
            continue;

        // Found it: remove gracefully and stop scanning.
        RemoveStaleClient(pContext, TRUE);
        return;
    }
}
// Gracefully removes every client currently registered in m_listContexts.
// m_bDisconnectAll is set for the duration of the sweep and cleared again
// before returning.
void CIOCPServer::DisconnectAll()
{
    m_bDisconnectAll = true;

    CString strHost;
    ClientContext* pContext = NULL;

    // Label the lock with this function's own name, as every other function
    // in this file does (it was copy-pasted as "ResetConnection").
    CLock cs(CIOCPServer::m_cs, "DisconnectAll");

    POSITION pos = m_listContexts.GetStartPosition();
    while (pos)
    {
        // GetNextAssoc advances pos before RemoveStaleClient gets a chance to
        // remove the entry that was just fetched.
        // NOTE(review): this relies on removing the fetched entry leaving the
        // advanced position valid -- confirm for the map type in use.
        m_listContexts.GetNextAssoc(pos, strHost, pContext);
        RemoveStaleClient(pContext, TRUE);
    }

    m_bDisconnectAll = false;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -