📄 iocpbase.cpp
字号:
///////////////////////////////////////////////////////////////////////////////
// I/O COMPLETION PORT BASE CLASS
//
#include "StdAfx.h"
#include "IOCPSocket.h"
#include "Iocpbase.h"
#include "Cbsocket.h"
#include "Poolbase.h"
#include "Scdefine.h"
#include "Resource.h"
#include "Serverdlg.h"
#include <process.h>
#include "Extern.h"
#define MAX_CONTINUE_SENDING_COUNT 5
//#define MAX_SEND_CONTINUE_TICK 25 // 0.05 sec.
#define MAX_SEND_CONTINUE_TICK 300 // 0.05 sec.
extern CServerDlg *g_pMainDlg;
extern DWORD g_ThreadAliveTime[];
//DWORD WINAPI AcceptListenThread(LPVOID lp)
//unsigned __stdcall AcceptListenThread( void *lp )
UINT AcceptListenThread( void *lp )
{
struct linger lingerOpt;
int l_stat;
THREADPACKET *pThreadPacket = (THREADPACKET *)lp;
CIOCPBASE *pIocpBase = pThreadPacket->pIocpbase;
CPoolBaseManager *pSQM = pIocpBase->m_pPBM;
HANDLE *hListenSockEvent;
SOCKET *pListenSocket;
int nSocketType;
const RHANDLE* pHandle = 0;
hListenSockEvent = pThreadPacket->phListenSockEvent;
pListenSocket = pThreadPacket->pListenSock;
nSocketType = pThreadPacket->iSocketType;
struct sockaddr_in addr;
int wait_return, sid, addr_len;
WSANETWORKEVENTS network_event;
while( !pIocpBase->m_bAcceptEnableFlag )
{
Sleep(1);
};
while ( 1 )
{
wait_return = WaitForSingleObject( *hListenSockEvent, INFINITE);
if ( wait_return == WAIT_FAILED )
{
TRACE("]Wait Listen Socket Failed[%d]...", GetLastError());
return 1;
}
WSAEnumNetworkEvents( *pListenSocket, *hListenSockEvent, &network_event );
if ( network_event.lNetworkEvents & FD_ACCEPT )
{
if ( network_event.iErrorCode[FD_ACCEPT_BIT] == 0 )
{
CBSocket *pSocket;
int getsocket_count = 0;
getsocket_loop:
pHandle = 0; sid = -1;
DWORD ret = pSQM->GetFreeResource(&pHandle);
if ( ret == 0 )
{
if ( pHandle == NULL )
{
Sleep(1);
TRACE("]GetFreeResource but NULL pointer(pHandle) and continue...\n");
continue;
}
sid = pHandle->pos;
pSocket = (CBSocket *)pHandle->handle;
if ( pSocket == NULL )
{
Sleep(1);
TRACE("]GetFreeResource but NULL pointer(pHandle->handle) and continue...\n");
continue;
}
pSocket->m_pIocpBase = NULL;
}
else
{
TRACE("]Accepting success but Fail to allocates socket.\n");
SOCKET s;
s = accept( *pListenSocket, (struct sockaddr *)&addr, &addr_len );
if ( s != INVALID_SOCKET )
closesocket(s);
getsocket_count++;
if ( getsocket_count < 3 )
goto getsocket_loop;
Sleep(1000);
continue;
}
addr_len = sizeof(addr);
pSocket->m_Socket = accept( *pListenSocket, (struct sockaddr *)&addr, &addr_len );
if ( pSocket->m_Socket == INVALID_SOCKET )
{
TRACE("]Accept Fail : Invalid socket.\n");
pSQM->ReleaseResource(pHandle);
//Sleep(50);
continue;
}
// Linger off -> close socket immediately regardless of existance of data
//
lingerOpt.l_onoff = 1;
lingerOpt.l_linger = 0;
l_stat = setsockopt( pSocket->m_Socket, SOL_SOCKET, SO_LINGER, (char *)&lingerOpt, sizeof(lingerOpt) );
if ( l_stat < 0 )
{
TRACE("]Linger opt Failed : sid[%d]\n", sid);
pSocket->B_OnClose(0);
pSQM->ReleaseResource(pHandle);
continue;
}
if( g_bShutDown )
{
pSocket->B_OnClose(0);
pSQM->ReleaseResource(pHandle);
continue;
}
int retValue;
/* int retValue, optlen;
BOOL bNagleingFlag;
optlen = sizeof(bNagleingFlag);
retValue = getsockopt( pSocket->m_Socket, IPPROTO_TCP, TCP_NODELAY, (char *)&bNagleingFlag, &optlen );
if ( retValue == SOCKET_ERROR )
{
TRACE("GETTING TCP_NODELAY ERROR(%d)...\n", WSAGetLastError() );
}
if ( bNagleingFlag == FALSE )
{
bNagleingFlag = TRUE;
ret = setsockopt( pSocket->m_Socket, IPPROTO_TCP, TCP_NODELAY, (char *)&bNagleingFlag, sizeof(bNagleingFlag) );
if ( ret == SOCKET_ERROR )
{
TRACE("SETTING TCP_NODELAY ERROR(%d)...\n", WSAGetLastError() );
}
}
*/
pSocket->m_pIocpBase = pIocpBase;
pSocket->m_Sid = sid;
pSocket->m_State = STATE_ACCEPTED;
pSocket->m_Type = nSocketType;
// RECV OV : USER I/O COMPLETION PACKET FOR NOTIFY...
pSocket->m_RecvOverlap.hEvent = NULL;
// SEND OV : USER STANDARD EVENT MECH.
//pSocket->m_SendOverlap.hEvent = NULL;
if ( pSocket->m_SendOverlap.hEvent == NULL )
pSocket->m_SendOverlap.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
pSocket->m_SendOverlap.hEvent = (HANDLE)((DWORD)pSocket->m_SendOverlap.hEvent|0x1);
memcpy( &pSocket->m_Addr, &addr, sizeof(addr) );
// IKING 2002.6.29
if ( pSocket->m_hSockEvent == NULL )
pSocket->m_hSockEvent = WSACreateEvent();
if ( !pIocpBase->Associate( pSocket ) )
{
TRACE("]Associates Failed : sid[%d]\n", sid);
pSocket->B_OnClose(0);
pSQM->ReleaseResource(pHandle);
//Sleep(50);
continue;
}
// Socket Enable...
pSocket->m_SockFlag = 1;
pSocket->Init(0);
retValue = pSocket->RecycleRead();
if ( retValue == -1 )
{
g_pMainDlg->UserFree( pSocket->m_Sid );
TRACE("]Accepting and recycleread error : sid[%d]\n", sid);
pSocket->B_OnClose(0);
pSQM->ReleaseResource(pHandle);
continue;
}
else if ( retValue == -2 )
{
g_pMainDlg->UserFree( pSocket->m_Sid );
pSocket->B_OnClose(0);
pSQM->ReleaseResource(pHandle);
continue;
}
TRACE("]Accept Socket[%d]...\n", sid);
//Sleep(50);
// accept 等 家南捞 脚龋甫 罐阑 锭鳖瘤 扁促妨 航...
//long acceptEnableTick = GetTickCount();
//while ( pSocket->m_ActivatedFlag == FALSE && (GetTickCount() - acceptEnableTick) < 2000 )
// {
Sleep(100);
// }
}
}
}
return 1;
}
//DWORD WINAPI WorkerClientSocketThread(LPVOID lp)
unsigned __stdcall WorkerClientSocketThread( void *lp )
//UINT WorkerClientSocketThread( void *lp )
{
CIOCPBASE *pIocpBase = (CIOCPBASE *)lp;
CPoolBaseManager *pSQM = pIocpBase->m_pPBM;
int nModCount;
nModCount = pIocpBase->m_CurRecvThreadNo++;
DWORD WorkIndex;
BOOL b;
LPOVERLAPPED pOvl;
DWORD nbytes;
CBSocket *pBSocket;
int retValue;
if ( pIocpBase->GetSocketType() == SOCKET_FOR_SERVER)
TRACE(">Server Worker Thread Started :[%d]...\n", nModCount);
else
TRACE(">User Worker Thread Started :[%d]...\n", nModCount);
while ( 1 )
{
b = GetQueuedCompletionStatus(
pIocpBase->m_hIOCPort,
&nbytes,
&WorkIndex,
&pOvl,
INFINITE
);
if ( b && pOvl )
{
if ( WorkIndex >= pSQM->m_dwNumCurrentResources )
{
TRACE("U1: WorkIndex >= pSQM->m_dwNumCurrentResources...\n");
goto loop_pass;
}
if ( pOvl->Offset != OVL_CLOSE )
{
if ( pSQM->m_pResources->IsFree( WorkIndex ) == true )
{
pBSocket = (CBSocket *)pSQM->m_pResources->GetDataValue( WorkIndex );
if ( !pBSocket )
{
TRACE("U1_1: !pBSocket...\n");
goto loop_pass;
}
// pBSocket->SessionLogOut();
pBSocket->m_ActivatedFlag = 1;
pBSocket->SocketDisConnect();
TRACE("U2: pSQM->m_pResources->IsFree( WorkIndex ) == true[%d]... \n", WorkIndex );
goto loop_pass;
}
}
pBSocket = (CBSocket *)pSQM->m_pResources->GetDataValue( WorkIndex );
if ( !pBSocket )
{
TRACE("U3: !pBSocket...\n");
goto loop_pass;
}
//g_ThreadAliveTime[nModCount] = GetTickCount();
switch ( pOvl->Offset )
{
case OVL_RECEIVE:
if ( !nbytes )
{
// pBSocket->SessionLogOut();
pBSocket->m_ActivatedFlag = 1;
pBSocket->SocketDisConnect();
break;
}
/*
// WSA_IO_PENDING 眉农...
{
int ret;
DWORD transfer;
DWORD dwFlag;
DWORD tick_count = GetTickCount();
do
{
ret = WSAGetOverlappedResult(pBSocket->m_Socket,
pOvl,
&transfer,
FALSE,
&dwFlag);
} while( ret == FALSE && (GetTickCount() - tick_count) < 10 );
if ( ret == FALSE )
{
TRACE("*** RECV IO PENDING ERROR=%d ***\n", WorkIndex);
break;
}
}
*/
//
pBSocket->m_ActivatedFlag = 1;
pBSocket->m_iWsaReadIOPendFlag = 0;
pBSocket->ParseCommand(nbytes);
retValue = pBSocket->RecycleRead();
if ( retValue == -1 )
{
// pBSocket->SessionLogOut();
pBSocket->SocketDisConnect();
TRACE("RECV-RECYCLEING-ERROR(%d) [%d]...\n", retValue, WorkIndex);
break;
}
else if ( retValue == -2 )
{
// pBSocket->SessionLogOut();
pBSocket->SocketDisConnect();
TRACE("RECV-RECYCLEING-ERROR2(%d) [%d]...\n", retValue, WorkIndex);
break;
}
break;
case OVL_SEND:
pBSocket->m_ActivatedFlag = 1;
pBSocket->B_OnSend(0);
break;
case OVL_CLOSE:
if ( pSQM->m_pResources->IsFree( WorkIndex ) == true )
break;
pBSocket->m_ActivatedFlag = 1;
if ( pBSocket->m_Socket != INVALID_SOCKET )
{
closesocket(pBSocket->m_Socket);
pBSocket->m_Socket = INVALID_SOCKET;
}
/* if ( pBSocket->m_hSockEvent )
{
WSACloseEvent( pBSocket->m_hSockEvent );
pBSocket->m_hSockEvent = NULL;
}
if ( pBSocket->m_RecvOverlap.hEvent )
{
CloseHandle(pBSocket->m_RecvOverlap.hEvent);
pBSocket->m_RecvOverlap.hEvent = NULL;
}
if ( pBSocket->m_SendOverlap.hEvent )
{
CloseHandle(pBSocket->m_SendOverlap.hEvent);
pBSocket->m_SendOverlap.hEvent = NULL;
}
*/ RHANDLE *pHandle;
pHandle = pSQM->m_pResources->GetHandle( WorkIndex );
if ( pHandle )
pSQM->ReleaseResource(pHandle);
else
{
TRACE("]Iocp closed : sid[%d] Handle Error...\n", WorkIndex );
}
TRACE("]IocpClosed : sid[%d]\n", WorkIndex );
break;
default:
TRACE("]Iocp closed for Default: sid[%d]\n", WorkIndex );
// pBSocket->SessionLogOut();
pBSocket->SocketDisConnect();
break;
}
}
else
{
if ( pOvl )
{
if ( WorkIndex >= pSQM->m_dwNumCurrentResources )
{
TRACE("D1: WorkIndex >= pSQM->m_dwNumCurrentResources...[%d]\n", WorkIndex );
goto loop_pass;
}
pBSocket = (CBSocket *)pSQM->m_pResources->GetDataValue( WorkIndex );
if ( !pBSocket )
{
TRACE("D3: !pBSocket...\n");
goto loop_pass;
}
// pBSocket->SessionLogOut();
pBSocket->m_ActivatedFlag = 1;
pBSocket->SocketDisConnect();
TRACE("]b is NULL : sid[%d]\n", WorkIndex);
}
else // ERROR...
{
if ( WorkIndex < pSQM->m_dwNumCurrentResources )
{
pBSocket = (CBSocket *)pSQM->m_pResources->GetDataValue( WorkIndex );
if ( pBSocket )
{
// pBSocket->SessionLogOut();
pBSocket->m_ActivatedFlag = 1;
pBSocket->SocketDisConnect();
}
}
int ioError;
ioError = GetLastError();
if ( ioError == WAIT_TIMEOUT )
{
TRACE ("GETQUEUEDCOMPLETIONSTATUS : TIME-OUT...");
}
else
{
TRACE1("GETQUEUEDCOMPLETIONSTATUS : ERROR CODE=%d", ioError);
}
}
}
loop_pass:;
}
return 1;
}
UINT WorkerSocketThreadWrapper( void *lp )
{
UINT dwExitCode = 0;
__try
{
dwExitCode = WorkerClientSocketThread( lp );
}
__except ((GetExceptionCode() == EXCEPTION_ACCESS_VIOLATION) ? EXCEPTION_EXECUTE_HANDLER : EXCEPTION_CONTINUE_SEARCH)
{}
return (dwExitCode);
}
/*
DWORD WINAPI WorkerClientSendSocketThread(LPVOID lp)
{
int nContinueSendinfCount = 0;
int nThreadNo;
CIOCPBASE *pIocpBase = (CIOCPBASE *)lp;
CPoolBaseManager *pSQM = pIocpBase->m_pPBM;
HANDLE _CreateSignalEvent = pIocpBase->m_CreateSignalEvent;
nThreadNo = pIocpBase->m_CurThreadNo++;
CRITICAL_SECTION *pSendCriticalSection = &pIocpBase->m_SendDataCriticalSection[nThreadNo];
DWORD WorkIndex;
CBSocket *pBSocket, *pLastBSocket;
SEND_DATA_STRU *pSDS;
int nModCount;
long process_tick;
pLastBSocket = NULL;
nModCount = pIocpBase->m_ThreadCount;
while ( 1 )
{
if ( pIocpBase->m_nDataListLength[nThreadNo] < 1 )
{
Sleep(1);
continue;
}
// 价脚 单捞鸥 府胶飘俊辑 单捞鸥 啊历坷扁...
EnterCriticalSection( pSendCriticalSection );
if ( pIocpBase->m_nDataListLength[nThreadNo] < 1 )
{
LeaveCriticalSection( pSendCriticalSection );
continue;
}
process_tick = GetTickCount();
data_loop:;
pSDS = (SEND_DATA_STRU *)pIocpBase->m_pSendDataList[nThreadNo].RemoveHead();
pIocpBase->m_nDataListLength[nThreadNo] = pIocpBase->m_pSendDataList[nThreadNo].GetCount();
data_loop1:;
if ( pSDS == NULL )
goto loop_pass;
WorkIndex = pSDS->m_sid;
pBSocket = NULL;
if ( WorkIndex >= pSQM->m_dwNumCurrentResources || WorkIndex < 0 )
{
goto loop_pass;
}
if ( pSQM->m_pResources->IsFree( WorkIndex ) == true )
{
goto loop_pass;
}
pBSocket = (CBSocket *)pSQM->m_pResources->GetDataValue( WorkIndex );
if ( pBSocket == NULL ) goto loop_pass;
int retValue;
retValue = pBSocket->DxSend( pSDS->m_nDataLength, (char *)pSDS->m_pData );
if ( retValue == -1 && pBSocket->m_SockFlag == 0 )
{
//int rValue;
//rValue = pBSocket->SockCloseProcess();
//if ( rValue == 1 ) bSendErrorFlag = 1;
TRACE("*** check send socket[%d] close ***\n", WorkIndex );
pBSocket->SendSockCloseProcess();
// 辑滚 家南栏肺 悼累窍绰 版快俊绰
// 促矫 立加且锭鳖瘤 单捞鸥甫 贸府窍瘤 臼绰促...
if ( pBSocket->GetIOPendingEnableFlag() == TRUE )
{
if ( pSDS )
delete pSDS;
pSDS = NULL;
LeaveCriticalSection( pSendCriticalSection );
continue;
}
}
else if ( retValue == FALSE ) // 泅犁 焊辰 单捞鸥啊 价脚 滚欺俊 歹秦瘤瘤 臼疽澜...
{
// 滚欺 钱 惑怕...
Sleep(1);
goto data_loop1; // 单捞鸥 犁价脚...
}
else if ( retValue == -3 ) // 泅犁 焊辰 单捞鸥啊 价脚 滚欺俊 歹秦脸澜..
{
// WSA_IO_PENDING 惑怕牢 版快...
// 矫埃父 距埃 掉饭捞...
if ( pSDS )
delete pSDS;
pSDS = NULL;
LeaveCriticalSection( pSendCriticalSection );
Sleep(1);
continue;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -