📄 udithreadpool.pas
字号:
unit uDIThreadPool;
interface
uses
Windows, SysUtils, Classes, WinSock2, uCriticalSection, uDIProtocol, uDIBuffer,
uDIClientContext, uDISocketServer, uDIThread;
{$I IOCP.inc}
const
// Win32 system error codes declared locally in this unit.
// In the code visible here they are only mentioned in comments inside
// TDIWorkerThread.ThreadRoutine (the condition that used them is commented out).
ERROR_INVALID_PARAMETER = 87;
ERROR_NETNAME_DELETED = 64;
type
// Per-worker bookkeeping record. TDIThreadPool.StartThreadPool allocates one
// of these with New() for every worker thread and stores the pointer in its
// thread list; TDIThreadPool.StopThreadPool releases them with Dispose().
PThreadParam = ^ThreadParam;
ThreadParam = packed record
m_bRun: Boolean; // run flag
nThread: DWORD; // worker index (StartThreadPool assigns 1-based values)
FSocketServer: TDISocketServer; // reference to the owning IOCP socket server
FDIThread: TDIThread; // the worker thread object
hEvent: Thandle; // handle waited on at shutdown; StartThreadPool stores the worker thread's Handle here
end;
// Worker thread of the pool. Its ThreadRoutine loops on the socket server's
// I/O completion port and dispatches each dequeued completion back to the
// server (ProcessIOMessage / ProcessIOError).
TDIWorkerThread = class(TDIThread)
protected
// Identifier passed to the constructor (the pool passes 1-based indexes).
FThreadID: Integer;
// Socket server whose completion port this worker services.
FSocketServer: TDISocketServer;
// Thread body; returns when a completion with size $FFFFFFFF is received.
procedure ThreadRoutine; override;
public
// SocketServer: server to service; m_nThreadID: identifier stored in FThreadID.
constructor Create(SocketServer: TDISocketServer; m_nThreadID: Integer);
destructor Destroy; override;
end;
// Owns the set of TDIWorkerThread instances that service one
// TDISocketServer's I/O completion port.
TDIThreadPool = class
private
m_bStarted: Boolean; // True between a successful StartThreadPool and the end of StopThreadPool
m_ThreadsList: TList; // list of PThreadParam, one entry per created worker
FSocketServer: TDISocketServer; // server passed to the constructor
FThreadPoolLock: TCriticalSection; // taken by the property setters while they store a new value
private
// Thread-count settings; the constructor initialises them to 2 / 10 / 4.
m_nIOMinWorkThreads: Integer;
m_nIOMaxWorkThreads: Integer;
m_nIOWorkerThreads: Integer;
// Setters silently ignore values outside the permitted range.
procedure SetMinWorkThreads(iMinWorkThreads: Integer);
procedure SetMaxWorkThreads(iMaxWorkThreads: Integer);
procedure SetIOWorkersThread(iWorkerThreads: Integer);
published
// Read-only: whether the pool is currently started.
property FStarted: Boolean read m_bStarted;
// Lower bound accepted for FIOWorkThreads.
property FMinWorkThreads: Integer read m_nIOMinWorkThreads write SetMinWorkThreads;
// Upper bound accepted for FIOWorkThreads.
property FMaxWorkThreads: Integer read m_nIOMaxWorkThreads write SetMaxWorkThreads;
// Number of worker threads StartThreadPool creates.
property FIOWorkThreads: Integer read m_nIOWorkerThreads write SetIOWorkersThread;
public
// Creates and runs the workers; returns False if the pool is already started.
function StartThreadPool: Boolean;
// Stops the server, asks every worker to quit, waits for them and frees them.
procedure StopThreadPool;
public
constructor Create(SocketServer: TDISocketServer);
destructor Destroy; override;
end;
implementation
uses uDIMonitor, uFileLogger;
//TDIWorkerThread
// Builds a worker bound to the given socket server.
//   SocketServer - server whose completion port ThreadRoutine will service
//   m_nThreadID  - identifier remembered in FThreadID
constructor TDIWorkerThread.Create(SocketServer: TDISocketServer; m_nThreadID: Integer);
begin
  inherited Create;
  // Remember the collaborators; the thread body reads FSocketServer.
  FSocketServer := SocketServer;
  FThreadID     := m_nThreadID;
end;
// Nothing of its own to release; defers entirely to the inherited destructor.
destructor TDIWorkerThread.Destroy;
begin
  inherited;
end;
// Worker thread body.
//
// Repeatedly dequeues a completion from the socket server's I/O completion
// port and dispatches it:
//   * a completion whose size is $FFFFFFFF is the pool's quit request -> return;
//   * a completion without per-handle data / buffer is ignored;
//   * successful completions go to ProcessIOError (custom close, or zero bytes
//     meaning the peer closed) or to ProcessIOMessage;
//   * failed completions that carry a client context go to ProcessIOError.
//
// Changes compared with the previous revision:
//   * the Win32 error code is captured immediately after the dequeue call.
//     It used to be read only after the context lock, ExitIOLoop and the
//     monitor calls had run, any of which may overwrite the thread's
//     last-error value;
//   * dwIOSize is reset on every iteration, so that an indeterminate value is
//     never compared against the $FFFFFFFF quit sentinel when the call did not
//     dequeue a packet.
procedure TDIWorkerThread.ThreadRoutine;
var
  dwIOSize: DWORD;
  bIORet: LongBool;
  dwIOError: DWORD;
  FBuffer: TDIBuffer;
  pHandleData: PPerHandleData;
  FClientContext: TDIClientContext;
  FIOEventType: IOEventType;
begin
  while (TRUE) do
  begin
    pHandleData := nil;
    FClientContext := nil;
    FBuffer := nil;
    dwIOSize := 0;

    bIORet := FSocketServer.FIOCompletionPort.GetIOCompletionStatus( FClientContext,
                                                                     pHandleData,
                                                                     dwIOSize );
    // Read the error code right away, before any other call can change it.
    if bIORet then
      dwIOError := 0
    else
      dwIOError := GetLastError();

    // Quit request posted by TDIThreadPool.StopThreadPool.
    if (dwIOSize = $FFFFFFFF) then begin
      {$IFDEF _ICOP_DEBUG}
      FSocketServer.AppendLogMessage('收到PostIOCompletionStatus FFFFFFFF消息, Work线程退出');
      {$ENDIF}
      Exit;
    end;

    if (pHandleData<> nil) and
       (pHandleData.pDIBuffer<> nil) then begin
      // The operation type is carried in the OVERLAPPED Offset field.
      FIOEventType := IOEventType(pHandleData.m_overlap.Offset);
      FBuffer := TDIBuffer(pHandleData.pDIBuffer);
    end
    else
    begin
      // No usable per-handle data: nothing to dispatch, keep waiting.
      {$IFDEF _ICOP_DEBUGERR}
      FSocketServer.AppendErrorLogMessage('FBuffer为空, IOCP异常, Work线程继续');
      {$ENDIF}
      Continue;
    end;

    // Leave the I/O loop for this operation (original comment: "decrement
    // the count"); done under the context lock.
    if (FClientContext<>nil) then begin
      FClientContext.FContextLock.Lock;
      FClientContext.ExitIOLoop(FIOEventType);
      FClientContext.FContextLock.UnLock;
    end;

    {$IFDEF _IOCP_MONITOR}
    if FIOEventType = IOWSARecv then _DIMonitor.SubIOCPRecv;
    if FIOEventType = IOWSASend then _DIMonitor.SubIOCPSend;
    if FIOEventType = IOWSACloseSocket then _DIMonitor.SubIOCPCloseSocke;
    if FIOEventType = IOWSAAcceptEx then _DIMonitor.SubIOCPAcceptEx;
    {$ENDIF}

    if bIORet then
    begin
      // 1. User-defined close request.
      if (FIOEventType = IOWSACloseSocket) and
         (FClientContext<>nil) then begin
        FSocketServer.ProcessIOError(FClientContext, FIOEventType, IOErrorCustom);
        continue;
      end;

      // 2. Graceful disconnect: the dequeue succeeded, zero bytes were
      //    transferred, and both the context and the buffer are present.
      if ( (dwIOSize=0) and
           (FBuffer<>Nil) and
           (FClientContext<>nil) ) then begin
        FSocketServer.ProcessIOError(FClientContext, FIOEventType, IOErrorNoramal);
        continue;
      end;

      // Normal completion: hand the buffer to the server.
      if (FBuffer<>Nil) then
        FSocketServer.ProcessIOMessage(FIOEventType, FClientContext, FBuffer, dwIOSize)
      else
      begin
        // Defensive branch; FBuffer was already checked above.
        {$IFDEF _ICOP_DEBUGERR}
        FSocketServer.AppendErrorLogMessage('消息分发时FBuffer为空, IOCP异常, Work线程继续');
        {$ENDIF}
      end;
    end
    else
    begin
      // 3. Abnormal disconnect (no graceful-close notification): the dequeue
      //    call returned False. Per the original notes the error is expected
      //    to be ERROR_NETNAME_DELETED or ERROR_INVALID_PARAMETER, but any
      //    error other than WAIT_TIMEOUT is treated the same way.
      if ( dwIOError<>WAIT_TIMEOUT ) and
         ( FClientContext<>nil ) then
        FSocketServer.ProcessIOError(FClientContext, FIOEventType, IOErrorAbnormalNoramal);
    end;
  end;
end;
//TDIThreadPool
// Creates a stopped pool for the given socket server.
// Default limits: minimum 2, maximum 10, and 4 worker threads to start.
constructor TDIThreadPool.Create(SocketServer: TDISocketServer);
begin
  inherited Create;

  // Plain state first.
  FSocketServer := SocketServer;
  m_bStarted := False;
  m_nIOMinWorkThreads := 2;
  m_nIOWorkerThreads := 4;
  m_nIOMaxWorkThreads := 10;

  // Owned helper objects.
  m_ThreadsList := TList.Create;
  FThreadPoolLock := TCriticalSection.Create;
end;
// Stops the pool if it is still running, then releases the owned objects.
destructor TDIThreadPool.Destroy;
begin
  if m_bStarted then
  begin
    StopThreadPool;
  end;

  FreeAndNil(m_ThreadsList);
  FreeAndNil(FThreadPoolLock);

  inherited;
end;
// Setter for FMinWorkThreads.
// Accepts values in 0 .. current maximum; anything else is silently ignored.
procedure TDIThreadPool.SetMinWorkThreads(iMinWorkThreads: Integer);
begin
  if (iMinWorkThreads <= m_nIOMaxWorkThreads) and
     (iMinWorkThreads >= 0) then
  begin
    FThreadPoolLock.Lock;
    m_nIOMinWorkThreads := iMinWorkThreads;
    FThreadPoolLock.UnLock;
  end;
end;
// Setter for FMaxWorkThreads.
// Accepts non-negative values not below the current minimum; anything else is
// silently ignored.
procedure TDIThreadPool.SetMaxWorkThreads(iMaxWorkThreads: Integer);
begin
  if (iMaxWorkThreads >= m_nIOMinWorkThreads) and
     (iMaxWorkThreads >= 0) then
  begin
    FThreadPoolLock.Lock;
    m_nIOMaxWorkThreads := iMaxWorkThreads;
    FThreadPoolLock.UnLock;
  end;
end;
// Setter for FIOWorkThreads.
// Accepts values within [minimum, maximum]; anything else is silently ignored.
procedure TDIThreadPool.SetIOWorkersThread(iWorkerThreads: Integer);
begin
  if (iWorkerThreads >= m_nIOMinWorkThreads) and
     (iWorkerThreads <= m_nIOMaxWorkThreads) then
  begin
    FThreadPoolLock.Lock;
    m_nIOWorkerThreads := iWorkerThreads;
    FThreadPoolLock.UnLock;
  end;
end;
// Creates and runs m_nIOWorkerThreads worker threads.
//
// For each worker a ThreadParam record is allocated and appended to
// m_ThreadsList; StopThreadPool later uses those records to wait for and
// free the workers.
//
// Returns False (and does nothing) when the pool is already started,
// True otherwise.
//
// Fix: New() does not zero the record, so m_bRun used to be left with an
// indeterminate value; it is now set explicitly.
function TDIThreadPool.StartThreadPool: Boolean;
var
  i: Integer;
  p_Threads: PThreadParam;
  WorkerThread: TDIWorkerThread;
begin
  if (m_bStarted) then begin
    Result := False;
    Exit;
  end;

  // Create and start the server-side worker threads.
  for i := 0 to m_nIOWorkerThreads - 1 do
  begin
    WorkerThread := TDIWorkerThread.Create(FSocketServer, i+1);

    New(p_Threads);
    p_Threads^.m_bRun := True;
    p_Threads^.nThread := i+1;
    p_Threads^.FSocketServer := FSocketServer;
    p_Threads^.FDIThread := WorkerThread;
    // NOTE(review): the handle is read before Run is called; this assumes
    // TDIThread already owns a valid handle after construction - confirm.
    p_Threads^.hEvent := WorkerThread.Handle;
    m_ThreadsList.Add(p_Threads);

    WorkerThread.Run;
  end;

  {$IFDEF _ICOP_DEBUG}
  FSocketServer.AppendLogMessage('线程池工作线程启动.');
  {$ENDIF}
  m_bStarted := TRUE;
  Result := TRUE;
end;
// Stops the pool; does nothing when the pool is not started.
//
// Sequence:
//   1. stop the socket server (closes connections and the listening socket);
//   2. post one quit completion ($FFFFFFFF) per worker;
//   3. wait for every worker's handle;
//   4. free the worker objects and their ThreadParam records;
//   5. let the server release its memory.
//
// Fix: every loop is now bounded by the number of workers actually created
// (m_ThreadsList.Count). The previous code used m_nIOWorkerThreads, which the
// FIOWorkThreads setter may change after the pool was started; that could
// index past the end of the list or leave workers without a quit request.
procedure TDIThreadPool.StopThreadPool;
var
  i: Integer;
  nThreads: Integer;
  pParam: PThreadParam;
begin
  if (m_bStarted) then begin
    // Close all connections and the listening socket.
    FSocketServer.StopServer;
    {$IFDEF _ICOP_DEBUG}
    FSocketServer.AppendLogMessage('收到FSocketServer.StopServer消息');
    {$ENDIF}

    nThreads := m_ThreadsList.Count;

    // Ask each worker to leave its loop.
    for i := 0 to nThreads - 1 do
      FSocketServer.FIOCompletionPort.PostIOCompletionStatus(0, nil, $FFFFFFFF);
    {$IFDEF _ICOP_DEBUG}
    FSocketServer.AppendLogMessage('PostIOCompletionStatus投递工作线程退出消息.');
    {$ENDIF}

    // Wait until every worker has finished.
    for i := 0 to nThreads - 1 do
      WaitForSingleObject(PThreadParam(m_ThreadsList.Items[i])^.hEvent, INFINITE);
    {$IFDEF _ICOP_DEBUG}
    FSocketServer.AppendLogMessage('WaitForSingleObject成功等待所有工作线程退出.');
    {$ENDIF}

    // Release the thread objects and their bookkeeping records.
    for i := 0 to nThreads - 1 do begin
      pParam := PThreadParam(m_ThreadsList.Items[i]);
      FreeAndNil(pParam^.FDIThread);
      pParam^.hEvent := 0;
      Dispose(pParam);
    end;
    m_ThreadsList.Clear;

    // Reclaim server resources.
    FSocketServer.ProcessFreeMemory;
    {$IFDEF _ICOP_DEBUG}
    FSocketServer.AppendLogMessage('IOCP服务器成功释放所有内存,工作线程池停止.');
    {$ENDIF}
    m_bStarted := FALSE;
  end;
end;
end.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -