📄 udiiocpsocketserver.pas
字号:
unit uDIIocpSocketServer;
interface
uses
Windows, Sysutils, WinSock2, uDIBuffer, uDIMapBuffer, uDIClientChannel, uDISocketServer,
uDIProtocol, uDIThread;
type
TDIIocpSocketServer = class;
TDIListenThread = class;
//Per-thread parameter structure
PThreadWorkParam = ^ThreadWorkParam;
ThreadWorkParam = packed record
nThread: DWORD; //index
FSocketServer: TDIIocpSocketServer; //reference to the IOCP server
FDIThread: TDIThread; //thread object
hEvent: Thandle; //event handle
end;
//Method-pointer type for callbacks that receive only the client channel
TOnSocketEvent = procedure(FClientChannel: TDIClientChannel) of object;
//Method-pointer type for callbacks that receive the client channel and the buffer involved
TOnSocketIOEvent = procedure(FClientChannel: TDIClientChannel; FDIBuffer: TDIBuffer) of object;
//Socket server derived from TDISocketServer. Declares the AcceptEx posting and
//completion handlers, the listener and I/O worker thread procedures, and the
//socket event callbacks exposed as published properties.
TDIIocpSocketServer = class(TDISocketServer)
protected
m_iIOCPPostAccepts: Integer; //number of AcceptEx requests to post next time
m_nInitPostAcceptEx: DWORD; //number of AcceptEx requests posted initially
m_DIMapAcceptExBufferList: TDIMapBuffer; //list of buffers used by outstanding AcceptEx requests
m_WrokThreads: array of ThreadWorkParam; //worker thread array
m_hPostAcceptEvent: Thandle; //event used to request posting of accept I/O (signaled when a new user connects, to keep up the number of AcceptEx requests)
m_hListenThreadEvent: Thandle; //listener thread event; accept events are handled with the WSAEventSelect I/O model
m_hWaitEventList: array of Thandle; //list of kernel objects waited on by the AcceptEx (listener) thread
m_hShutdownEvent: Thandle; //stop-service event
FDIListenThread: TDIListenThread;
published
property FInitPostAcceptEx: DWORD read m_nInitPostAcceptEx write m_nInitPostAcceptEx;
private
function Startup: Boolean; override;
public
constructor Create;
destructor Destroy; override;
private
FOnNewSocketEvent: TOnSocketEvent; //client connected
FOnCloseSocketEvent: TOnSocketEvent; //client closed
FOnReadCompletedEvent: TOnSocketIOEvent; //message received from a client
FOnWriteCompletedEvent: TOnSocketIOEvent; //message sent to a client
published
property OnNewSocketEvent: TOnSocketEvent read FOnNewSocketEvent write FOnNewSocketEvent;
property OnCloseSocketEvent: TOnSocketEvent read FOnCloseSocketEvent write FOnCloseSocketEvent;
property OnReadCompletedEvent: TOnSocketIOEvent read FOnReadCompletedEvent write FOnReadCompletedEvent;
property OnWriteCompletedEvent: TOnSocketIOEvent read FOnWriteCompletedEvent write FOnWriteCompletedEvent;
private
{
procedure DisconnectClient(FClientChannel: TDIClientChannel);
procedure DisconnectAllClient; }
protected
//start the listening service
function SetupListner: Boolean;
procedure ListnerThreadProc;
function SetupIOWorkers: Boolean;
procedure IOWorkerThreadProc;
//create the client context (and post a Recv I/O operation)
//function AssociateIncomingClientWithContext(pDIBuffer: TDIBuffer): Boolean;
//message dispatch
procedure ProcessIOMessage(pClientChannel: TDIClientChannel; pDIBuffer: TDIBuffer; dwIoSize: DWORD); override;
//message handling functions
function PostWSAAcceptEx(pDIBuffer: TDIBuffer): Boolean;
function OnWSAAcceptEx(pDIBuffer: TDIBuffer): Boolean;
function PostWSACloseSocket(pClientChannel: TDIClientChannel; pDIBuffer: TDIBuffer): Boolean;
{function PostWSARecv(pClientChannel: TDIClientChannel; pDIBuffer: TDIBuffer): Boolean;
function PostWSASend(pClientChannel: TDIClientChannel; pDIBuffer: TDIBuffer): Boolean;
function OnWSARecv(pClientChannel: TDIClientChannel; pDIBuffer: TDIBuffer; dwIoSize: DWORD): Boolean;
function OnWSASend(pClientChannel: TDIClientChannel; pDIBuffer: TDIBuffer; dwIoSize: DWORD): Boolean;
}
public
procedure ShutDown; override;
{
procedure FreeAllClientChannel; override;
procedure ParseIOError(FClientChannel: TDIClientChannel; FBuffer: TDIBuffer; dwIoSize: DWORD); override;
}
end;
//Thread class whose ThreadRoutine runs the server's ListnerThreadProc
TDIListenThread = class(TDIThread)
protected
FIOCPServer: TDIIocpSocketServer; //server whose listener procedure this thread executes
procedure ThreadRoutine; override;
public
constructor Create(SocketServer: TDIIocpSocketServer);
end;
//Thread class whose ThreadRoutine runs the server's IOWorkerThreadProc
TDIWorkerThread = class(TDIThread)
protected
FIOCPServer: TDIIocpSocketServer; //server whose I/O worker procedure this thread executes
procedure ThreadRoutine; override;
public
constructor Create(SocketServer: TDIIocpSocketServer);
end;
implementation
uses Dialogs;
//TDIListenThread
// Builds the listener thread object and stores the server it will serve.
//   SocketServer - server whose ListnerThreadProc is run by ThreadRoutine.
constructor TDIListenThread.Create(SocketServer: TDIIocpSocketServer);
begin
  inherited Create;
  // Keep the owning server so ThreadRoutine can delegate to it.
  Self.FIOCPServer := SocketServer;
end;
// Thread body: hands control to the server's listener procedure.
procedure TDIListenThread.ThreadRoutine;
begin
  Self.FIOCPServer.ListnerThreadProc;
end;
//TDIWorkerThread
// Builds the worker thread object and stores the server it will serve.
//   SocketServer - server whose IOWorkerThreadProc is run by ThreadRoutine.
constructor TDIWorkerThread.Create(SocketServer: TDIIocpSocketServer);
begin
  inherited Create;
  // Keep the owning server so ThreadRoutine can delegate to it.
  Self.FIOCPServer := SocketServer;
end;
// Thread body: hands control to the server's I/O worker procedure.
procedure TDIWorkerThread.ThreadRoutine;
begin
  Self.FIOCPServer.IOWorkerThreadProc;
end;
// Constructs the server: runs the inherited constructor, creates the map that
// tracks buffers of outstanding AcceptEx requests and sets the accept counters
// to their defaults.
constructor TDIIocpSocketServer.Create;
const
  // Default number of AcceptEx requests posted when the listener starts.
  DEFAULT_INIT_POST_ACCEPTEX = 100;
begin
  inherited Create;
  m_DIMapAcceptExBufferList := TDIMapBuffer.Create;
  m_nInitPostAcceptEx := DEFAULT_INIT_POST_ACCEPTEX;
  // No replacement AcceptEx requests are pending yet.
  m_iIOCPPostAccepts := 0;
end;
// Releases the AcceptEx buffer map created in the constructor, then runs the
// inherited destructor.
destructor TDIIocpSocketServer.Destroy;
begin
  FreeAndNil(m_DIMapAcceptExBufferList);
  inherited;
end;
// Posts one overlapped AcceptEx request on the listening socket.
//   pDIBuffer - buffer that carries the overlapped structure and receives the
//               address data; its m_Socket field is set to the newly created
//               accept socket.
// Returns True when the request completed immediately or is pending
// (WSA_IO_PENDING). Returns False when pDIBuffer is nil, the accept socket
// could not be created, or AcceptEx failed; every failure is logged through
// AppendErrorMessage.
function TDIIocpSocketServer.PostWSAAcceptEx(pDIBuffer: TDIBuffer): Boolean;
var
  sAccept: TSocket;
  dwIoSize: Cardinal;
  nRetVal: LongBool;
  nLastError: Integer;
begin
  Result := False;

  if pDIBuffer = nil then begin
    AppendErrorMessage('PostWSAAcceptEx投递Accept I/O操作失败, pDIBuffer is NULL.');
    Exit;
  end;

  // Socket that AcceptEx will bind to the incoming connection.
  sAccept := Winsock2.WSASocketA(AF_INET, SOCK_STREAM, 0, Nil, 0, WSA_FLAG_OVERLAPPED);
  if sAccept = INVALID_SOCKET then begin
    AppendErrorMessage(Format('PostWSAAcceptEx 创建监听套按字失败: %d.', [WSAGetLastError()]));
    Exit;
  end;

  // Mark the buffer as an AcceptEx operation and attach the accept socket.
  dwIoSize := 0;
  pDIBuffer.SetOperation(IOWSAAcceptEx);
  pDIBuffer.SetupRead;
  pDIBuffer.m_Socket := sAccept;

  // Receive length 0: complete as soon as a connection arrives, without
  // waiting for the first data from the client.
  nRetVal := m_lpfnAcceptEx( m_sListenSocket,
                             pDIBuffer.m_Socket,
                             pDIBuffer.GetBuffer,
                             0,
                             sizeof(TSockAddrIn)+16,
                             sizeof(TSockAddrIn)+16,
                             dwIoSize,
                             POverlapped(@pDIBuffer.PerHandleData.m_overlap));
  if not nRetVal then
  begin
    // Read the error code once, before any other call can overwrite it.
    nLastError := WSAGetLastError();
    if nLastError <> WSA_IO_PENDING then
    begin
      // The request was not queued, so nothing else owns the accept socket:
      // close it here, otherwise the handle is leaked.
      WinSock2.closesocket(sAccept);
      pDIBuffer.m_Socket := INVALID_SOCKET;
      AppendErrorMessage(Format('PostAcceptProcess投递Accept I/O操作失败: %d.', [nLastError]));
      Exit;
    end;
  end;

  Result := True;
end;
// Handles completion of an AcceptEx request.
//   pDIBuffer - buffer that was posted with the AcceptEx request.
// Returns False when pDIBuffer is nil or carries no valid accept socket,
// True otherwise. When pDIBuffer is not nil the buffer is always removed from
// the AcceptEx list and returned to the free pool, and the listener thread is
// asked (via m_iIOCPPostAccepts / m_hPostAcceptEvent) to post a replacement.
function TDIIocpSocketServer.OnWSAAcceptEx(pDIBuffer: TDIBuffer): Boolean;
begin
  if pDIBuffer = nil then begin
    AppendErrorMessage('OnWSAAcceptEx: pDIBuffer is NULL.');
    Result := False;
    Exit;
  end;

  // A buffer without a valid accept socket cannot be turned into a client.
  // Previously the False result assigned here was overwritten by the final
  // Result := TRUE, and closesocket was called on INVALID_SOCKET.
  Result := (pDIBuffer.m_Socket <> INVALID_SOCKET);
  if not Result then
    AppendErrorMessage('OnWSAAcceptEx接受到dwTrans为零, 客户端关闭Socket连接.')
  //1. Update the accepted socket's mode via ChangeSocketModeAccept.
  //2. Create the client context (post a Recv I/O) - currently disabled.
  else if ChangeSocketModeAccept(pDIBuffer.m_Socket) then
  begin
    //if not AssociateIncomingClientWithContext(pDIBuffer) then
    //  AppendErrorMessage('AssociateIncomingClientWithContext is Failer.');
  end;

  // The accept request is finished: remove it from the AcceptEx list.
  m_DIMapAcceptExBufferList.RemoveDIBuffer(pDIBuffer);
  // Recycle the buffer.
  m_FreeDIBufferList.ReleaseBufferToPool(pDIBuffer);
  // Tell the listener thread that one more AcceptEx request must be posted.
  InterlockedIncrement(m_iIOCPPostAccepts);
  // Wake the listener thread so it keeps the number of AcceptEx requests up.
  SetEvent(m_hPostAcceptEvent);
end;
//侦听线程
// Listener thread body.
// Posts the initial batch of AcceptEx requests, then loops until the shutdown
// event is signaled, waiting on the two events in m_hWaitEventList:
//   index 0 - an I/O thread consumed an accept; post as many replacements as
//             were counted in m_iIOCPPostAccepts;
//   index 1 - network event on the listening socket; on FD_ACCEPT post an
//             extra batch of AcceptEx requests.
// The procedure returns (ending the thread) on shutdown, on a wait or
// enumeration error, or when a buffer cannot be allocated or posted.
procedure TDIIocpSocketServer.ListnerThreadProc;
const
  // Longest time spent in one wait before the shutdown event is polled again.
  WAIT_SLICE_MS = 500;
  // Number of extra AcceptEx requests posted when FD_ACCEPT is signaled.
  ACCEPT_GROW_COUNT = 50;
var
  nRet: Integer;          // signed, so the comparison with SOCKET_ERROR (-1) can succeed
  nPending: Integer;
  nIndex, nPostAccepts: DWORD;
  Events: TWSANetworkEvents;

  // Allocates nCount buffers, registers each in the AcceptEx list and posts an
  // AcceptEx request for it. Returns False (after logging) on the first failure.
  function PostAccepts(nCount: DWORD): Boolean;
  var
    i: DWORD;
    pBuffer: TDIBuffer;
  begin
    Result := False;
    for i := 1 to nCount do begin
      pBuffer := m_FreeDIBufferList.AllocateFreeBufferFromPool;
      if pBuffer = nil then begin
        AppendErrorMessage('AllocateFreeBufferFromPool错误, ListnerThreadProc线程退出.');
        Exit;
      end;
      m_DIMapAcceptExBufferList.AddDIBuffer(pBuffer);
      if not PostWSAAcceptEx(pBuffer) then begin
        AppendErrorMessage('PostWSAAcceptEx投递Accept I/O错误, ListnerThreadProc线程退出.');
        Exit;
      end;
    end;
    Result := True;
  end;

begin
  // Post the initial number of AcceptEx requests.
  if not PostAccepts(m_nInitPostAcceptEx) then
    Exit;

  // Poll the shutdown event with a zero timeout. An INFINITE wait here would
  // block until shutdown and the loop body would never run.
  while WaitForSingleObject(m_hShutdownEvent, 0) <> WAIT_OBJECT_0 do
  begin
    // Nothing to post unless one of the branches below says so.
    nPostAccepts := 0;

    // Wait for m_hPostAcceptEvent / m_hListenThreadEvent. The address of the
    // first element is passed: @m_hWaitEventList alone is the address of the
    // dynamic-array variable, not of the handles. The wait is bounded so the
    // shutdown event is re-checked regularly.
    nIndex := WSAWaitForMultipleEvents(2, @m_hWaitEventList[0], FALSE, WAIT_SLICE_MS, FALSE);
    if nIndex = WSA_WAIT_FAILED then begin
      AppendErrorMessage('WSA_WAIT_FAILED错误, ListnerThreadProc线程退出.');
      Break;
    end;
    if nIndex = WAIT_TIMEOUT then
      Continue;
    nIndex := nIndex - WAIT_OBJECT_0;

    if nIndex = 0 then begin
      // I/O threads accepted connections: take the counter and reset it.
      nPending := InterlockedExchange(m_iIOCPPostAccepts, 0);
      if nPending > 0 then
        nPostAccepts := DWORD(nPending);
    end
    else if nIndex = 1 then begin
      nRet := WSAEnumNetworkEvents( m_sListenSocket,
                                    m_hWaitEventList[nIndex],
                                    @Events );
      if nRet = SOCKET_ERROR then begin
        AppendErrorMessage(Format('WSAEnumNetworkEvents错误%d, ListnerThreadProc线程退出.', [WSAGetLastError()]));
        Break;
      end;
      // lNetworkEvents is a bit mask, so test the FD_ACCEPT bit.
      if (Events.lNetworkEvents and FD_ACCEPT) <> 0 then begin
        // Re-check the shutdown state here so the thread can leave promptly.
        if ( (Events.iErrorCode[FD_ACCEPT_BIT] = 0) and
             (m_bAcceptConnections) and
             (not m_bShutDown) ) then begin
          nPostAccepts := ACCEPT_GROW_COUNT;
        end
        else
        begin
          AppendErrorMessage(Format('异常WSAEnumNetworkEvents事件错误%d, ListnerThreadProc线程退出.', [WSAGetLastError()]));
          Break;
        end;
      end;
    end;

    // Post the requested number of additional AcceptEx requests.
    if not PostAccepts(nPostAccepts) then
      Exit;
  end;
end;
function TDIIocpSocketServer.SetupListner: Boolean;
var
bVal: LongBool;
nRet: Integer;
dwBytesReceived: DWORD;
dwThreadID: DWORD;
sock_Addr: TSockAddrIn;
begin
//创建socket
m_sListenSocket := Winsock2.WSASocket(AF_INET, SOCK_STREAM, 0, Nil, 0, WSA_FLAG_OVERLAPPED);
if m_sListenSocket = INVALID_SOCKET then begin
AppendErrorMessage(Format('创建 Socket监听套按字失败: %d.', [WSAGetLastError()]));
Result := FALSE;
Exit;
end;
//设置端口复用
bVal := True;
nRet := SetSockOpt(m_sListenSocket, SOL_SOCKET, SO_REUSEADDR, PChar(@bVal), SizeOf(bVal));
if (nRet = SOCKET_ERROR) then begin
AppendErrorMessage(Format('setsockopt(SO_EXCLUSIVEADDRUSE) 创建失败: %d.', [WSAGetLastError()]));
Winsock2.closesocket(m_sListenSocket);
Result := FALSE;
Exit;
end;
//设置Wait线程内核对象列表(AcceptEx线程)
SetLength(m_hWaitEventList, 2);
//投递Accept I/O事件内核对象
m_hPostAcceptEvent := CreateEvent(nil, FALSE, FALSE, nil);
if (m_hPostAcceptEvent = WSA_INVALID_EVENT) then begin
AppendDisplayMsg(Format('WSACreateEvent() 创建失败: %d.', [WSAGetLastError()]));
WSACloseEvent(m_hShutdownEvent);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -