📄 iocpserver.cpp
字号:
// IOCPServer.cpp: implementation of the CIOCPServer class.
//
//////////////////////////////////////////////////////////////////////
#include "stdafx.h"
#include "IOCPServer.h"
#include "SmppLibTest.h"
#include "common.h"
#ifdef _DEBUG
#undef THIS_FILE
static char THIS_FILE[]=__FILE__;
#define new DEBUG_NEW
#endif
// Change at your Own Peril
#define HDR_SIZE sizeof(int)
#define HUERISTIC_VALUE 2
CRITICAL_SECTION CIOCPServer::m_cs;
//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
//
// FUNCTION: CIOCPServer::CIOCPServer
//
// DESCRIPTION: C'tor initializes Winsock2 and miscellaneous events etc.
//
// INPUTS:
//
// NOTES:
//
// MODIFICATIONS:
//
// Name Date Version Comments
// N T ALMOND 06042001 1.0 Origin
//
////////////////////////////////////////////////////////////////////////////////
// Starts Winsock 2.2, creates the kill event and puts every member into its
// idle state. No socket or thread is created here; that happens in Initialize().
CIOCPServer::CIOCPServer()
{
    TRACE("CIOCPServer=%p\n",this);

    // A constructor cannot return an error code, so a failed Winsock start-up
    // is at least reported instead of being silently ignored.
    WSADATA wsaData;
    int nStartupErr = WSAStartup(MAKEWORD(2,2), &wsaData);
    if (nStartupErr != 0)
    {
        TRACE(_T("WSAStartup() error %d\n"), nStartupErr);
    }

    // NOTE(review): m_cs is a static member (defined at file scope above), so it
    // is initialized again for every instance that gets constructed - confirm
    // that only one CIOCPServer is ever alive at a time.
    InitializeCriticalSection(&m_cs);

    m_hThread = NULL;

    // Manual-reset, initially non-signalled event polled by the listener thread.
    m_hKillEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
    if (m_hKillEvent == NULL)
    {
        TRACE(_T("CreateEvent() error %ld\n"), GetLastError());
    }

    // NOTE(review): NULL is kept as the "no socket" value because code outside
    // this block may test for it; INVALID_SOCKET is the Winsock sentinel.
    m_socListen = NULL;
    m_bTimeToKill = false;
    m_bDisconnectAll = false;
    m_hEvent = NULL;
    m_hCompletionPort= NULL;
    m_bInit = false;
    m_nCurrentThreads = 0;
    m_nBusyThreads = 0;
    m_nWorkerCnt = 0;

    //create class to handle smpp test
    m_pSmppLibTest = new CSmppLibTest(this);
}
////////////////////////////////////////////////////////////////////////////////
//
// FUNCTION: CIOCPServer::~CIOCPServer
//
// DESCRIPTION: Tidy up
//
// INPUTS:
//
// NOTES:
//
// MODIFICATIONS:
//
// Name Date Version Comments
// N T ALMOND 06042001 1.0 Origin
//
////////////////////////////////////////////////////////////////////////////////
// Destruction simply delegates to Shutdown(); all teardown logic lives there.
CIOCPServer::~CIOCPServer()
{
    this->Shutdown();
}
////////////////////////////////////////////////////////////////////////////////
//
// FUNCTION: CIOCPServer::Initialize
//
// DESCRIPTION: Starts listener into motion
//
// INPUTS:
//
// NOTES:
//
// MODIFICATIONS:
//
// Name Date Version Comments
// N T ALMOND 06042001 1.0 Origin
//
////////////////////////////////////////////////////////////////////////////////
bool CIOCPServer::Initialize(int nConnections, int nPort)
{
m_socListen = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
if (m_socListen == INVALID_SOCKET)
{
TRACE(_T("Could not create listen socket %ld\n"),WSAGetLastError());
return false;
}
// Event for handling Network IO
m_hEvent = WSACreateEvent();
if (m_hEvent == WSA_INVALID_EVENT)
{
TRACE(_T("WSACreateEvent() error %ld\n"),WSAGetLastError());
closesocket(m_socListen);
return false;
}
// The listener is ONLY interested in FD_ACCEPT
// That is when a client connects to or IP/Port
// Request async notification
int nRet = WSAEventSelect(m_socListen,
m_hEvent,
FD_ACCEPT);
if (nRet == SOCKET_ERROR)
{
TRACE(_T("WSAAsyncSelect() error %ld\n"),WSAGetLastError());
closesocket(m_socListen);
return false;
}
SOCKADDR_IN saServer;
// Listen on our designated Port#
saServer.sin_port = htons(nPort);
// Fill in the rest of the address structure
saServer.sin_family = AF_INET;
saServer.sin_addr.s_addr = INADDR_ANY;
// bind our name to the socket
nRet = bind(m_socListen,
(LPSOCKADDR)&saServer,
sizeof(struct sockaddr));
if (nRet == SOCKET_ERROR)
{
TRACE(_T("bind() error %ld\n"),WSAGetLastError());
closesocket(m_socListen);
return false;
}
// Set the socket to listen
nRet = listen(m_socListen, nConnections);
if (nRet == SOCKET_ERROR)
{
TRACE(_T("listen() error %ld\n"),WSAGetLastError());
closesocket(m_socListen);
return false;
}
////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////
UINT dwThreadId = 0;
m_hThread =
(HANDLE)_beginthreadex(NULL, // Security
0, // Stack size - use default
ListenThreadProc, // Thread fn entry point
(void*) this,
0, // Init flag
&dwThreadId); // Thread address
if (m_hThread != INVALID_HANDLE_VALUE)
{
m_bInit = true;
InitializeIOCP();
return true;
}
return false;
}
////////////////////////////////////////////////////////////////////////////////
//
// FUNCTION: CIOCPServer::ListenThreadProc
//
// DESCRIPTION: Listens for incoming clients
//
// INPUTS:
//
// NOTES:
//
// MODIFICATIONS:
//
// Name Date Version Comments
// N T ALMOND 06042001 1.0 Origin
//
////////////////////////////////////////////////////////////////////////////////
// Thread entry point of the listener. lParam is the owning CIOCPServer.
// Alternates between polling the kill event and the network event (100 ms
// each) and calls OnAccept() whenever FD_ACCEPT is reported without error.
// Always returns 0.
unsigned CIOCPServer::ListenThreadProc(LPVOID lParam)
{
    CIOCPServer* pThis = reinterpret_cast<CIOCPServer*>(lParam);
    WSANETWORKEVENTS events;

    while(1)
    {
        // Leave as soon as the kill event is signalled.
        if (WaitForSingleObject(pThis->m_hKillEvent, 100) == WAIT_OBJECT_0)
            break;

        //
        // Wait for something to happen
        //
        DWORD dwRet = WSAWaitForMultipleEvents(1,
                                               &pThis->m_hEvent,
                                               FALSE,
                                               100,
                                               FALSE);
        if (dwRet == WSA_WAIT_TIMEOUT)
            continue;

        // A failed wait would otherwise be treated as "event signalled".
        if (dwRet == WSA_WAIT_FAILED)
        {
            TRACE(_T("WSAWaitForMultipleEvents error %ld\n"),WSAGetLastError());
            break;
        }

        //
        // Figure out what happened (this call also resets m_hEvent)
        //
        int nRet = WSAEnumNetworkEvents(pThis->m_socListen,
                                        pThis->m_hEvent,
                                        &events);
        if (nRet == SOCKET_ERROR)
        {
            TRACE(_T("WSAEnumNetworkEvents error %ld\n"),WSAGetLastError());
            break;
        }

        // Handle Network events //
        // ACCEPT
        if (events.lNetworkEvents & FD_ACCEPT)
        {
            if (events.iErrorCode[FD_ACCEPT_BIT] == 0)
                pThis->OnAccept();
            else
            {
                // The failure code is delivered in iErrorCode, not through
                // WSAGetLastError().
                TRACE(_T("Unknown network event error %ld\n"),
                      (long)events.iErrorCode[FD_ACCEPT_BIT]);
                break;
            }
        }
    } // while....

    return 0; // Normal Thread Exit Code...
}
////////////////////////////////////////////////////////////////////////////////
//
// FUNCTION: CIOCPServer::OnAccept
//
// DESCRIPTION: Accepts a pending client connection and registers it with the completion port
//
// INPUTS:
//
// NOTES:
//
// MODIFICATIONS:
//
// Name Date Version Comments
// N T ALMOND 06042001 1.0 Origin
// Ulf Hedlund 09072001 Changes for OVERLAPPEDPLUS
// Mark Tutt 09072001 setsockopt fix
////////////////////////////////////////////////////////////////////////////////
void CIOCPServer::OnAccept()
{
SOCKADDR_IN SockAddr;
SOCKET clientSocket;
int nRet;
int nLen;
if (m_bTimeToKill || m_bDisconnectAll)
return;
//
// accept the new socket descriptor
//
nLen = sizeof(SOCKADDR_IN);
clientSocket = accept(m_socListen,
(LPSOCKADDR)&SockAddr,
&nLen);
if (clientSocket == SOCKET_ERROR)
{
nRet = WSAGetLastError();
if (nRet != WSAEWOULDBLOCK)
{
//
// Just log the error and return
//
TRACE(_T("accept() error\n"),WSAGetLastError());
return;
}
}
// Create the Client context to be associted with the completion port
ClientContext* pContext = AllocateContext();
pContext->m_Socket = clientSocket;
// Fix up In Buffer
pContext->m_wsaInBuffer.buf = (char*)pContext->m_byInBuffer;
pContext->m_wsaInBuffer.len = sizeof(pContext->m_byInBuffer);
BOOL chOpt = true;
int nErr = setsockopt(pContext->m_Socket, IPPROTO_TCP, TCP_NODELAY, (char *)&chOpt, sizeof(chOpt));
if (nErr == -1)
{
TRACE(_T("setsockopt() error\n"),WSAGetLastError());
return;
}
// Associate the new socket with a completion port.
if (!AssociateSocketWithCompletionPort(clientSocket, m_hCompletionPort, (DWORD) pContext))
{
delete pContext;
pContext = NULL;
closesocket( clientSocket );
closesocket( m_socListen );
return;
}
{
CLock cs(m_cs, "OnAccept" );
// Hold a reference to the context
m_listContexts.SetAt(GetHostName(pContext->m_Socket), pContext);
}
// Trigger first IO Completion Request
// Otherwise the Worker thread will remain blocked waiting for GetQueuedCompletionStatus...
// The first message that gets queued up is ClientIoInitializing - see ThreadPoolFunc and
// IO_MESSAGE_HANDLER
OVERLAPPEDPLUS *pOverlap = new OVERLAPPEDPLUS(IOInitialize);
BOOL bSuccess = PostQueuedCompletionStatus(m_hCompletionPort, 0, (DWORD) pContext, &pOverlap->m_ol);
if ( (!bSuccess && GetLastError( ) != ERROR_IO_PENDING))
{
RemoveStaleClient(pContext,TRUE);
return;
}
}
// Looks up the context stored in m_listContexts under the key strClient.
// Returns the stored ClientContext pointer, or NULL when no entry exists.
//
// The lookup is made under m_cs, the same lock OnAccept() holds while inserting
// into the map. NOTE(review): the returned pointer is not protected once the
// lock is released - callers must cope with the client being removed meanwhile.
ClientContext* CIOCPServer::FindClient(const CString& strClient)
{
    CString strHost = strClient;
    ClientContext* pContext = NULL;
    {
        CLock cs(m_cs, "FindClient" );
        m_listContexts.Lookup(strHost, pContext);
    }
    return pContext;
}
////////////////////////////////////////////////////////////////////////////////
//
// FUNCTION: CIOCPServer::InitializeIOCP
//
// DESCRIPTION: Create a dummy socket and associate a completion port with it.
// once completion port is create we can dicard the socket
//
// INPUTS:
//
// NOTES:
//
// MODIFICATIONS:
//
// Name Date Version Comments
// N T ALMOND 06042001 1.0 Origin
//
////////////////////////////////////////////////////////////////////////////////
bool CIOCPServer::InitializeIOCP(void)
{
SOCKET s;
DWORD i;
UINT nThreadID;
SYSTEM_INFO systemInfo;
//
// First open a temporary socket that we will use to create the
// completion port. In NT 3.51 it will not be necessary to specify
// the FileHandle parameter of CreateIoCompletionPort()--it will
// be legal to specify FileHandle as NULL. However, for NT 3.5
// we need an overlapped file handle.
//
s = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
if ( s == INVALID_SOCKET )
return false;
// Create the completion port that will be used by all the worker
// threads.
m_hCompletionPort = CreateIoCompletionPort( (HANDLE)s, NULL, 0, 0 );
if ( m_hCompletionPort == NULL )
{
closesocket( s );
return false;
}
// Close the socket, we don't need it any longer.
closesocket( s );
// Determine how many processors are on the system.
GetSystemInfo( &systemInfo );
m_nThreadPoolMin = systemInfo.dwNumberOfProcessors * HUERISTIC_VALUE;
m_nThreadPoolMax = m_nThreadPoolMin;
m_nCPULoThreshold = 10;
m_nCPUHiThreshold = 75;
// m_cpu.Init();
// We use two worker threads for eachprocessor on the system--this is choosen as a good balance
// that ensures that there are a sufficient number of threads available to get useful work done
// but not too many that context switches consume significant overhead.
UINT nWorkerCnt = systemInfo.dwNumberOfProcessors * HUERISTIC_VALUE;
// We need to save the Handles for Later Termination...
HANDLE hWorker;
for ( i = 0; i < nWorkerCnt; i++ )
{
hWorker = (HANDLE)_beginthreadex(NULL, // Security
0, // Stack size - use default
ThreadPoolFunc, // Thread fn entry point
(void*) this, // Param for thread
0, // Init flag
&nThreadID); // Thread address
if (hWorker == NULL )
{
CloseHandle( m_hCompletionPort );
return false;
}
m_nWorkerCnt++;
CloseHandle(hWorker);
}
return true;
}
////////////////////////////////////////////////////////////////////////////////
//
// FUNCTION: CIOCPServer::ThreadPoolFunc
//
// DESCRIPTION: This is the main worker routine for the worker threads.
// Worker threads wait on a completion port for I/O to complete.
// When it completes, the worker thread processes the I/O, then either pends
// new I/O or closes the client's connection. When the service shuts
// down, other code closes the completion port which causes
// GetQueuedCompletionStatus() to wake up and the worker thread then
// exits.
//
// INPUTS:
//
// NOTES:
//
// MODIFICATIONS:
//
// Name Date Version Comments
// N T ALMOND 06042001 1.0 Origin
// Ulf Hedlund 09062001 Changes for OVERLAPPEDPLUS
////////////////////////////////////////////////////////////////////////////////
unsigned CIOCPServer::ThreadPoolFunc (LPVOID thisContext)
{
// Get back our pointer to the class
ULONG ulFlags = MSG_PARTIAL;
CIOCPServer* pThis = reinterpret_cast<CIOCPServer*>(thisContext);
ASSERT(pThis);
HANDLE hCompletionPort = pThis->m_hCompletionPort;
DWORD dwIoSize;
LPOVERLAPPED lpOverlapped;
ClientContext* lpClientContext;
OVERLAPPEDPLUS* pOverlapPlus;
bool bError;
bool bEnterRead;
InterlockedIncrement(&pThis->m_nCurrentThreads);
InterlockedIncrement(&pThis->m_nBusyThreads);
//
// Loop round and round servicing I/O completions.
//
for ( BOOL bStayInPool = TRUE; bStayInPool && pThis->m_bTimeToKill == false; )
{
pOverlapPlus = NULL;
lpClientContext = NULL;
bError = false;
bEnterRead = false;
// Thread is Block waiting for IO completion
InterlockedDecrement(&pThis->m_nBusyThreads);
// Get a completed IO request.
BOOL bIORet = GetQueuedCompletionStatus(
hCompletionPort,
&dwIoSize,
(LPDWORD) &lpClientContext,
&lpOverlapped, INFINITE);
DWORD dwIOError = GetLastError();
pOverlapPlus = CONTAINING_RECORD(lpOverlapped, OVERLAPPEDPLUS, m_ol);
int nBusyThreads = InterlockedIncrement(&pThis->m_nBusyThreads);
if (!bIORet && dwIOError != WAIT_TIMEOUT )
{
if (lpClientContext && pThis->m_bTimeToKill == false)
pThis->RemoveStaleClient(lpClientContext, FALSE);
continue;
// anyway, this was an error and we should exit
bError = true;
}
if (!bError)
{
// Allocate another thread to the thread Pool?
if (nBusyThreads == pThis->m_nCurrentThreads)
{
if (nBusyThreads < pThis->m_nThreadPoolMax)
{
// if (pThis->m_cpu.GetUsage() > pThis->m_nCPUHiThreshold)
{
// UINT nThreadID = -1;
// HANDLE hThread = (HANDLE)_beginthreadex(NULL, // Security
// 0, // Stack size - use default
// ThreadPoolFunc, // Thread fn entry point
/// (void*) pThis,
// 0, // Init flag
// &nThreadID); // Thread address
// CloseHandle(hThread);
}
}
}
// Thread timed out - IDLE?
if (!bIORet && dwIOError == WAIT_TIMEOUT)
{
if (lpClientContext == NULL)
{
// if (pThis->m_cpu.GetUsage() < pThis->m_nCPULoThreshold)
// {
// // Thread has no outstanding IO - Server hasn't much to do so die
// if (pThis->m_nCurrentThreads > pThis->m_nThreadPoolMin)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -