📄 uthreadpool.pas
字号:
//
// Copyright (c) 2002 by Max Koudelkin
// Commercial use requires permission.
// E-mail to: maxxx@avia.formoza.ru
//
unit uThreadPool;
interface
uses
Windows, SysUtils, Classes, Contnrs,
uMultiThreadCtrl, uInterlockedList;
const
// Largest value accepted by TRecycleThreadPoolSized.SetPoolSize; also used as
// the maximum count of the semaphore created in that class's constructor.
C_MAXPOOLSIZE = 1024;
// Pool size applied by the TRecycleThreadPoolSized constructor.
C_DEFPOOLSIZE = 10;
type
// Raised by TRecycleThreadPoolSized.AcquireThread when the wait times out.
EThreadPoolError = class( Exception );
// Forward declaration: the thread and the pool classes reference each other.
TRecycleThread = class;
// Class reference the pool uses to instantiate the configured thread class.
TRecycleThreadClass = class of TRecycleThread;
// Forward declaration, see above.
TRecycleThreadPool = class;
// Handler type of OnCreateThread; the pool passes the new thread as Sender.
TCreateThreadEvent = TNotifyEvent;
// Reusable worker thread owned by a TRecycleThreadPool. Descendants implement
// RecycleExecute; after every run the thread hands itself back to its pool.
TRecycleThread = class( TThread )
private
// Pool that was passed to the constructor.
FOwner : TRecycleThreadPool;
// Idle counter: set to 1 on creation, decremented by SetBusy,
// incremented by SetIdle. SuspendResume suspends while it is > 0.
FIdle : Integer;
// Entry that the pool pushes onto its idle list; its Data field is set to
// point back at this thread in the constructor.
FEntry : TInterlockedEntryRec;
// Suspends the thread while it is idle and not terminated, resumes it otherwise.
procedure SuspendResume;
// Marks the thread idle, returns it to the owner pool, then calls SuspendResume.
procedure SetIdle;
// Marks the thread busy, then calls SuspendResume.
procedure SetBusy;
protected
// Repeats RecycleExecute followed by SetIdle until Terminated is set.
procedure Execute; override;
// One unit of work; has to be implemented by descendants.
procedure RecycleExecute; virtual; abstract;
public
property Owner : TRecycleThreadPool read FOwner;
// Creates the thread suspended and flagged as idle.
// NOTE(review): the constructor is not virtual, so a constructor declared in a
// descendant is not invoked when the pool creates threads through
// TRecycleThreadClass.
constructor Create( AOwner : TRecycleThreadPool );
// Requests termination, resumes the thread and then runs the inherited destructor.
destructor Destroy; override;
// Switches an idle thread to busy; asserts that the thread is currently idle.
procedure Start;
end;
// Pool of TRecycleThread instances. Threads are created on demand by
// AcquireThread and kept on an idle list between runs.
TRecycleThreadPool = class( TMultiThreadCtrl )
private
// Guards FThreadList (see LockThreadList / UnLockThreadList).
FThreadListLock : TRTLCriticalSection;
// Every thread created by the pool; a TObjectList created with its default
// settings in the constructor.
FThreadList : TObjectList;
// Threads currently available for reuse (created as TInterlockedLIFO).
FIdleThreads : TInterlockedList;
// Incremented on every push to FIdleThreads, decremented on every successful pop.
FIdleCount : Integer;
// Maintained by TRecycleThread.SetBusy / SetIdle.
FBusyCount : Integer;
protected
// Enters FThreadListLock and returns FThreadList; pair with UnLockThreadList.
function LockThreadList : TList;
procedure UnLockThreadList;
// Empty in this class.
procedure Start; override;
// Clears the thread list and flushes the idle list.
procedure Stop; override;
// Instantiates ThreadClass and fires OnCreateThread.
function CreateThread : TRecycleThread; virtual;
// Puts AThread back on the idle list; called by the thread itself in SetIdle.
procedure ReleaseThread( AThread : TRecycleThread ); virtual;
// Pops one idle thread, if any, and removes it from the thread list.
procedure DestroyThread;
private
FThreadClass : TRecycleThreadClass;
FOnCreateThread : TCreateThreadEvent;
public
// Class instantiated by CreateThread; CreateThread raises EInvalidOperation
// while this is nil.
property ThreadClass : TRecycleThreadClass
read FThreadClass
write FThreadClass;
// Fired by CreateThread with the new thread as Sender.
property OnCreateThread : TCreateThreadEvent
read FOnCreateThread
write FOnCreateThread;
public
property IdleCount : Integer read FIdleCount;
property BusyCount : Integer read FBusyCount;
constructor Create( AOwner : TComponent ); override;
destructor Destroy; override;
// Returns an idle thread, or a newly created one when the idle list is empty.
// Raises EInvalidOperation when the pool is not Active.
function AcquireThread : TRecycleThread; virtual;
end;
// Thread pool in which the number of simultaneously acquired threads is
// limited by a semaphore; AcquireThread waits for a free slot.
TRecycleThreadPoolSized = class( TRecycleThreadPool )
private
// Event handle. An event object is the most basic form of thread
// synchronisation: it is used to signal other threads that an operation has completed.
// Threads and events are always in one of two states: signaled or non-signaled.
// A thread is non-signaled while it is created and running; once it terminates it becomes signaled.
FShutdown : THandle;
FIdlePhore : THandle; // semaphore that limits the number of threads
FPoolSize : Integer; // size of the thread pool
// Wait limit passed to WaitForMultipleObjects in AcquireThread; INFINITE by default.
FTimeOut : Cardinal;
procedure SetPoolSize( AValue : Integer ); // sets the size of the thread pool
procedure IncPoolSize( ADiff : Integer ); // increases the size of the thread pool
procedure DecPoolSize( ADiff : Integer ); // decreases the size of the thread pool
protected
// Calls the inherited Start, then resets the shutdown event.
procedure Start; override;
// Signals the shutdown event, then calls the inherited Stop.
procedure Stop; override;
// Calls the inherited ReleaseThread, then releases one semaphore count.
procedure ReleaseThread( AThread : TRecycleThread ); override;
public
// Accepted range is 0..C_MAXPOOLSIZE; other values raise EInvalidOperation.
property PoolSize : Integer read FPoolSize write SetPoolSize;
property TimeOut : Cardinal read FTimeOut write FTimeOut;
constructor Create( AOwner : TComponent ); override;
destructor Destroy; override;
// Waits for a free slot or for shutdown. Raises EThreadPoolError on timeout
// and calls Abort when the shutdown event is signaled.
function AcquireThread : TRecycleThread; override;
end;
implementation
uses
Consts;
resourcestring
// Message of the EInvalidOperation raised by AcquireThread on an inactive pool.
SPoolInactive = 'Can''t perform operation on inactive ThreadPool';
// Message of the EThreadPoolError raised when AcquireThread's wait times out.
STimeoutExpired = 'Wait timeout expired';
// Message of the EInvalidOperation raised by CreateThread when ThreadClass is nil.
SInvalidPropertyValue = 'Invalid Property Value';
{ TRecycleThread }
constructor TRecycleThread.Create( AOwner : TRecycleThreadPool );
// Creates a suspended worker bound to AOwner and flags it as idle.
begin
  // The thread is created suspended; it does not run until it is resumed.
  inherited Create( True );
{$IFDEF DEBUG}
  OutputDebugString( PChar( Format( '%s.Create ThreadId=%x', [ClassName, ThreadID] ) ) );
{$ENDIF}
  FOwner := AOwner;
  // A new thread counts as idle until SetBusy lowers the counter.
  FIdle := 1;
  // Back-pointer that lets the pool recover the thread from a popped list entry.
  FEntry.Data := Self;
end;
// Shuts the worker down before the inherited destructor runs.
destructor TRecycleThread.Destroy;
begin
// for the thread disappear from ThreadStatus window
// Set Terminated first, then wake the thread: a suspended worker has to be
// resumed so that Execute can observe Terminated and leave its loop.
Terminate;
Resume;
{$IFDEF DEBUG}
OutputDebugString( PChar( Format( '%s.Destroy ThreadId=%x', [ClassName, ThreadID] ) ) );
{$ENDIF}
inherited;
end;
procedure TRecycleThread.Execute;
// Thread body: run one job, hand the thread back to the pool, repeat until
// termination has been requested.
begin
  while True do
  begin
    if Terminated then
      Break;
    try
      RecycleExecute;
    except
      // Exceptions derived from Exception are forwarded to the owner pool so
      // that a failing job does not end the worker.
      on E : Exception do
        Owner.HandleException( E );
    end;
    // Returns the thread to the pool and parks it until it is started again.
    SetIdle;
  end;
end;
// Called from Execute after each RecycleExecute: flags the thread idle,
// lowers the pool's busy counter, returns the thread to the pool and finally
// lets SuspendResume decide whether to park it.
procedure TRecycleThread.SetIdle;
begin
InterlockedIncrement( FIdle );
InterlockedDecrement( Owner.FBusyCount );
// Once the thread is back on the idle list another thread may acquire and
// Start it again, which lowers FIdle before SuspendResume examines it.
Owner.ReleaseThread( Self );
SuspendResume;
end;
// Flags the thread busy: raises the pool's busy counter, lowers the idle
// counter and lets SuspendResume wake the thread. Called from Start.
procedure TRecycleThread.SetBusy;
begin
InterlockedIncrement( Owner.FBusyCount );
InterlockedDecrement( FIdle );
SuspendResume;
end;
procedure TRecycleThread.SuspendResume;
// Parks the thread only while it is idle and has not been asked to terminate;
// in every other case the thread is resumed.
begin
  if ( not Terminated ) and ( FIdle > 0 ) then
    inherited Suspend
  else
    inherited Resume;
end;
procedure TRecycleThread.Start;
// Puts an idle thread to work. Expected to be called on a thread obtained
// from the pool's AcquireThread.
begin
  // Starting a thread that is already busy is a programming error.
  Assert( FIdle > 0 );
  SetBusy;
end;
{ TRecycleThreadPool }
type
// Local descendant used only for typecasts, so that this unit can call
// InternalPop / InternalPush / InternalFlush on a TInterlockedList.
// NOTE(review): presumably those methods are protected in uInterlockedList - confirm.
TInterlockedListProtected = class( TInterlockedList );
constructor TRecycleThreadPool.Create( AOwner : TComponent );
// Sets up the lock, the idle list and the list of all threads created by the pool.
begin
  inherited Create( AOwner );
  InitializeCriticalSection( FThreadListLock );
  // Idle threads are kept in a LIFO list.
  FIdleThreads := TInterlockedLIFO.Create;
  // Holds every thread the pool has created.
  FThreadList := TObjectList.Create;
end;
// Releases the thread list, the idle list and the critical section, in that
// order, before running the inherited destructor.
destructor TRecycleThreadPool.Destroy;
begin
// The thread list goes first, while the idle list still exists.
// NOTE(review): this presumably matters because a thread that is being
// destroyed may still call ReleaseThread, which pushes onto FIdleThreads - confirm.
FThreadList.Free;
FIdleThreads.Free;
DeleteCriticalSection( FThreadListLock );
inherited;
end;
function TRecycleThreadPool.LockThreadList : TList;
// Enters the thread-list critical section and returns the list. Every call
// must be balanced by UnLockThreadList.
begin
  EnterCriticalSection( FThreadListLock );
  Result := FThreadList;
end;
procedure TRecycleThreadPool.UnLockThreadList;
// Leaves the critical section entered by LockThreadList.
begin
  LeaveCriticalSection( FThreadListLock );
end;
procedure TRecycleThreadPool.Start;
// Nothing to prepare in the base pool: threads are created on demand by
// AcquireThread.
begin
end;
procedure TRecycleThreadPool.Stop;
// Destroys every thread of the pool and empties the idle list.
//
// Fix: FIdleCount is reset together with the idle list. It is incremented on
// every push and decremented on every pop, so flushing the list without
// resetting the counter left IdleCount reporting threads that no longer
// exist after a stop / restart cycle.
var
  LThreads : TList;
begin
  LThreads := LockThreadList;
  try
    // Clearing the list disposes of the thread objects it holds; this has
    // completed before the idle list is flushed below.
    LThreads.Clear;
    // The idle list only holds entries embedded in the threads disposed of
    // above, so whatever is left in it must be dropped.
    TInterlockedListProtected( FIdleThreads ).InternalFlush;
    InterlockedExchange( FIdleCount, 0 );
  finally
    UnLockThreadList;
  end;
end;
function TRecycleThreadPool.CreateThread : TRecycleThread;
// Instantiates ThreadClass for this pool and fires OnCreateThread with the
// new thread as Sender.
//
// Raises EInvalidOperation (SInvalidPropertyValue) when ThreadClass is nil.
// Exceptions raised by the OnCreateThread handler propagate to the caller.
//
// Fix: the thread is freed when the handler raises. The caller never receives
// the instance in that case and it is not yet in the thread list, so it would
// otherwise be leaked.
begin
  if FThreadClass = nil then
    raise EInvalidOperation.CreateRes( @SInvalidPropertyValue );
  Result := FThreadClass.Create( Self );
  if Assigned( FOnCreateThread ) then
    try
      FOnCreateThread( Result );
    except
      Result.Free;
      raise;
    end;
end;
function TRecycleThreadPool.AcquireThread : TRecycleThread;
// Returns a thread for the caller to Start: an idle one when available,
// otherwise a newly created one that is added to the thread list.
// Raises EInvalidOperation (SPoolInactive) when the pool is not Active.
var
  LThreads : TList;
begin
  if not Active then
    raise EInvalidOperation.CreateRes( @SPoolInactive );
  // NOTE(review): the popped entry is dereferenced unconditionally; this
  // relies on InternalPop returning a valid entry (with Data = nil) for an
  // empty list - confirm in uInterlockedList.
  Result := TInterlockedListProtected( FIdleThreads ).InternalPop^.Data;
  if Result <> nil then
  begin
    // Reusing an idle thread: it has just left the idle list.
    InterlockedDecrement( FIdleCount );
    Exit;
  end;
  // No idle thread available: create one and register it under the list lock.
  LThreads := LockThreadList;
  try
    Result := CreateThread;
    LThreads.Add( Result );
  finally
    UnLockThreadList;
  end;
end;
procedure TRecycleThreadPool.ReleaseThread( AThread : TRecycleThread );
// Puts AThread back on the idle list and counts it as idle.
// Called by the thread itself from TRecycleThread.SetIdle.
begin
  TInterlockedListProtected( FIdleThreads ).InternalPush( @AThread.FEntry );
  InterlockedIncrement( FIdleCount );
end;
procedure TRecycleThreadPool.DestroyThread;
// Takes one thread off the idle list, if there is one, and removes it from
// the thread list. Does nothing when no idle thread is available.
var
  LVictim : TRecycleThread;
  LThreads : TList;
begin
  // NOTE(review): as in AcquireThread, this relies on InternalPop returning a
  // dereferenceable entry for an empty list - confirm in uInterlockedList.
  LVictim := TInterlockedListProtected( FIdleThreads ).InternalPop^.Data;
  if LVictim = nil then
    Exit;
  LThreads := LockThreadList;
  try
    LThreads.Remove( LVictim );
    InterlockedDecrement( FIdleCount );
  finally
    UnLockThreadList;
  end;
end;
{ TRecycleThreadPoolSized }
constructor TRecycleThreadPoolSized.Create( AOwner : TComponent );
// Creates the slot semaphore and the shutdown event, then applies the default
// timeout and pool size.
begin
  inherited Create( AOwner );

  // The semaphore starts with no free slots; the pool-size setter below
  // releases the initial count.
  FIdlePhore := CreateSemaphore( nil, 0, C_MAXPOOLSIZE, nil );
  if FIdlePhore = 0 then
    RaiseLastWin32Error;

  // Manual-reset event, initially non-signaled.
  FShutdown := CreateEvent( nil, True, False, nil );
  if FShutdown = 0 then
    RaiseLastWin32Error;

  FTimeOut := INFINITE;
  PoolSize := C_DEFPOOLSIZE;
end;
destructor TRecycleThreadPoolSized.Destroy;
// Tears the pool down and then closes the semaphore and the shutdown event.
//
// Fix: the inherited destructor now runs BEFORE the handles are closed. It
// frees the thread list, and a thread that is still finishing a job at that
// point calls ReleaseThread, which releases FIdlePhore; previously that call
// hit a handle that had already been closed. The instance's fields remain
// accessible after "inherited" because the memory is only released once the
// outermost destructor returns.
//
// The handles are also checked for zero, since the destructor runs as well
// when the constructor fails before both handles have been created.
begin
  inherited;
  if FIdlePhore <> 0 then
    CloseHandle( FIdlePhore );
  if FShutdown <> 0 then
    CloseHandle( FShutdown );
end;
procedure TRecycleThreadPoolSized.Start;
// Starts the pool and re-arms the shutdown event.
begin
  inherited Start;
  // Put the event into the non-signaled state so that AcquireThread waits
  // for a free slot again.
  ResetEvent( FShutdown );
end;
procedure TRecycleThreadPoolSized.Stop;
// Signals shutdown first, then performs the inherited stop.
begin
  // Put the event into the signaled state; callers waiting in AcquireThread
  // wake up on it.
  SetEvent( FShutdown );
  inherited Stop;
end;
procedure TRecycleThreadPoolSized.SetPoolSize( AValue : Integer );
// Property setter for PoolSize. Grows or shrinks the pool by the difference
// to the current size while the properties are locked.
// Raises EInvalidOperation (SOutOfRange) for values outside 0..C_MAXPOOLSIZE.
var
  LDelta : Integer;
begin
  if not ( ( AValue >= 0 ) and ( AValue <= C_MAXPOOLSIZE ) ) then
    raise EInvalidOperation.CreateResFmt( @SOutOfRange, [0, C_MAXPOOLSIZE] );
  LockProperties;
  try
    LDelta := AValue - FPoolSize;
    if LDelta > 0 then
      IncPoolSize( LDelta )
    else if LDelta < 0 then
      // DecPoolSize expects the (negative) difference.
      DecPoolSize( LDelta );
    // The field is updated only after the resize has gone through.
    if LDelta <> 0 then
      FPoolSize := AValue;
  finally
    UnLockProperties;
  end;
end;
procedure TRecycleThreadPoolSized.IncPoolSize( ADiff : Integer );
// Adds ADiff free slots to the semaphore; raises the last Win32 error when
// the release fails.
var
  LReleased : Boolean;
begin
  LReleased := ReleaseSemaphore( FIdlePhore, ADiff, nil );
  if not LReleased then
    RaiseLastWin32Error;
end;
// Shrinks the pool by -ADiff slots (ADiff is negative). One semaphore count
// is taken per slot; whenever the thread list then holds more threads than
// the reduced size, one idle thread is destroyed.
procedure TRecycleThreadPoolSized.DecPoolSize( ADiff : Integer );
var LSize : Integer;
begin
// Running target size; FPoolSize itself is updated by SetPoolSize afterwards.
LSize := FPoolSize;
while ADiff < 0 do
// Waits without timeout until a slot is free, i.e. this blocks while all
// slots are taken by acquired threads.
case WaitForSingleObject( FIdlePhore, INFINITE ) of
WAIT_OBJECT_0 :
with LockThreadList do
try
Inc( ADiff );
Dec( LSize );
// Count is the number of threads in the locked thread list.
if Count > LSize then
DestroyThread;
finally
UnLockThreadList;
end;
else { WAIT_FAILED }
RaiseLastWin32Error;
end;
end;
{$WARNINGS OFF}
function TRecycleThreadPoolSized.AcquireThread : TRecycleThread;
// Waits until a pool slot is free or the pool shuts down, then returns a
// thread obtained from the inherited AcquireThread.
//
// Raises EInvalidOperation (SPoolInactive) when the pool is not Active,
// EThreadPoolError (STimeoutExpired) when TimeOut elapses, EAbort (via Abort)
// when the shutdown event is signaled, and the last Win32 error when the wait
// itself fails.
//
// Fix: the slot taken from the semaphore is handed back when the inherited
// AcquireThread raises (for example because ThreadClass is nil). Previously
// every such failure permanently consumed one slot, until AcquireThread
// blocked or timed out for all callers.
var LHandles : array [0..1] of THandle;
begin
  if not Active then
    raise EInvalidOperation.CreateRes( @SPoolInactive );
  // The shutdown event comes first so that it wins when both objects are signaled.
  LHandles[0] := FShutdown;
  LHandles[1] := FIdlePhore;
  case WaitForMultipleObjects(
         Length( LHandles ),
         @LHandles,
         False,
         TimeOut )
  of
    WAIT_TIMEOUT : raise EThreadPoolError.CreateRes( @STimeoutExpired );
    WAIT_OBJECT_0 : Abort;
    WAIT_OBJECT_0 + 1 :
      try
        Result := inherited AcquireThread;
      except
        // No thread is handed out, so nobody would ever release this slot.
        ReleaseSemaphore( FIdlePhore, 1, nil );
        raise;
      end;
  else { WAIT_FAILED }
    RaiseLastWin32Error;
  end;
end;
{$WARNINGS ON}
procedure TRecycleThreadPoolSized.ReleaseThread( AThread : TRecycleThread );
// Returns AThread to the idle list and frees the slot it occupied.
begin
  inherited ReleaseThread( AThread );
  // The result is not checked, exactly as before.
  ReleaseSemaphore( FIdlePhore, 1, nil );
end;
end.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -