📄 dnthreadexecutor.pas
字号:
// The contents of this file are used with permission, subject to
// the Mozilla Public License Version 1.1 (the "License"); you may
// not use this file except in compliance with the License. You may
// obtain a copy of the License at
// http://www.mozilla.org/MPL/MPL-1.1.html
//
// Software distributed under the License is distributed on an
// "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or
// implied. See the License for the specific language governing
// rights and limitations under the License.
{$I DnConfig.inc}
unit DnThreadExecutor;
interface
uses
Classes, SysUtils, contnrs, syncobjs, Windows,
DnConst, DnInterfaces, DnRtl, DnAbstractExecutor, DnAbstractLogger;
type
// Forward declarations: the executor and its worker thread reference each other.
TDnWorkerThread = class;
TDnThreadExecutor = class;
// Handler consulted by TDnThreadExecutor.LaunchThread before a worker is created.
// Returning False vetoes the creation (LaunchThread then logs SCannotCreateThread
// and returns Nil); returning True lets the thread be created.
TDnThreadCreateEvent = function (Pool: TDnThreadExecutor): Boolean of object;
// Handler type for the OnThreadRemove property. It receives the pool, the worker
// thread and that thread's context.
// NOTE(review): the call site is not in the visible part of the unit - confirm
// when it fires and how the Boolean result is interpreted.
TDnThreadRemoveEvent = function (Pool: TDnThreadExecutor; Thread: TDnWorkerThread;
Context: TDnThreadContext): Boolean of object;
// Executor that owns a list of TDnWorkerThread objects and a queue of
// IDnIOResponse events. It is started and stopped through the Active property.
TDnThreadExecutor = class(TDnAbstractExecutor)
protected
// True between a successful start and a completed stop (see SetActive).
FActive: Boolean;
// Defaults to True. When False, LaunchThread builds the thread context itself
// (through FOnCreateContext if assigned, otherwise TDnThreadContext.Create).
// When True, LaunchThread leaves the thread's FContext as Nil.
// NOTE(review): presumably the worker then creates its context in
// CreateContext - confirm.
FSyncCtxCreating: Boolean;
// Number of workers launched on activation when FPreCreateThreads is set. Default 0.
FMinThread: Integer;
// Default 16. NOTE(review): enforcement is not in the visible code - confirm.
FMaxThread: Integer;
// Default 1024. NOTE(review): enforcement is not in the visible code - confirm.
FMaxQueueSize: Integer;
// Default False. NOTE(review): usage is not in the visible code - confirm.
FRefuseOverload: Boolean;
// When True, SetActive(True) launches FMinThread workers up front. Default False.
FPreCreateThreads:Boolean;
// Optional veto handler consulted by LaunchThread.
FOnThreadCreate: TDnThreadCreateEvent;
// Optional handler exposed as OnThreadRemove.
FOnThreadRemove: TDnThreadRemoveEvent;
FOnLogMessage: TDnLogEvent;
// Interface list of queued events; cleared without processing when the pool stops.
FEvent: TInterfaceList;
// Object list of worker threads, created with OwnsObjects = True.
FWorker: TObjectList;
// Counter changed only through InterlockedIncrement / InterlockedDecrement.
FBusyCount: Integer;
// Mutex held by SetActive and RemoveThread while they touch FWorker.
FGuard: TDnMutex;
// Semaphore created with initial count 0; released by SetActive when stopping.
FNonEmpty: TDnSemaphore;
// True only while SetActive(False) is shutting the pool down.
FStopping: Boolean;
// Starts (True) or stops (False) the pool; no-op when the state is unchanged.
procedure SetActive(Value: Boolean);
procedure SetMinThread(Value: Integer);
procedure SetMaxThread(Value: Integer);
// Creates, registers and runs one worker; returns Nil if OnThreadCreate vetoes it.
function LaunchThread: TDnWorkerThread;
function GetQueueSize: Integer;
function GetBusyCount: Integer;
procedure SetMaxQueueSize(Value: Integer);
function GetMaxQueueSize: Integer;
procedure Lock;
procedure Unlock;
// Sends Msg to FLogger at FLogLevel; never raises.
procedure DoLogEvent(Msg: String);
// Atomic +1 / -1 on FBusyCount.
procedure IncrementBusy;
procedure DecrementBusy;
// Detaches Thread from FWorker, marks it FreeOnTerminate and asks it to terminate.
procedure RemoveThread(Thread: TDnWorkerThread);
public
{$IFDEF ROOTISCOMPONENT}
constructor Create(AOwner: TComponent); override;
{$ELSE}
constructor Create;
{$ENDIF}
destructor Destroy; override;
// Queues Event for processing. An event whose channel exposes a non-Nil Tracker
// is posted to that worker thread instead of the common queue.
function PostEvent(Event: IDnIOResponse): Boolean; override;
function GetEvent( Thread: TDnWorkerThread; var Event: IDnIOResponse): Boolean;
procedure BindChannelToContext(Channel: IDnIOTrackerHolder; Context: TDnThreadContext);
procedure UnbindChannelFromContext(Channel: IDnIOTrackerHolder; Context: TDnThreadContext);
property QueueSize: Integer read GetQueueSize;
property BusyCount: Integer read GetBusyCount;
published
property Active: Boolean read FActive write SetActive;
property RefuseOverload: Boolean read FRefuseOverload write FRefuseOverload;
property PreCreateThreads: Boolean read FPreCreateThreads write FPreCreateThreads;
property MinThread: Integer read FMinThread write SetMinThread;
property MaxThread: Integer read FMaxThread write SetMaxThread;
property MaxQueueSize: Integer read GetMaxQueueSize write SetMaxQueueSize;
property SyncContextCreating: Boolean read FSyncCtxCreating write FSyncCtxCreating;
property OnThreadCreate: TDnThreadCreateEvent read FOnThreadCreate write FOnThreadCreate;
property OnThreadRemove: TDnThreadRemoveEvent read FOnThreadRemove write FOnThreadRemove;
end;
// Worker thread created by TDnThreadExecutor.LaunchThread. Besides the pool it
// belongs to, it carries its own event list, semaphore and mutex.
TDnWorkerThread = class(TDnThread)
protected
// Executor that created this worker (passed to the constructor).
FPool: TDnThreadExecutor;
FBusy: Boolean;
// When greater than zero, the pool releases this thread's own FNonEmpty
// semaphore on shutdown.
// NOTE(review): presumably counts channels bound through Bind/Unbind - confirm.
FBoundCount: Cardinal;
// NOTE(review): presumably the per-thread queue fed by PostEvent - confirm.
FEvent: TInterfaceList;
FNonEmpty: TDnSemaphore;
FGuard: TDnMutex;
function GetEvent(var Event: IDnIOResponse): Boolean;
procedure Lock;
procedure Unlock;
// Called by TDnThreadExecutor.PostEvent for events whose channel tracker
// points at this thread.
function PostEvent(Event: IDnIOResponse): Boolean;
procedure Bind(Source: IDnIOTrackerHolder);
procedure Unbind(Source: IDnIOTrackerHolder);
function DoExit: Boolean;
procedure ThreadRoutine; override;
procedure CreateContext; override;
procedure DestroyContext; override;
public
constructor Create(Pool: TDnThreadExecutor);
destructor Destroy; override;
property Busy: Boolean read FBusy;
property Pool: TDnThreadExecutor read FPool write FPool;
end;
//----------------------------------------------------------------------------
procedure Register;
implementation
// Builds an inactive executor with default limits: no pre-created threads,
// at most 16 workers, queue limit 1024, context creation left to the worker.
{$IFDEF ROOTISCOMPONENT}
constructor TDnThreadExecutor.Create(AOwner: TComponent);
{$ELSE}
constructor TDnThreadExecutor.Create;
{$ENDIF}
const
  // Upper bound handed to the "queue is not empty" semaphore.
  SemaphoreLimit = $7FFFFFFF;
begin
{$IFDEF ROOTISCOMPONENT}
  inherited Create(AOwner);
{$ELSE}
  inherited Create;
{$ENDIF}

  // Containers: the worker list owns its threads, the event list holds interfaces.
  FWorker := TObjectList.Create(True);
  FEvent := TInterfaceList.Create;

  // Tunables.
  FMaxQueueSize := 1024;
  FMaxThread := 16;
  FMinThread := 0;
  FRefuseOverload := False;
  FPreCreateThreads := False;

  // No handlers and no logger until the owner assigns them.
  FOnThreadCreate := nil;
  FOnThreadRemove := nil;
  FLogger := nil;
  FLogLevel := llMandatory;

  // Synchronisation primitives; the semaphore starts at zero.
  FGuard := TDnMutex.Create;
  FNonEmpty := TDnSemaphore.Create(0, SemaphoreLimit);

  FSyncCtxCreating := True;
  FStopping := False;
end;
// Stops the pool if it is still running, then releases the worker list, the
// event queue and the synchronisation primitives.
destructor TDnThreadExecutor.Destroy;
begin
  // Workers use FWorker, FGuard and FNonEmpty (see RemoveThread / SetActive).
  // Freeing those while threads are still alive would pull them out from under
  // running code, so shut the pool down first. FActive can only be True after
  // a fully successful constructor, so FGuard is valid here.
  if FActive then
    SetActive(False);

  FreeAndNil(FWorker);
  FreeAndNil(FEvent);
  FreeAndNil(FGuard);
  FreeAndNil(FNonEmpty);
  inherited Destroy;
end;
// Forwards Msg to the attached logger at the executor's log level.
// Does nothing when no logger is attached, and never lets a logger failure
// propagate to the caller.
procedure TDnThreadExecutor.DoLogEvent(Msg: String);
begin
  if not Assigned(FLogger) then
    Exit;

  try
    FLogger.LogMsg(FLogLevel, Msg);
  except
    // Deliberately ignored: a broken logger must not take the server down.
  end;
end;
// Atomically adds one to FBusyCount.
procedure TDnThreadExecutor.IncrementBusy;
begin
  Windows.InterlockedIncrement(FBusyCount);
end;
// Atomically subtracts one from FBusyCount.
procedure TDnThreadExecutor.DecrementBusy;
begin
  Windows.InterlockedDecrement(FBusyCount);
end;
// Property setter for MaxQueueSize; stores Value as given, without validation.
procedure TDnThreadExecutor.SetMaxQueueSize(Value: Integer);
begin
  Self.FMaxQueueSize := Value;
end;
// Property getter for MaxQueueSize; returns the stored limit.
function TDnThreadExecutor.GetMaxQueueSize: Integer;
begin
  Result := Self.FMaxQueueSize;
end;
// Creates one worker thread, registers it in FWorker and starts it.
//
// The OnThreadCreate handler, if assigned, may veto the creation; in that case
// SCannotCreateThread is logged and Nil is returned.
//
// When SyncContextCreating is False the thread context is built here (through
// FOnCreateContext when assigned, otherwise TDnThreadContext.Create) and stored
// in the thread before it runs. When it is True the thread's FContext is set
// to Nil.
//
// This routine takes no lock itself.
function TDnThreadExecutor.LaunchThread: TDnWorkerThread;
var
  Permitted: Boolean;
  Ctx: TDnThreadContext;
begin
  // Ask the owner for permission; no handler means "always allowed".
  Permitted := True;
  if Assigned(FOnThreadCreate) then
    Permitted := FOnThreadCreate(Self);

  if not Permitted then
  begin
    Self.DoLogEvent(SCannotCreateThread);
    Result := nil;
    Exit;
  end;

  Result := TDnWorkerThread.Create(Self);
  if Result = nil then
    Exit;

  // Pick the context: only built here when creation is not left to the thread.
  Ctx := nil;
  if not FSyncCtxCreating then
  begin
    if Assigned(FOnCreateContext) then
      Ctx := FOnCreateContext(Result)
    else
      Ctx := TDnThreadContext.Create(Result);
  end;

  Result.FContext := Ctx;
  FWorker.Add(Result);
  Result.Run;
end;
// Detaches Thread from the pool and asks it to terminate.
//
// Under FGuard the thread is marked FreeOnTerminate, extracted from FWorker
// (so the owning list does not free it) and told to terminate. Any exception
// is caught and reported as SCannotRemoveThread; nothing is raised to the caller.
procedure TDnThreadExecutor.RemoveThread(Thread: TDnWorkerThread);
begin
  try
    // Acquire before entering try..finally: if Acquire itself fails, the
    // finally block must not release a mutex that was never taken.
    FGuard.Acquire;
    try
      Thread.FreeOnTerminate := True;
      FWorker.Extract(Thread);
      Thread.Terminate;
      // Handlers for OnDestroyRuntimeContext and OnThreadFinish should be
      // called from the thread itself.
    finally
      FGuard.Release;
    end;
  except
    DoLogEvent(SCannotRemoveThread);
  end;
end;
// Property setter for Active: starts or stops the pool.
//
// Value = True on an inactive pool: clears FStopping, launches MinThread
// workers when PreCreateThreads is set, then marks the pool active.
//
// Value = False on an active pool: raises FStopping, wakes and terminates the
// workers, waits until every worker has left FWorker, discards all queued
// events without processing them, then marks the pool inactive.
//
// Calls that do not change the state return immediately without locking.
// FGuard is released on every exit path, including when an exception (for
// example from an OnThreadCreate handler) escapes.
procedure TDnThreadExecutor.SetActive(Value: Boolean);
var
  i: Integer;
begin
  if Value = FActive then
    Exit;

  FGuard.Acquire;
  try
    if Value then
    begin
      // ---- start ----
      FStopping := False;
      if FPreCreateThreads then
      begin
        for i := 0 to FMinThread - 1 do
          LaunchThread;
      end;
      FActive := True;
    end
    else
    begin
      // ---- stop ----
      FStopping := True;

      // Wake workers that have bound channels through their own semaphore
      // (a 'NULL' event), then signal the shared one.
      for i := 0 to FWorker.Count - 1 do
        if TDnWorkerThread(FWorker[i]).FBoundCount > 0 then
          TDnWorkerThread(FWorker[i]).FNonEmpty.Release;
      FNonEmpty.Release;

      // Ask every worker to terminate and signal the shared semaphore again.
      // NOTE(review): the shared semaphore is released twice in total, not once
      // per worker - confirm that idle workers wake each other or wait with a
      // timeout.
      for i := 0 to FWorker.Count - 1 do
        TDnWorkerThread(FWorker[i]).Terminate;
      FNonEmpty.Release;

      // Workers leave FWorker through RemoveThread, which needs FGuard, so
      // the lock is dropped while sleeping between polls.
      while FWorker.Count <> 0 do
      begin
        FGuard.Release;
        Sleep(10);
        FGuard.Acquire;
      end;

      // Drop pending events - they are neither handled nor destroyed here.
      FEvent.Clear;
      FWorker.Clear;
      FActive := False;
      FStopping := False;
    end;
  finally
    FGuard.Release;
  end;
end;
function TDnThreadExecutor.PostEvent(Event: IDnIOResponse): Boolean;
var Tracker: IDnIOTrackerHolder;
CommonQueue: Boolean;
begin
Result := False;
//check the binding
CommonQueue := True;
Tracker := Event.Channel as IDnIOTrackerHolder;
if Tracker <> Nil then
begin
if Tracker.Tracker <> Nil then
begin
TDnWorkerThread(Tracker.Tracker).PostEvent(Event);
CommonQueue := False;
end;
Tracker := Nil;
end;
if CommonQueue then
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -