⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rtcthrpool.pas

📁 Delphi快速开发Web Server
💻 PAS
📖 第 1 页 / 共 2 页
字号:
{
  @html(<b>)
  Thread Pool
  @html(</b>)
  - Copyright (c) Danijel Tkalcec
  @html(<br><br>)

  Thread pooling mechanism used by all RTC connection components
  when component's @Link(TRtcConnection.MultiThreaded) property is set to True.
  @html(<br><br>)

  Unless you want to enhance the conncetion components or add your
  own connection providers, you will NEVER get in direct contact
  with this classes. They are being used internaly by most
  Connection Provider components to enable MultiThreaded execution.
  @html(<br><br>)

  The only thing you could get in contact with as a component user
  are the global Threading parameters @Link(RTC_THREAD_POOL_PLUS),
  @Link(RTC_THREAD_POOL_OVERSIZE) and @Link(RTC_THREAD_POOL_MAX).
  @html(<br><br>)

  Or, in case you need to post jobs to a connection component
  to enhance its functionality, with the @Link(TRtcJob) class.
}
unit rtcThrPool;

{$INCLUDE rtcDefs.inc}

interface

uses
  rtcTrashcan,

  Windows,
  // Messages,

  memXList,
  memBinList,

  rtcInfo,

  rtcLog,
  rtcSyncObjs,
  rtcHWndPool,

  SysUtils,
  Classes;

var
  // Min. number of unused threads to keep active
  RTC_THREAD_POOL_PLUS:word=3;
  // Max. number of unused threads to keep active
  RTC_THREAD_POOL_OVERSIZE:word=2048;
  // Max. number of threads in our thread pool.
  RTC_THREAD_POOL_MAX:word=64;

  // Log unhandled thread exceptions?
  LOG_THREAD_EXCEPTIONS:boolean=False;

type
  { @Abstract(Exception to be raised when System Thread limit was reached and not a single thread could be created)
    @exclude }
  EThreadLimitReached = class(Exception);

  // Event for Synchronized calls
  TRtcSyncEvent = procedure of object;

  TRtcWorkerThread = class;
  TRtcThread = class;

  { @Abstract(RTC Job class)

    To be able to post jobs to a threaded connection component,
    you can derive your own classes from @Link(TRtcJob). By implementing
    the methods @Link(TRtcJob.Run) and @Link(TRtcObject.Kill), you can post
    any job with your user-defined data to the connection component's thread. }
  TRtcJob = class(TRtcObject)

    { This method will be called ONCE to run (execute) the job.
      It is the Run() method's responsibility to release the object
      when it has finished with its execution.

      If you post jobs to connection components,
      handle your expected exceptions properly.

      Exceptions caught by the Threading mechanism will
      not be passed any further. If exception gets raised and it
      returns to the Threading mechanism, the corresponding Thread
      object will be closed, all jobs will be Killed and th Thread
      will be released from memory, which will result in
      abortive disconnect. }
    procedure Run(Thr:TRtcThread); virtual; abstract;
    end;

  { @Abstract(Thread start/stop callback class) }
  TRtcThreadCallback = class
    { Called from inside each Thread, after it was started/created }
    procedure AfterThreadStart; virtual; abstract;
    { Called from inside each Thread, before it will be stopped/destroyed }
    procedure BeforeThreadStop; virtual; abstract;
    { Callled after all threads have been stopped.
      This is the method from which you should destroy the object by calling "Free" }
    procedure DestroyCallback; virtual; abstract;
    end;

  { @Abstract(Our threading class)

    We create threads ONLY using this class.
    This class implements all methods needed for synchronizing with the GUI,
    posting jobs, pausing, resuming and stopping the thread. }
  TRtcThread = class
  private
    MsgList:TXList;
    FInfo:TRtcInfo;

    FThr:TRtcWorkerThread;
    Paused:boolean;
    Quitting:boolean;

    class function GetJob(me:TObject): TObject;
    procedure Idle;

    procedure Run(Thr:TRtcWorkerThread);

  protected

    { Called by the Worker Thread to execute a job.
      For user-defined jobs (the ones not derived from TRtcJob),
      you need to override this method and call the inherited Work(Job).
      Resutn TRUE if Thread was freed. }
    function Work(Job:TObject):boolean; virtual;

    { Called by the Worket Thread to kill a job.
      For user-defined jobs(the ones not derived from TRtcJob),
      you need to override this method and call the inherited Kill(Job). }
    procedure Kill(Job:TObject); virtual;

  public
    // Create a Thread. To give the thread something to do, you will have to post a job to it.
    constructor Create; virtual;
    { @exclude }
    destructor Destroy; override;

    // Call the 'Event' synchronized (for GUI access). May only be used from within the thread.
    procedure Sync(Event:TRtcSyncEvent);

    // Pause thread execution (thread-safe call)
    procedure Pause;
    // Resume thread execution (thread-safe call)
    procedure Resume;

    // Lock threads
    class function Lock(me:TObject):boolean;

    // Unlock threads
    class procedure UnLock;

    // add new job for the thread (thread-safe call)
    class function PostJob(me:TObject; Job:TObject; HighPriority:boolean=False; AutoResume:boolean=False):boolean;

    // Stop the thread (thread-safe call: will post a QUIT message to thread and destroy it from inside the thread)
    class procedure Stop(me:TObject);

    // Get Thread ID
    function ThreadID:Cardinal;

    // Attach additional information to this Thread. May only be used from within the thread.
    property Info:TRtcInfo read FInfo;
    end;

  { Internal Class -> DO NOT CREATE!
    @exclude }
  TRtcWorkerThread = class(TThread)
  private
    Msg: boolean;
    Work: TRtcThread;

  protected
    Run:TRtcEvent;
    FEvent:TRtcSyncEvent;
    FHWND:HWND;

    procedure Execute; override;
    procedure Sync(Event:TRtcSyncEvent);

  public
    procedure MySyncEvent;

    procedure PostQuit;
    procedure PostWork(Thr:TRtcThread);

    constructor Create(CreateSuspended:boolean);
    destructor Destroy; override;
    end;

var
  { @exclude }
{$IFDEF CLR}
  MainThr:System.Threading.Thread;
{$ELSE}
  MainThrID:DWORD;
{$ENDIF}

{ Add a new Thread Callback.

  Please note that you can NOT remove a callback and that you need
  to add all callbacks before a first thread was created, which is best
  done from your units "initialization" section. To avoid memory leaks on
  application close, you should also implement the "DestroyCallback" method. }
procedure AddThreadCallback(const Callback:TRtcThreadCallback);

implementation

type
  TRtcQuitMessage=class
    end;

var
  ThreadPool:TBinList; // all running threads (sorted for searching)
  FreePool:TXList; // threads not in use (not sorted -> add/remove last)

  ThrList:tBinList; // list of all thread objects (sorted for fast searching)
  WaitList:tBinList; // list of all thread objects waiting for execution
  WorkList:tBinList; // list of all thread objects currently executing

  Message_Quit:TRtcQuitMessage;

  Thr_LastExec:integer; // value of the last thread object that has gone into execution

  CSThread:TRtcCritSec;

  InsideCallback:integer=0;
  ThreadCallbacks:array of TRtcThreadCallback;
  ThreadCallbackCount:integer=0;

  OpenCnt:integer;
  CSOpen:TRtcEvent;

{ Add a new Thread Callback }
procedure AddThreadCallback(const Callback:TRtcThreadCallback);
  begin
  CSThread.Enter;
  try
    Inc(ThreadCallbackCount);
    SetLength(ThreadCallbacks, ThreadCallbackCount);
    ThreadCallbacks[ThreadCallbackCount-1]:=Callback;
  finally
    CSThread.Leave;
    end;
  end;

{ Remove all Thread Callbacks }
procedure RemoveThreadCallbacks;
  var
    a:integer;
  begin
  CSThread.Enter;
  try
    for a:=0 to ThreadCallbackCount-1 do
      begin
      try
        ThreadCallbacks[a].DestroyCallback;
      except
        on E:Exception do
          if LOG_THREAD_EXCEPTIONS then
            Log('TRtcThreadCallback.DestroyCallback',E);
        end;
      ThreadCallbacks[a]:=nil;
      end;
    SetLength(ThreadCallbacks,0);
    ThreadCallbackCount:=0;
  finally
    CSThread.Leave;
    end;
  end;

procedure DoAfterThreadStart;
  var
    i:integer;
  begin
  CSThread.Enter;
  try
    Inc(InsideCallback);
    for i:=0 to ThreadCallbackCount-1 do
      try
        ThreadCallbacks[i].AfterThreadStart;
      except
        on E:Exception do
          if LOG_THREAD_EXCEPTIONS then
            Log('TRtcThreadCallback.AfterThreadStart',E);
        end;
  finally
    CSThread.Leave;
    end;
  end;

procedure DoBeforeThreadStop;
  var
    i:integer;
  begin
  CSThread.Enter;
  try
    for i:=ThreadCallbackCount-1 downto 0 do
      try
        ThreadCallbacks[i].BeforeThreadStop;
      except
        on E:Exception do
          if LOG_THREAD_EXCEPTIONS then
            Log('TRtcThreadCallback.BeforeThreadStop',E);
        end;
    Dec(InsideCallback);
    if InsideCallback=0 then
      RemoveThreadCallbacks;
  finally
    CSThread.Leave;
    end;
  end;

{ Work pool }

function GetWork:TRtcThread; // get next waiting object (remove it from waiting list, add it to working list)
  var
    i,wrk:cardinal;
  begin
  Result:=nil;
  if not assigned(ThrList) then Exit;

  if WaitList.Count>0 then
    begin
    if Thr_LastExec>0 then
      begin
      wrk:=WaitList.search_g(Thr_LastExec,i);
      if wrk<=0 then
        wrk:=WaitList.search_min(i);
      end
    else
      wrk:=WaitList.search_min(i);
    Thr_LastExec:=wrk;

    WaitList.remove(wrk); // remove from waiting list
    WorkList.insert(wrk,1); // add to working list

    Result:=TRtcThread(wrk);
    end;
  end;

procedure DoneWork(Thr:TObject); // remove object from working list
  begin
  if not assigned(ThrList) then Exit;

  if (WorkList.search(longword(Thr))>0) then
    WorkList.remove(longword(Thr));
  end;

function PutWork(Thr:TObject):boolean; // put object in waiting list
  begin
  if not assigned(ThrList) then
    Result:=False
  else if (WorkList.search(longword(Thr))=0) and // not working and
          (WaitList.search(longword(Thr))=0) then // not waiting
    begin
    WaitList.insert(longword(Thr),1); // add to waiting list
    Result:=True;
    end
  else
    Result:=False;
  end;

{ Thread Pool }

function GetThread:TRtcWorkerThread;
  var
    NWork:TRtcWorkerThread;
  begin
  Result:=nil;

  if not assigned(ThreadPool) then // Create pool if not open
    begin
    ThreadPool:=tBinList.Create(128);
    FreePool:=TXList.Create(128);
    end;

  if FreePool.Count>0 then // threads available
    begin
    Result:=TRtcWorkerThread(FreePool.Last);
    FreePool.removeLast; // remove from free threads list
    end
  else if ThreadPool.Count<RTC_THREAD_POOL_MAX then // thread limit not reached
    begin
    try
      Result:=TRtcWorkerThread.Create(False);
    except
      on E:Exception do
        begin
        if LOG_THREAD_EXCEPTIONS then
          Log('WorkerThread.Create',E);
        if ThreadPool.Count=0 then
          raise EThreadLimitReached.Create(E.ClassName+':'+E.Message)
        else
          Result:=nil;
        end;
      end;

    ThreadPool.insert(longword(Result),1);
    end;

  if assigned(Result) then
    begin
    NWork:=nil;
    while (ThreadPool.Count<RTC_THREAD_POOL_MAX) and // thread limit not reached
          (FreePool.Count<RTC_THREAD_POOL_PLUS) do // free thread count under our 'minimum'
      begin
      try
        NWork:=TRtcWorkerThread.Create(False);
      except
        on E:Exception do
          begin
          if LOG_THREAD_EXCEPTIONS then
            Log('WorkerThread.Create',E);
          Break;
          end;
        end;
      ThreadPool.insert(longword(NWork),1);
      FreePool.addLast(longword(NWork));
      end;
    end;
  end;

function ReturnThread(Thr:TRtcWorkerThread):boolean; // executed 1 object, returning for another
  var
    Work:TRtcThread;
  begin
  Result:=False;

  CSThread.Enter;
  try
    if assigned(ThreadPool) then
      begin
      if ThreadPool.search(longword(Thr))>0 then
        begin
        Work:=GetWork;
        if Work<>nil then // execution object waiting
          begin
          Thr.PostWork(Work);
          Result:=True;
          end
        else if FreePool.Count<RTC_THREAD_POOL_OVERSIZE then
          begin
          FreePool.AddLast(longword(Thr));
          Result:=True;
          end
        else
          ThreadPool.remove(longword(Thr));
        end;
      end;
  finally
    CSThread.Leave;
    end;
  end;

procedure ClosingThread(Thr:TRtcWorkerThread);
  begin
  Garbage(Thr);

  CSThread.Enter;
  try
    if assigned(ThreadPool) then
      if ThreadPool.search(longword(Thr))>0 then
        ThreadPool.remove(longword(Thr));
    Dec(OpenCnt);
    if OpenCnt=0 then CSOpen.SetEvent;
  finally
    CSThread.Leave;
    end;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -