⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dnthreadexecutor.pas

📁 一个国外比较早的IOCP控件
💻 PAS
📖 第 1 页 / 共 2 页
字号:
// The contents of this file are used with permission, subject to
// the Mozilla Public License Version 1.1 (the "License"); you may
// not use this file except in compliance with the License. You may
// obtain a copy of the License at
// http://www.mozilla.org/MPL/MPL-1.1.html
//
// Software distributed under the License is distributed on an
// "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or
// implied. See the License for the specific language governing
// rights and limitations under the License.
{$I DnConfig.inc}
unit DnThreadExecutor;

interface
uses
  Classes, SysUtils, contnrs, syncobjs, Windows,
  DnConst, DnInterfaces, DnRtl, DnAbstractExecutor, DnAbstractLogger;

type
  TDnWorkerThread = class;
  TDnThreadExecutor = class;
  
  TDnThreadCreateEvent = function (Pool: TDnThreadExecutor): Boolean of object;
  TDnThreadRemoveEvent = function (Pool: TDnThreadExecutor; Thread: TDnWorkerThread;
                                    Context: TDnThreadContext): Boolean of object;
  
  TDnThreadExecutor = class(TDnAbstractExecutor)
  protected
    FActive:          Boolean;
    FSyncCtxCreating: Boolean;
    FMinThread:       Integer;
    FMaxThread:       Integer;
    FMaxQueueSize:    Integer;
    FRefuseOverload:  Boolean;
    FPreCreateThreads:Boolean;
    FOnThreadCreate:  TDnThreadCreateEvent;
    FOnThreadRemove:  TDnThreadRemoveEvent;
    FOnLogMessage:    TDnLogEvent;
    FEvent:           TInterfaceList;
    FWorker:          TObjectList;
    FBusyCount:       Integer;
    FGuard:           TDnMutex;
    FNonEmpty:        TDnSemaphore;
    FStopping:        Boolean;
    
    procedure SetActive(Value: Boolean);
    procedure SetMinThread(Value: Integer);
    procedure SetMaxThread(Value: Integer);

    function  LaunchThread: TDnWorkerThread;
    function  GetQueueSize: Integer;
    function  GetBusyCount: Integer;
    procedure SetMaxQueueSize(Value: Integer);
    function  GetMaxQueueSize: Integer;
    procedure Lock;
    procedure Unlock;
    procedure DoLogEvent(Msg: String);

    procedure IncrementBusy;
    procedure DecrementBusy;
    procedure RemoveThread(Thread: TDnWorkerThread);

  public
    {$IFDEF ROOTISCOMPONENT}
    constructor Create(AOwner: TComponent); override;
    {$ELSE}
    constructor Create;
    {$ENDIF}
    destructor  Destroy; override;
    function  PostEvent(Event: IDnIOResponse): Boolean; override;
    function  GetEvent( Thread: TDnWorkerThread; var Event: IDnIOResponse): Boolean;

    procedure BindChannelToContext(Channel: IDnIOTrackerHolder; Context: TDnThreadContext);
    procedure UnbindChannelFromContext(Channel: IDnIOTrackerHolder; Context: TDnThreadContext);
    property QueueSize: Integer read GetQueueSize;
    property BusyCount: Integer read GetBusyCount;
  published
    property Active: Boolean read FActive write SetActive;
    property RefuseOverload: Boolean read FRefuseOverload write FRefuseOverload;
    property PreCreateThreads: Boolean read FPreCreateThreads write FPreCreateThreads;
    property MinThread: Integer read FMinThread write SetMinThread;
    property MaxThread: Integer read FMaxThread write SetMaxThread;
    property MaxQueueSize: Integer read GetMaxQueueSize write SetMaxQueueSize;
    property SyncContextCreating: Boolean read FSyncCtxCreating write FSyncCtxCreating;
    
    property OnThreadCreate: TDnThreadCreateEvent read FOnThreadCreate write FOnThreadCreate;
    property OnThreadRemove: TDnThreadRemoveEvent read FOnThreadRemove write FOnThreadRemove;
  end;

  TDnWorkerThread = class(TDnThread)
  protected
    FPool:        TDnThreadExecutor;
    FBusy:        Boolean;
    FBoundCount:  Cardinal;
    FEvent:       TInterfaceList;
    FNonEmpty:    TDnSemaphore;
    FGuard:       TDnMutex;
    
    function  GetEvent(var Event: IDnIOResponse): Boolean;
    procedure Lock;
    procedure Unlock;
    function  PostEvent(Event: IDnIOResponse): Boolean;
    procedure Bind(Source: IDnIOTrackerHolder);
    procedure Unbind(Source: IDnIOTrackerHolder);
    function  DoExit: Boolean;
    procedure ThreadRoutine; override;
    procedure CreateContext; override;
    procedure DestroyContext; override;
    
  public
    constructor Create(Pool: TDnThreadExecutor);
    destructor  Destroy; override;
    property    Busy: Boolean read FBusy;
    property    Pool: TDnThreadExecutor read FPool write FPool;
  end;
  //----------------------------------------------------------------------------

procedure Register;
implementation


constructor TDnThreadExecutor.Create{$IFDEF ROOTISCOMPONENT}(AOwner: TComponent){$ENDIF};
begin
  inherited Create{$IFDEF ROOTISCOMPONENT}(AOwner){$ENDIF};
  FWorker := TObjectList.Create(True);
  FEvent := TInterfaceList.Create;
  //FSource := TInterfaceList.Create;
  FMaxQueueSize := 1024;
  FMaxThread := 16;
  FMinThread := 0;
  FRefuseOverload := False;
  FPreCreateThreads := False;

  FOnThreadCreate := Nil;
  FOnThreadRemove := Nil;
  
  FLogger := Nil;
  FLogLevel := llMandatory;
  //FContext := Nil;
  FGuard := TDnMutex.Create;
  FNonEmpty := TDnSemaphore.Create(0, $7FFFFFFF);
  FSyncCtxCreating := True;
  FStopping := False;
end;

destructor TDnThreadExecutor.Destroy;
begin
  FreeAndNil(FWorker);
  FreeAndNil(FEvent);
  FreeAndNil(FGuard);
  FreeAndNil(FNonEmpty);
  inherited Destroy;
end;

procedure TDnThreadExecutor.DoLogEvent(Msg: String);
begin
  try
    if Assigned(FLogger) then
      FLogger.LogMsg(FLogLevel, Msg);
  except
    ; //suppress any exceptions. Server SHOULD works!
  end;
end;

procedure TDnThreadExecutor.IncrementBusy;
begin
  InterLockedIncrement(FBusyCount);
end;

procedure TDnThreadExecutor.DecrementBusy;
begin
  InterLockedDecrement(FBusyCount);
end;

procedure TDnThreadExecutor.SetMaxQueueSize(Value: Integer);
begin
  FMaxQueueSize := Value;
end;

function  TDnThreadExecutor.GetMaxQueueSize: Integer;
begin
  Result := FMaxQueueSize;
end;

function TDnThreadExecutor.LaunchThread: TDnWorkerThread;
var AllowCreate: Boolean;
    Context: TDnThreadContext;
begin
  if @FOnThreadCreate <> Nil  then
    AllowCreate := FOnThreadCreate(Self)
  else
    AllowCreate := True;

  if AllowCreate then
  begin
    Result := TDnWorkerThread.Create(Self);
    Context := Nil;
    if Result <> Nil then
    begin
      if Assigned(FOnCreateContext) and not FSyncCtxCreating then
        Context := FOnCreateContext(Result)
      else
      if not FSyncCtxCreating then
        Context := TDnThreadContext.Create(Result);
      Result.FContext := Context;
            
      FWorker.Add(Result);
      Result.Run;
    end;
  end else
  begin
    Self.DoLogEvent(SCannotCreateThread);
    Result := Nil;
  end;
end;

procedure TDnThreadExecutor.RemoveThread(Thread: TDnWorkerThread);
begin
  try
    try
      FGuard.Acquire;
      Thread.FreeOnTerminate := True;
      FWorker.Extract(Thread);
      Thread.Terminate;
      //handlers for OnDestroyRuntimeContext and OnThreadFinish should be called from thread
    finally
      FGuard.Release;
    end;
  except
    DoLogEvent(SCannotRemoveThread);
  end;
end;


procedure TDnThreadExecutor.SetActive(Value: Boolean);
var
  i: Integer;
begin
  if not FActive and Value then
  begin //turn on
    FGuard.Acquire;
    FStopping := False;
    if FPreCreateThreads then
    begin
      for i:=0 to FMinThread - 1 do
        LaunchThread;
    end;
    FActive := True;
    FGuard.Release;
  end
  else if FActive and not Value then
  begin //stop
    FGuard.Acquire;
    FStopping := True;
    for i:=0 to FWorker.Count-1 do
      if TDnWorkerThread(FWorker[i]).FBoundCount > 0 then
        TDnWorkerThread(FWorker[i]).FNonEmpty.Release; //signal about 'NULL' event
    FNonEmpty.Release; //signal about 'NULL' event

    for i:=0 to FWorker.Count-1 do
      TDnWorkerThread(FWorker[i]).Terminate;

    FNonEmpty.Release;
    //signal to terminate
    while FWorker.Count <> 0 do
    begin
      FGuard.Release;
      Sleep(10);
      FGuard.Acquire;
    end;
    //Clear all incoming events - do not handle&destroy them
    FEvent.Clear;
    FWorker.Clear; //this statement should stop&destroy all workers
    FActive := False;
    FStopping := False;
    FGuard.Release;
  end;
end;

function TDnThreadExecutor.PostEvent(Event: IDnIOResponse): Boolean;
var Tracker: IDnIOTrackerHolder;
    CommonQueue: Boolean;
begin
  Result := False;
  //check the binding
  CommonQueue := True;
  Tracker := Event.Channel as IDnIOTrackerHolder;
  if Tracker <> Nil then
  begin
    if Tracker.Tracker <> Nil then
    begin
      TDnWorkerThread(Tracker.Tracker).PostEvent(Event);
      CommonQueue := False;
    end;
    Tracker := Nil;
  end;

  if CommonQueue then

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -