📄 iocpbase.cpp
字号:
}
loop_pass:;
if ( pSDS )
delete pSDS;
pSDS = NULL;
if ( pBSocket && pBSocket->GetIOPendingEnableFlag() == TRUE )
{
if ( !pLastBSocket )
{
nContinueSendinfCount++;
pLastBSocket = pBSocket;
}
else
{
if ( pLastBSocket == pBSocket )
nContinueSendinfCount++;
else
{
nContinueSendinfCount = 0;
pLastBSocket = pBSocket;
}
}
if ( nContinueSendinfCount > MAX_CONTINUE_SENDING_COUNT )
{
nContinueSendinfCount = 0;
pLastBSocket = NULL;
LeaveCriticalSection( pSendCriticalSection );
Sleep(1);
continue;
}
else
{
if ( pIocpBase->m_nDataListLength[nThreadNo] > 0 )
{
if ( GetTickCount() - process_tick > MAX_SEND_CONTINUE_TICK )
{
nContinueSendinfCount = 0;
LeaveCriticalSection( pSendCriticalSection );
Sleep(1);
continue;
}
goto data_loop;
}
nContinueSendinfCount = 0;
pLastBSocket = NULL;
}
}
else
{
if ( ++nContinueSendinfCount > 50 )
{
nContinueSendinfCount = 0;
LeaveCriticalSection( pSendCriticalSection );
Sleep(1);
continue;
}
else
{
if ( pIocpBase->m_nDataListLength[nThreadNo] > 0 )
{
if ( GetTickCount() - process_tick > MAX_SEND_CONTINUE_TICK )
{
nContinueSendinfCount = 0;
LeaveCriticalSection( pSendCriticalSection );
Sleep(1);
continue;
}
goto data_loop;
}
nContinueSendinfCount = 0;
}
}
LeaveCriticalSection( pSendCriticalSection );
}
return 1;
}
*/
// Data-processing worker thread entry point.
//
// lp points at a DATAPROCTHREADPACKET carrying the owning CIOCPBASE and the
// user callback (fn). The thread claims a slot index from
// m_CurDataProcThreadNo, waits until that counter reaches m_ThreadCount, and
// then polls its own receive ring (m_pRecvData[slot], indexed by
// m_nHeadPtr/m_nTailPtr) forever. The trailing return is never reached.
//
// NOTE(review): the slot claim below is a plain read followed by a plain
// increment; if several of these threads can start at the same moment they
// could observe the same value - confirm how startup is serialized.
UINT WorkerDataProcThread( void *lp )
{
    DATAPROCTHREADPACKET *pPacket = (DATAPROCTHREADPACKET *)lp;
    CIOCPBASE *pBase = pPacket->pIocpbase;

    // Claim this thread's slot number.
    int nSlot = pBase->m_CurDataProcThreadNo;
    pBase->m_CurDataProcThreadNo++;

    // Start barrier: hold here until the counter has reached m_ThreadCount.
    while ( pBase->m_CurDataProcThreadNo < pBase->m_ThreadCount )
        Sleep(1);

    TRACE("Thread Started [%d][%d]...\n", nSlot, pBase->m_CurDataProcThreadNo-1 );

    // Thread main routine.
    for (;;)
    {
        // IKING 2002.7.3
        // Head == tail is treated as "nothing queued": back off briefly.
        if ( pBase->m_nHeadPtr[nSlot] == pBase->m_nTailPtr[nSlot] )
        {
            Sleep(1);
            continue;
        }

        // Take the entry at the tail, clear its cell, then advance the tail
        // with wrap-around at WAIT_RECV_DATA_BUFFER.
        WAIT_RECV_DATA *pItem = pBase->m_pRecvData[nSlot][pBase->m_nTailPtr[nSlot]];
        pBase->m_pRecvData[nSlot][pBase->m_nTailPtr[nSlot]] = NULL;
        pBase->m_nTailPtr[nSlot]++;
        pBase->m_nTailPtr[nSlot] %= WAIT_RECV_DATA_BUFFER;

        if ( pItem == NULL )
            continue;

        if ( pItem->m_Type == SOCKET_FOR_DISCONNECT )
        {
            // Disconnect request: resolve usn to a live socket object; any
            // failed lookup simply discards the request.
            CBSocket *pTarget = NULL;
            if ( pItem->usn >= 0 && pItem->usn < 32767 )
            {
                CPoolBaseManager *pPool = pBase->m_pPBM;
                if ( pPool && pPool->m_pResources->IsFree( pItem->usn ) == false )
                    pTarget = (CBSocket *)pPool->m_pResources->GetDataValue( pItem->usn );
            }
            if ( pTarget )
                pTarget->SockCloseProcess();
        }
        else
        {
            // Call the user-defined data processing function.
            pPacket->fn(pItem);
        }

        // The entry is owned by this thread once dequeued.
        delete pItem;
    }
    return 1;
}
// Default constructor: puts the object into an idle state with a single
// data-processing thread configured, no pool manager, empty receive rings,
// and one initialized critical section per worker slot.
CIOCPBASE::CIOCPBASE()
{
    int i, j;

    m_ThreadCount = 1;
    m_CurThreadNo = 0;
    m_pPBM = NULL;
    m_pIopendingData = NULL;
    m_nIopendingDataCount = 0;
    m_bIOPendingStop = FALSE;

    // These handles are only assigned later (CreateWorkerThread / Init) but
    // are tested in Associate() and closed in the destructor, so they must
    // not start out holding indeterminate values.
    m_hIOCPort = NULL;
    m_CreateSignalEvent = NULL;

    for ( i = 0; i < MAX_WORKER_THREAD; i++ )
    {
        InitializeCriticalSection(&m_CS_ReceiveData[i]);
    }

    m_CurRecvThreadNo = 0;
    m_CurDataProcThreadNo = 0;

    // IKING 2002.7.3
    // Reset the ring-buffer indices and clear every ring cell.
    // NOTE(review): both loops run one element past MAX_WORKER_THREAD /
    // WAIT_RECV_DATA_BUFFER; presumably the arrays are declared with that
    // extra slot - confirm against the class declaration.
    for ( i = 0; i < MAX_WORKER_THREAD+1; i++ )
    {
        m_nHeadPtr[i] = 0;
        m_nTailPtr[i] = 0;
    }
    for ( i = 0; i < MAX_WORKER_THREAD+1; i++ )
    {
        for ( j = 0; j < WAIT_RECV_DATA_BUFFER+1; j++ )
        {
            m_pRecvData[i][j] = NULL;
        }
    }
}
// Destructor: frees the heap cells tracked by the listen-socket, listen-event
// and thread-packet arrays, deletes the per-worker critical sections, frees
// the IO-pending buffer and closes the creation-signal event.
CIOCPBASE::~CIOCPBASE()
{
    // Declared once at function scope: the loops below all share it, and
    // reusing a variable declared in an earlier for-init is not valid under
    // standard C++ scoping rules.
    int i;

    // Clear the listen socket array.
    // NOTE(review): only the heap cell holding each SOCKET is freed here; the
    // socket itself is not closed in this function - confirm it is closed
    // elsewhere before adding a closesocket() call.
    for ( i = 0; i < m_ListenSocketArray.GetSize(); i++ )
    {
        if ( m_ListenSocketArray[i] )
        {
            delete m_ListenSocketArray[i];
            m_ListenSocketArray[i] = NULL;
        }
    }
    m_ListenSocketArray.RemoveAll();

    // Clear the event array.
    // NOTE(review): as above, the event handle stored in each cell is not
    // closed here - confirm ownership.
    for ( i = 0; i < m_hListenEventArray.GetSize(); i++ )
    {
        if ( m_hListenEventArray[i] )
        {
            delete m_hListenEventArray[i];
            m_hListenEventArray[i] = NULL;
        }
    }
    m_hListenEventArray.RemoveAll();

    // Clear the thread packet array.
    for ( i = 0; i < m_ThreadPacketArray.GetSize(); i++ )
    {
        if ( m_ThreadPacketArray[i] )
        {
            delete m_ThreadPacketArray[i];
            m_ThreadPacketArray[i] = NULL;
        }
    }
    m_ThreadPacketArray.RemoveAll();

    // Counterpart of the InitializeCriticalSection calls in the constructor.
    for ( i = 0; i < MAX_WORKER_THREAD; i++ )
    {
        DeleteCriticalSection(&m_CS_ReceiveData[i]);
    }

    if ( m_pIopendingData )
    {
        delete[] m_pIopendingData;
        m_pIopendingData = NULL;
    }

    CloseHandle( m_CreateSignalEvent );
}
void CIOCPBASE::CreateAcceptThread()
{
//DWORD id;
//unsigned int id;
//HANDLE acceptThread;
m_pThreadPacket = new THREADPACKET;
m_pThreadPacket->pIocpbase = this;
m_pThreadPacket->phListenSockEvent = m_phListenSocketEvent;
m_pThreadPacket->pListenSock = m_pSocketListen;
m_pThreadPacket->iSocketType = m_Type;
m_ThreadPacketArray.Add(m_pThreadPacket);
//acceptThread = ::CreateThread( NULL, 0, AcceptListenThread, (LPVOID)m_pThreadPacket, 0, &id);
//acceptThread = (HANDLE)_beginthreadex( NULL, 0, &AcceptListenThread, (LPVOID)m_pThreadPacket, 0, &id);
m_acceptThread = AfxBeginThread( AcceptListenThread, (LPVOID)m_pThreadPacket );
//::SetThreadPriority(acceptThread,THREAD_PRIORITY_ABOVE_NORMAL); //滚弊 蜡惯...
//::SetThreadPriority(acceptThread,THREAD_PRIORITY_NORMAL);
}
void CIOCPBASE::CreateWorkerThread()
{
SYSTEM_INFO SystemInfo;
HANDLE hWorkerThread[MAXWORKERTHREAD];
//DWORD WorkerId[MAXWORKERTHREAD];
unsigned int WorkerId[MAXWORKERTHREAD];
//
// try to get timing more accurate... Avoid context
// switch that could occur when threads are released
//
//SetPriorityClass(GetCurrentProcess(), ABOVE_NORMAL_PRIORITY_CLASS);
//SetPriorityClass(GetCurrentProcess(), HIGH_PRIORITY_CLASS);
//SetThreadPriority( GetCurrentThread(), THREAD_PRIORITY_TIME_CRITICAL);
SetThreadPriority( GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL);
//
// Figure out how many processors we have to size the minimum
// number of worker threads and concurrency
//
GetSystemInfo (&SystemInfo);
//-------------------------------------------------------------------------
// 荐脚 胶饭靛 父甸扁...
m_dwNumberOfWorkers = 2 * SystemInfo.dwNumberOfProcessors + 2;
m_dwConcurrency = 0;
//for (int i = 0; i < m_ThreadCount; i++ )
//{
// m_hIOCPort[i] = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, (DWORD)-1, m_dwConcurrency );
//}
m_hIOCPort = CreateIoCompletionPort( INVALID_HANDLE_VALUE,
NULL,
(DWORD)-1,
m_dwConcurrency );
// 胶饭靛 积己窍扁...
for ( DWORD i = 0; i < m_dwNumberOfWorkers; i++ )
{
/*
hWorkerThread[i] = ::CreateThread(
NULL,
0,
WorkerClientSocketThread,
(LPVOID)this,
CREATE_SUSPENDED,
&WorkerId[i]
);
*/
hWorkerThread[i] = (HANDLE)_beginthreadex(
NULL,
0,
WorkerClientSocketThread,
(LPVOID)this,
CREATE_SUSPENDED,
&WorkerId[i]
);
//m_hWorkerThread[i] = AfxBeginThread( WorkerSocketThreadWrapper, (LPVOID)this );
//SetThreadPriority( hWorkerThread[i], THREAD_PRIORITY_TIME_CRITICAL);
//SetThreadPriority( hWorkerThread[i], THREAD_PRIORITY_HIGHEST);
SetThreadPriority( hWorkerThread[i], THREAD_PRIORITY_ABOVE_NORMAL);
}
// 胶饭靛 劝己拳...
for ( DWORD j = 0; j < m_dwNumberOfWorkers; j++ )
{
ResumeThread( hWorkerThread[j] );
}
}
// Associates a socket with this object's I/O completion port, using the
// socket's m_Sid as the completion key.
//
// pIocpSock : socket wrapper whose m_Socket is to be associated.
// Returns TRUE when CreateIoCompletionPort hands back the existing port
// handle; FALSE when there is no port, pIocpSock is NULL, or the call fails.
BOOL CIOCPBASE::Associate(CIOCPSocket *pIocpSock)
{
    if ( !m_hIOCPort || !pIocpSock )
    {
        return FALSE;
    }

    HANDLE hTemp = CreateIoCompletionPort( (HANDLE)pIocpSock->m_Socket, m_hIOCPort, (DWORD)pIocpSock->m_Sid, m_dwConcurrency);
    if ( hTemp == NULL )
    {
#ifdef _DEBUG
        // Capture the error first, and only use / free the message buffer
        // when FormatMessage actually produced one.
        DWORD dwError = GetLastError();
        LPVOID lpMsgBuf = NULL;
        DWORD dwLen = FormatMessage(
            FORMAT_MESSAGE_ALLOCATE_BUFFER |
            FORMAT_MESSAGE_FROM_SYSTEM |
            FORMAT_MESSAGE_IGNORE_INSERTS,
            NULL,
            dwError,
            MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language
            (LPTSTR) &lpMsgBuf,
            0,
            NULL
        );
        if ( dwLen != 0 && lpMsgBuf != NULL )
        {
            TRACE("CreateIoCompletionPort Error : %s\n", lpMsgBuf );
            LocalFree( lpMsgBuf );
        }
        else
        {
            TRACE("CreateIoCompletionPort Error : code %lu\n", dwError );
        }
#endif
        return FALSE;
    }

    return (hTemp == m_hIOCPort);
}
// Applies socket options to a freshly created listen socket, binds it and
// puts it into the listening state. Returns FALSE on the first fatal failure;
// the caller owns the socket and is responsible for closing it.
static BOOL PrepareListenSocket( SOCKET sock, int port, char *pStrSvrAddr )
{
    int opt;
    int socklen, len, err;
    struct sockaddr_in addr;

    // Bind our local address so that the client can send to us.
    memset((void *)&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    if ( pStrSvrAddr == NULL )
        addr.sin_addr.s_addr = htonl(INADDR_ANY);
    else
        addr.sin_addr.s_addr = inet_addr( pStrSvrAddr );
    addr.sin_port = htons(port);

    // Enable KEEP_ALIVE.
    opt = 1;
    setsockopt( sock, SOL_SOCKET, SO_KEEPALIVE, (char *)&opt, sizeof(opt));

    // added in an attempt to allow rebinding to the port
    opt = 1;
    setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt));

    // SO_LINGER is deliberately left at its default, and a TCP_NODELAY
    // get/set sequence (IKING 2002.1) that used to sit here is disabled.

    if ( bind( sock, (struct sockaddr *)&addr, sizeof(addr)) == SOCKET_ERROR )
    {
        TRACE("]Error(02)- Can't bind local address\n");
        return FALSE;
    }

    // Request a 64 KB receive buffer, then read the option back.
    socklen = 8192 * 8;
    setsockopt( sock, SOL_SOCKET, SO_RCVBUF, (char*)&socklen, sizeof(socklen) );
    len = sizeof(socklen);
    err = getsockopt( sock, SOL_SOCKET, SO_RCVBUF, (char*)&socklen, &len );
    if (err == SOCKET_ERROR)
    {
        TRACE("]Set Socket RecvBuf of port(%d) as %d : Fail\n", port, socklen);
        return FALSE;
    }

    // Same for the send buffer.
    socklen = 8192 * 8;
    setsockopt( sock, SOL_SOCKET, SO_SNDBUF, (char*)&socklen, sizeof(socklen) );
    len = sizeof(socklen);
    err = getsockopt( sock, SOL_SOCKET, SO_SNDBUF, (char*)&socklen, &len );
    if (err == SOCKET_ERROR)
    {
        TRACE("]Set Socket SendBuf of port(%d) as %d : Fail\n", port, socklen);
        return FALSE;
    }

    if ( listen( sock, 5 ) == SOCKET_ERROR )
    {
        TRACE("]Error(03) - Can't listen on port(%d)\n", port);
        return FALSE;
    }
    return TRUE;
}

// Opens an overlapped TCP listen socket on the given port, registers an
// FD_ACCEPT event for it and starts an accept thread.
//
// port        : TCP port to listen on.
// pStrSvrAddr : dotted IPv4 address to bind to, or NULL for INADDR_ANY.
// Returns TRUE on success. On failure returns FALSE, and the socket (if one
// was opened) is closed, its heap cell freed and m_pSocketListen reset to
// NULL; nothing is added to the listen arrays in that case.
BOOL CIOCPBASE::Listen(int port, char *pStrSvrAddr )
{
    // Open a TCP socket (an Internet stream socket).
    m_pSocketListen = new SOCKET;
    if ( m_pSocketListen == NULL ) return FALSE;

    *m_pSocketListen = WSASocket(AF_INET,
                                 SOCK_STREAM,
                                 0,
                                 NULL,
                                 0,
                                 WSA_FLAG_OVERLAPPED);
    // SOCKET is an unsigned type, so failure must be detected by comparing
    // against INVALID_SOCKET rather than testing for a negative value.
    if ( *m_pSocketListen == INVALID_SOCKET )
    {
        TRACE("]Error(01) - Can't Open Stream Socket\n");
        delete m_pSocketListen;
        m_pSocketListen = NULL;
        return FALSE;
    }

    WSAEVENT hAcceptEvent = WSA_INVALID_EVENT;
    BOOL bReady = PrepareListenSocket( *m_pSocketListen, port, pStrSvrAddr );

    if ( bReady )
    {
        hAcceptEvent = WSACreateEvent();
        if ( hAcceptEvent == WSA_INVALID_EVENT )
        {
            TRACE("]Error(04) - Can't create accept event for port(%d)\n", port);
            bReady = FALSE;
        }
    }
    if ( bReady && WSAEventSelect( *m_pSocketListen, hAcceptEvent, FD_ACCEPT) == SOCKET_ERROR )
    {
        TRACE("]Error(05) - Can't select FD_ACCEPT on port(%d)\n", port);
        bReady = FALSE;
    }

    if ( !bReady )
    {
        // Release everything acquired so far so a failed call leaks nothing.
        if ( hAcceptEvent != WSA_INVALID_EVENT )
            WSACloseEvent( hAcceptEvent );
        closesocket( *m_pSocketListen );
        delete m_pSocketListen;
        m_pSocketListen = NULL;
        return FALSE;
    }

    // Success: record the socket and its event (the destructor frees the
    // cells held by both arrays).
    m_ListenSocketArray.Add( m_pSocketListen );

    m_phListenSocketEvent = new HANDLE;
    *m_phListenSocketEvent = hAcceptEvent;
    m_hListenEventArray.Add(m_phListenSocketEvent);

    TRACE("]Port[%05d] initialzed\n", port);

    CreateAcceptThread();
    return TRUE;
}
// Initializes the IOCP base object and starts its threads.
//
// type : socket type, stored in m_Type (handed to accept threads later).
// pPBM : pool manager used to look sockets up by id; stored in m_pPBM.
// fn   : optional user callback invoked by the data-processing threads for
//        each dequeued item; when NULL no data-processing threads are started.
// Always returns 1.
int CIOCPBASE::Init( int type, CPoolBaseManager *pPBM, DWORD (*fn)(LPVOID lp) )
{
    // Declared once at function scope: both loops below share it, and
    // reusing a variable declared in an earlier for-init is not valid under
    // standard C++ scoping rules.
    int i;

    m_Type = type;
    m_pPBM = pPBM;
    m_bAcceptEnableFlag = FALSE;

    // Initialize variables.
    for ( i = 0; i < m_ThreadCount; i++ )
    {
        m_nDataListLength[i] = 0;
    }

    // Manual-reset, initially non-signaled, unnamed event.
    m_CreateSignalEvent = CreateEvent( NULL, TRUE, FALSE, NULL );

    CreateWorkerThread();

    // Create the data processing threads.
    if ( fn )
    {
        // Every data-processing thread receives the same packet (m_Dptp).
        m_Dptp.pIocpbase = this;
        m_Dptp.fn = fn;

        for ( i = 0; i < m_ThreadCount; i++ )
        {
            m_hWorkerThread[i] = AfxBeginThread( WorkerDataProcThread, (LPVOID)&m_Dptp, THREAD_PRIORITY_ABOVE_NORMAL );
        }
    }
    return 1;
}
// Forwards a timestamp update for the given id to the pool manager's
// resource table. Does nothing when no pool manager has been set.
//
// uid   : id passed through to SetTimeStamp.
// ctick : tick value passed through to SetTimeStamp.
void CIOCPBASE::SetAliveTimeUpdate(int uid, DWORD ctick )
{
    if ( !m_pPBM )
        return;

    m_pPBM->m_pResources->SetTimeStamp( uid, ctick );
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -