⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 uthreadpool.pas

📁 DELPHI 线程池代码(uThreadPool.pas)
💻 PAS
📖 第 1 页 / 共 2 页
字号:
unit uThreadPool;

{   aPool.AddRequest(TMyRequest.Create(RequestParam1, RequestParam2, ...)); }

interface
uses
  Windows,
  Classes;

// 是否记录日志
// {$DEFINE NOLOGS}

type
  TCriticalSection = class(TObject)
  protected
    FSection: TRTLCriticalSection;
  public
    constructor Create;
    destructor Destroy; override;
    // 进入临界区
    procedure Enter;
    // 离开临界区
    procedure Leave;
    // 尝试进入
    function TryEnter: Boolean;
  end;

type
  // 储存请求数据的基本类
  TWorkItem = class(TObject)
  public
    // 是否有重复任务
    function IsTheSame(DataObj: TWorkItem): Boolean; virtual;
    // 如果 NOLOGS 被定义,则禁用。
    function TextForLog: string; virtual;
  end;

type
  TThreadsPool = class;

  //线程状态
  TThreadState = (tcsInitializing, tcsWaiting, tcsGetting, tcsProcessing,
    tcsProcessed, tcsTerminating, tcsCheckingDown);
  // 工作线程仅用于线程池内, 不要直接创建并调用它。
  TProcessorThread = class(TThread)
  private
    // 创建线程时临时的Event对象, 阻塞线程直到初始化完成
    hInitFinished: THandle;
    // 初始化出错信息
    sInitError: string;
    // 记录日志
    procedure WriteLog(const Str: string; Level: Integer = 0);
  protected
    // 线程临界区同步对像
    csProcessingDataObject: TCriticalSection;
    // 平均处理时间
    FAverageProcessing: Integer;
    // 等待请求的平均时间
    FAverageWaitingTime: Integer;
    // 本线程实例的运行状态
    FCurState: TThreadState;
    // 本线程实例所附属的线程池
    FPool: TThreadsPool;
    // 当前处理的数据对像。
    FProcessingDataObject: TWorkItem;
    // 线程停止 Event, TProcessorThread.Terminate 中开绿灯
    hThreadTerminated: THandle;
    uProcessingStart: DWORD;
    // 开始等待的时间, 通过 GetTickCount 取得。
    uWaitingStart: DWORD;
    // 计算平均工作时间
    function AverageProcessingTime: DWORD;
    // 计算平均等待时间
    function AverageWaitingTime: DWORD;
    procedure Execute; override;
    function IamCurrentlyProcess(DataObj: TWorkItem): Boolean;
    // 转换枚举类型的线程状态为字串类型
    function InfoText: string;
    // 线程是否长时间处理同一个请求?(已死掉?)
    function IsDead: Boolean;
    // 线程是否已完成当成任务
    function isFinished: Boolean;
    // 线程是否处于空闲状态
    function isIdle: Boolean;
    // 平均值校正计算。
    function NewAverage(OldAvg, NewVal: Integer): Integer;
  public
    Tag: Integer;
    constructor Create(APool: TThreadsPool);
    destructor Destroy; override;
    procedure Terminate;
  end;

  // 线程初始化时触发的事件
  TProcessorThreadInitializing = procedure(Sender: TThreadsPool; aThread:
    TProcessorThread) of object;
  // 线程结束时触发的事件
  TProcessorThreadFinalizing = procedure(Sender: TThreadsPool; aThread:
    TProcessorThread) of object;
  // 线程处理请求时触发的事件
  TProcessRequest = procedure(Sender: TThreadsPool; WorkItem: TWorkItem;
    aThread: TProcessorThread) of object;
  TEmptyKind = (
    ekQueueEmpty, //任务被取空后
    ekProcessingFinished // 最后一个任务处理完毕后
    );
  // 任务队列空时触发的事件
  TQueueEmpty = procedure(Sender: TThreadsPool; EmptyKind: TEmptyKind) of
    object;

  TThreadsPool = class(TComponent)
  private
    csQueueManagment: TCriticalSection;
    csThreadManagment: TCriticalSection;
    FProcessRequest: TProcessRequest;
    FQueue: TList;
    FQueueEmpty: TQueueEmpty;
    // 线程超时阀值
    FThreadDeadTimeout: DWORD;
    FThreadFinalizing: TProcessorThreadFinalizing;
    FThreadInitializing: TProcessorThreadInitializing;
    // 工作中的线程
    FThreads: TList;
    // 执行了 terminat 发送退出指令, 正在结束的线程.
    FThreadsKilling: TList;
    // 最少, 最大线程数
    FThreadsMax: Integer;
    // 最少, 最大线程数
    FThreadsMin: Integer;
    // 池平均等待时间
    function PoolAverageWaitingTime: Integer;
    procedure WriteLog(const Str: string; Level: Integer = 0);
  protected
    FLastGetPoint: Integer;
    // Semaphore, 统计任务队列
    hSemRequestCount: THandle;
    // Waitable timer. 每30触发一次的时间量同步
    hTimCheckPoolDown: THandle;
    // 线程池停机(检查并清除空闲线程和死线程)
    procedure CheckPoolDown;
    // 清除死线程,并补充不足的工作线程
    procedure CheckThreadsForGrow;
    procedure DoProcessed;
    procedure DoProcessRequest(aDataObj: TWorkItem; aThread: TProcessorThread);
      virtual;
    procedure DoQueueEmpty(EmptyKind: TEmptyKind); virtual;
    procedure DoThreadFinalizing(aThread: TProcessorThread); virtual;
    // 执行事件
    procedure DoThreadInitializing(aThread: TProcessorThread); virtual;
    // 释放 FThreadsKilling 列表中的线程
    procedure FreeFinishedThreads;
    // 申请任务
    procedure GetRequest(out Request: TWorkItem);
    // 清除死线程
    procedure KillDeadThreads;
  public
    constructor Create(AOwner: TComponent); override;
    destructor Destroy; override;
    // 就进行任务是否重复的检查, 检查发现重复就返回 False
    function AddRequest(aDataObject: TWorkItem; CheckForDoubles: Boolean =
      False): Boolean; overload;
    // 转换枚举类型的线程状态为字串类型
    function InfoText: string;
  published
    // 线程处理任务时触发的事件
    property OnProcessRequest: TProcessRequest read FProcessRequest write
      FProcessRequest;
    // 任务列表为空时解发的事件
    property OnQueueEmpty: TQueueEmpty read FQueueEmpty write FQueueEmpty;
    // 线程结束时触发的事件
    property OnThreadFinalizing: TProcessorThreadFinalizing read
      FThreadFinalizing write FThreadFinalizing;
    // 线程初始化时触发的事件
    property OnThreadInitializing: TProcessorThreadInitializing read
      FThreadInitializing write FThreadInitializing;
    // 线程超时值(毫秒), 如果处理超时,将视为死线程
    property ThreadDeadTimeout: DWORD read FThreadDeadTimeout write
      FThreadDeadTimeout default 0;
    // 最大线程数
    property ThreadsMax: Integer read FThreadsMax write FThreadsMax default 1;
    // 最小线程数
    property ThreadsMin: Integer read FThreadsMin write FThreadsMin default 0;
  end;

type
  //日志记志函数
  TLogWriteProc = procedure(
    const Str: string; //日志
    LogID: Integer = 0;
    Level: Integer = 0 //Level = 0 - 跟踪信息, 10 - 致命错误
    );

var
  WriteLog: TLogWriteProc; // 如果存在实例就写日志

implementation
uses
  SysUtils;

// 储存请求数据的基本类
{
********************************** TWorkItem ***********************************
}

function TWorkItem.IsTheSame(DataObj: TWorkItem): Boolean;
begin
  Result := False;
end; { TWorkItem.IsTheSame }

function TWorkItem.TextForLog: string;
begin
  Result := 'Request';
end; { TWorkItem.TextForLog }

{
********************************* TThreadsPool *********************************
}

constructor TThreadsPool.Create(AOwner: TComponent);
var
  DueTo: Int64;
begin
{$IFNDEF NOLOGS}
  WriteLog('创建线程池', 5);
{$ENDIF}
  inherited;
  csQueueManagment := TCriticalSection.Create;
  FQueue := TList.Create;
  csThreadManagment := TCriticalSection.Create;
  FThreads := TList.Create;
  FThreadsKilling := TList.Create;
  FThreadsMin := 0;
  FThreadsMax := 1;
  FThreadDeadTimeout := 0;
  FLastGetPoint := 0;
  //
  hSemRequestCount := CreateSemaphore(nil, 0, $7FFFFFFF, nil);

  DueTo := -1;
  //可等待的定时器(只用于Window NT4或更高)
  hTimCheckPoolDown := CreateWaitableTimer(nil, False, nil);

  if hTimCheckPoolDown = 0 then // Win9x不支持
    // In Win9x number of thread will be never decrised
    hTimCheckPoolDown := CreateEvent(nil, False, False, nil)
  else
    SetWaitableTimer(hTimCheckPoolDown, DueTo, 30000, nil, nil, False);
end; { TThreadsPool.Create }

destructor TThreadsPool.Destroy;
var
  n, i: Integer;
  Handles: array of THandle;
begin
{$IFNDEF NOLOGS}
  WriteLog('线程池销毁', 5);
{$ENDIF}
  csThreadManagment.Enter;

  SetLength(Handles, FThreads.Count);
  n := 0;
  for i := 0 to FThreads.Count - 1 do
    if FThreads[i] <> nil then
    begin
      Handles[n] := TProcessorThread(FThreads[i]).Handle;
      TProcessorThread(FThreads[i]).Terminate;
      Inc(n);
    end;

  csThreadManagment.Leave;  // lixiaoyu 添加于 2009.1.6,如没有此行代码无法成功释放正在执行中的工作者线程,死锁。

  WaitForMultipleObjects(n, @Handles[0], True, 30000);  // 等待工作者线程执行终止  lixiaoyu 注释于 2009.1.6

  csThreadManagment.Enter;  // lixiaoyu 添加于 2009.1.6 再次进入锁定,并释放资源
  for i := 0 to FThreads.Count - 1 do
    TProcessorThread(FThreads[i]).Free;
  FThreads.Free;
  FThreadsKilling.Free;
  csThreadManagment.Free;

  csQueueManagment.Enter;
  for i := FQueue.Count - 1 downto 0 do
    TObject(FQueue[i]).Free;
  FQueue.Free;
  csQueueManagment.Free;

  CloseHandle(hSemRequestCount);
  CloseHandle(hTimCheckPoolDown);
  inherited;
end; { TThreadsPool.Destroy }

function TThreadsPool.AddRequest(aDataObject: TWorkItem; CheckForDoubles:
  Boolean = False): Boolean;
var
  i: Integer;
begin
{$IFNDEF NOLOGS}
  WriteLog('AddRequest(' + aDataObject.TextForLog + ')', 2);
{$ENDIF}
  Result := False;
  csQueueManagment.Enter;
  try
    // 如果 CheckForDoubles = TRUE
    // 则进行任务是否重复的检查
    if CheckForDoubles then
      for i := 0 to FQueue.Count - 1 do
        if (FQueue[i] <> nil)
          and aDataObject.IsTheSame(TWorkItem(FQueue[i])) then
          Exit; // 发现有相同的任务

    csThreadManagment.Enter;
    try
      // 清除死线程,并补充不足的工作线程
      CheckThreadsForGrow;

      // 如果 CheckForDoubles = TRUE
      // 则检查是否有相同的任务正在处理中
      if CheckForDoubles then
        for i := 0 to FThreads.Count - 1 do
          if TProcessorThread(FThreads[i]).IamCurrentlyProcess(aDataObject) then
            Exit; // 发现有相同的任务

    finally
      csThreadManagment.Leave;
    end;

    //将任务加入队列
    FQueue.Add(aDataObject);

    //释放一个同步信号量
    ReleaseSemaphore(hSemRequestCount, 1, nil);
{$IFNDEF NOLOGS}
    WriteLog('释放一个同步信号量)', 1);
{$ENDIF}
    Result := True;
  finally
    csQueueManagment.Leave;
  end;
{$IFNDEF NOLOGS}
  //调试信息
  WriteLog('增加一个任务(' + aDataObject.TextForLog + ')', 1);
{$ENDIF}
end; { TThreadsPool.AddRequest }

{
函 数 名:TThreadsPool.CheckPoolDown
功能描述:线程池停机(检查并清除空闲线程和死线程)
输入参数:无
返 回 值: 无
创建日期:2006.10.22 11:31
修改日期:2006.
作    者:Kook
附加说明:
}

procedure TThreadsPool.CheckPoolDown;
var
  i: Integer;
begin
{$IFNDEF NOLOGS}
  WriteLog('TThreadsPool.CheckPoolDown', 1);
{$ENDIF}
  csThreadManagment.Enter;
  try
{$IFNDEF NOLOGS}
    WriteLog(InfoText, 2);
{$ENDIF}
    // 清除死线程
    KillDeadThreads;
    // 释放 FThreadsKilling 列表中的线程
    FreeFinishedThreads;

    // 如果线程空闲,就终止它
    for i := FThreads.Count - 1 downto FThreadsMin do
      if TProcessorThread(FThreads[i]).isIdle then
      begin
        //发出终止命令
        TProcessorThread(FThreads[i]).Terminate;
        //加入待清除队列
        FThreadsKilling.Add(FThreads[i]);
        //从工作队列中除名
        FThreads.Delete(i);
        //todo: ??
        Break;
      end;
  finally
    csThreadManagment.Leave;
  end;
end; { TThreadsPool.CheckPoolDown }

{
函 数 名:TThreadsPool.CheckThreadsForGrow
功能描述:清除死线程,并补充不足的工作线程
输入参数:无
返 回 值: 无
创建日期:2006.10.22 11:31
修改日期:2006.
作    者:Kook
附加说明:
}

procedure TThreadsPool.CheckThreadsForGrow;
var
  AvgWait: Integer;
  i: Integer;
begin
  {
    New thread created if:
    新建线程的条件:
      1. 工作线程数小于最小线程数
      2. 工作线程数小于最大线程数 and 线程池平均等待时间 < 100ms(系统忙)
      3. 任务大于工作线程数的4倍
  }

  csThreadManagment.Enter;
  try
    KillDeadThreads;
    if FThreads.Count < FThreadsMin then
    begin
{$IFNDEF NOLOGS}
      WriteLog('工作线程数小于最小线程数', 4);
{$ENDIF}
      for i := FThreads.Count to FThreadsMin - 1 do
      try
        FThreads.Add(TProcessorThread.Create(Self));
      except
        on e: Exception do

          WriteLog(
            'TProcessorThread.Create raise: ' + e.ClassName + #13#10#9'Message: '
            + e.Message,
            9
            );
      end
    end
    else if FThreads.Count < FThreadsMax then
    begin
{$IFNDEF NOLOGS}
      WriteLog('工作线程数小于最大线程数 and 线程池平均等待时间 < 100ms', 3);
{$ENDIF}
      AvgWait := PoolAverageWaitingTime;
{$IFNDEF NOLOGS}
      WriteLog(Format(
        'FThreads.Count (%d)<FThreadsMax(%d), AvgWait=%d',
        [FThreads.Count, FThreadsMax, AvgWait]),
        4
        );
{$ENDIF}

      if AvgWait < 100 then
      try
        FThreads.Add(TProcessorThread.Create(Self));
      except
        on e: Exception do
          WriteLog(
            'TProcessorThread.Create raise: ' + e.ClassName +
            #13#10#9'Message: ' + e.Message,
            9
            );
      end;
    end;
  finally
    csThreadManagment.Leave;
  end;
end; { TThreadsPool.CheckThreadsForGrow }

procedure TThreadsPool.DoProcessed;
var
  i: Integer;
begin
  if (FLastGetPoint < FQueue.Count) then
    Exit;
  csThreadManagment.Enter;
  try
    for i := 0 to FThreads.Count - 1 do
      if TProcessorThread(FThreads[i]).FCurState in [tcsProcessing] then
        Exit;
  finally
    csThreadManagment.Leave;
  end;
  DoQueueEmpty(ekProcessingFinished);
end; { TThreadsPool.DoProcessed }

procedure TThreadsPool.DoProcessRequest(aDataObj: TWorkItem; aThread:
  TProcessorThread);
begin
  if Assigned(FProcessRequest) then
    FProcessRequest(Self, aDataObj, aThread);
end; { TThreadsPool.DoProcessRequest }

procedure TThreadsPool.DoQueueEmpty(EmptyKind: TEmptyKind);
begin
  if Assigned(FQueueEmpty) then
    FQueueEmpty(Self, EmptyKind);
end; { TThreadsPool.DoQueueEmpty }

procedure TThreadsPool.DoThreadFinalizing(aThread: TProcessorThread);
begin
  if Assigned(FThreadFinalizing) then
    FThreadFinalizing(Self, aThread);
end; { TThreadsPool.DoThreadFinalizing }

procedure TThreadsPool.DoThreadInitializing(aThread: TProcessorThread);
begin

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -