⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 uthreadpool.pas

📁 DELPHI 线程池代码(uThreadPool.pas)
💻 PAS
📖 第 1 页 / 共 2 页
字号:
  if Assigned(FThreadInitializing) then
    FThreadInitializing(Self, aThread);
end; { TThreadsPool.DoThreadInitializing }

{
函 数 名:TThreadsPool.FreeFinishedThreads
功能描述:释放 FThreadsKilling 列表中的线程
输入参数:无
返 回 值: 无
创建日期:2006.10.22 11:34
修改日期:2006.
作    者:Kook
附加说明:
}

procedure TThreadsPool.FreeFinishedThreads;
var
  i: Integer;
begin
  if csThreadManagment.TryEnter then
  try
    for i := FThreadsKilling.Count - 1 downto 0 do
      if TProcessorThread(FThreadsKilling[i]).isFinished then
      begin
        TProcessorThread(FThreadsKilling[i]).Free;
        FThreadsKilling.Delete(i);
      end;
  finally
    csThreadManagment.Leave
  end;
end; { TThreadsPool.FreeFinishedThreads }

{
函 数 名:TThreadsPool.GetRequest
功能描述:申请任务
输入参数:out Request: TRequestDataObject
返 回 值: 无
创建日期:2006.10.22 11:34
修改日期:2006.
作    者:Kook
附加说明:
}

procedure TThreadsPool.GetRequest(out Request: TWorkItem);
begin
{$IFNDEF NOLOGS}
  WriteLog('申请任务', 2);
{$ENDIF}
  csQueueManagment.Enter;
  try
    //跳过空的队列元素
    while (FLastGetPoint < FQueue.Count) and (FQueue[FLastGetPoint] = nil) do
      Inc(FLastGetPoint);

    Assert(FLastGetPoint < FQueue.Count);
    //压缩队列,清除空元素
    if (FQueue.Count > 127) and (FLastGetPoint >= (3 * FQueue.Count) div 4) then
    begin
{$IFNDEF NOLOGS}
      WriteLog('FQueue.Pack', 1);
{$ENDIF}
      FQueue.Pack;
      FLastGetPoint := 0;
    end;

    Request := TWorkItem(FQueue[FLastGetPoint]);
    FQueue[FLastGetPoint] := nil;
    inc(FLastGetPoint);
    if (FLastGetPoint = FQueue.Count) then //如果队列中无任务
    begin

      DoQueueEmpty(ekQueueEmpty);
      FQueue.Clear;
      FLastGetPoint := 0;
    end;
  finally
    csQueueManagment.Leave;
  end;
end; { TThreadsPool.GetRequest }

function TThreadsPool.InfoText: string;
begin
  Result := '';
  //end;
  //{$ELSE}
  //var
  //  i: Integer;
  //begin
  //  csQueueManagment.Enter;
  //  csThreadManagment.Enter;
  //  try
  //    if (FThreads.Count = 0) and (FThreadsKilling.Count = 1) and
  //      TProcessorThread(FThreadsKilling[0]).isFinished then
  //      FreeFinishedThreads;
  //
  //    Result := Format(
  //      'Pool thread: Min=%d, Max=%d, WorkingThreadsCount=%d, TerminatedThreadCount=%d, QueueLength=%d'#13#10,
  //      [ThreadsMin, ThreadsMax, FThreads.Count, FThreadsKilling.Count,
  //      FQueue.Count]
  //        );
  //    if FThreads.Count > 0 then
  //      Result := Result + 'Working threads:'#13#10;
  //    for i := 0 to FThreads.Count - 1 do
  //      Result := Result + TProcessorThread(FThreads[i]).InfoText + #13#10;
  //    if FThreadsKilling.Count > 0 then
  //      Result := Result + 'Terminated threads:'#13#10;
  //    for i := 0 to FThreadsKilling.Count - 1 do
  //      Result := Result + TProcessorThread(FThreadsKilling[i]).InfoText + #13#10;
  //  finally
  //    csThreadManagment.Leave;
  //    csQueueManagment.Leave;
  //  end;
  //end;
  //{$ENDIF}
end; { TThreadsPool.InfoText }

{
函 数 名:TThreadsPool.KillDeadThreads
功能描述:清除死线程
输入参数:无
返 回 值: 无
创建日期:2006.10.22 11:32
修改日期:2006.
作    者:Kook
附加说明:
}

procedure TThreadsPool.KillDeadThreads;
var
  i: Integer;
begin
  // Check for dead threads
  if csThreadManagment.TryEnter then
  try
    for i := 0 to FThreads.Count - 1 do
      if TProcessorThread(FThreads[i]).IsDead then
      begin
        // Dead thread moverd to other list.
        // New thread created to replace dead one
        TProcessorThread(FThreads[i]).Terminate;
        FThreadsKilling.Add(FThreads[i]);
        try
          FThreads[i] := TProcessorThread.Create(Self);
        except
          on e: Exception do
          begin
            FThreads[i] := nil;
{$IFNDEF NOLOGS}
            WriteLog(
              'TProcessorThread.Create raise: ' + e.ClassName +
              #13#10#9'Message: ' + e.Message,
              9
              );
{$ENDIF}
          end;
        end;
      end;
  finally
    csThreadManagment.Leave
  end;
end; { TThreadsPool.KillDeadThreads }

function TThreadsPool.PoolAverageWaitingTime: Integer;
var
  i: Integer;
begin
  Result := 0;
  if FThreads.Count > 0 then
  begin
    for i := 0 to FThreads.Count - 1 do
      Inc(result, TProcessorThread(FThreads[i]).AverageWaitingTime);
    Result := Result div FThreads.Count
  end
  else
    Result := 1;
end; { TThreadsPool.PoolAverageWaitingTime }

procedure TThreadsPool.WriteLog(const Str: string; Level: Integer = 0);
begin
{$IFNDEF NOLOGS}
  uThreadPool.WriteLog(Str, 0, Level);
{$ENDIF}
end; { TThreadsPool.WriteLog }

// 工作线程仅用于线程池内, 不要直接创建并调用它。
{
******************************* TProcessorThread *******************************
}

constructor TProcessorThread.Create(APool: TThreadsPool);
begin
  WriteLog('创建工作线程', 5);
  inherited Create(True);
  FPool := aPool;

  FAverageWaitingTime := 1000;
  FAverageProcessing := 3000;

  sInitError := '';
  {
  各参数的意义如下:
   
   参数一:填上 nil 即可。
   参数二:是否采用手动调整灯号。
   参数三:灯号的起始状态,False 表示红灯。
   参数四:Event 名称, 对象名称相同的话,会指向同一个对象,所以想要有两个Event对象,便要有两个不同的名称(这名称以字符串来存.为NIL的话系统每次会自己创建一个不同的名字,就是被次创建的都是新的EVENT)。
   传回值:Event handle。
  }
  hInitFinished := CreateEvent(nil, True, False, nil);
  hThreadTerminated := CreateEvent(nil, True, False, nil);
  csProcessingDataObject := TCriticalSection.Create;
  try
    WriteLog('TProcessorThread.Create::Resume', 3);
    Resume;
    //阻塞, 等待初始化完成
    WaitForSingleObject(hInitFinished, INFINITE);
    if sInitError <> '' then
      raise Exception.Create(sInitError);
  finally
    CloseHandle(hInitFinished);
  end;
  WriteLog('TProcessorThread.Create::Finished', 3);
end; { TProcessorThread.Create }

destructor TProcessorThread.Destroy;
begin
  WriteLog('工作线程销毁', 5);
  CloseHandle(hThreadTerminated);
  csProcessingDataObject.Free;
  inherited;
end; { TProcessorThread.Destroy }

function TProcessorThread.AverageProcessingTime: DWORD;
begin
  if (FCurState in [tcsProcessing]) then
    Result := NewAverage(FAverageProcessing, GetTickCount - uProcessingStart)
  else
    Result := FAverageProcessing
end; { TProcessorThread.AverageProcessingTime }

function TProcessorThread.AverageWaitingTime: DWORD;
begin
  if (FCurState in [tcsWaiting, tcsCheckingDown]) then
    Result := NewAverage(FAverageWaitingTime, GetTickCount - uWaitingStart)
  else
    Result := FAverageWaitingTime
end; { TProcessorThread.AverageWaitingTime }

procedure TProcessorThread.Execute;

type
  THandleID = (hidTerminateThread, hidRequest, hidCheckPoolDown);
var
  WaitedTime: Integer;
  Handles: array[THandleID] of THandle;

begin
  WriteLog('工作线程进常运行', 3);
  //当前状态:初始化
  FCurState := tcsInitializing;
  try
    //执行外部事件
    FPool.DoThreadInitializing(Self);
  except
    on e: Exception do
      sInitError := e.Message;
  end;

  //初始化完成,初始化Event绿灯
  SetEvent(hInitFinished);

  WriteLog('TProcessorThread.Execute::Initialized', 3);

  //引用线程池的同步 Event
  Handles[hidTerminateThread] := hThreadTerminated;
  Handles[hidRequest] := FPool.hSemRequestCount;
  Handles[hidCheckPoolDown] := FPool.hTimCheckPoolDown;

  //时间戳,
  //todo: 好像在线程中用 GetTickCount; 会不正常
  uWaitingStart := GetTickCount;
  //任务置空
  FProcessingDataObject := nil;

  //大巡环
  while not terminated do
  begin
    //当前状态:等待
    FCurState := tcsWaiting;
    //阻塞线程,使线程休眠
    case WaitForMultipleObjects(Length(Handles), @Handles, False, INFINITE) -
      WAIT_OBJECT_0 of

      WAIT_OBJECT_0 + ord(hidTerminateThread):
        begin
          WriteLog('TProcessorThread.Execute:: Terminate event signaled ', 5);
          //当前状态:正在终止线程
          FCurState := tcsTerminating;
          //退出大巡环(结束线程)
          Break;
        end;

      WAIT_OBJECT_0 + ord(hidRequest):
        begin
          WriteLog('TProcessorThread.Execute:: Request semaphore signaled ', 3);
          //等待的时间
          WaitedTime := GetTickCount - uWaitingStart;
          //重新计算平均等待时间
          FAverageWaitingTime := NewAverage(FAverageWaitingTime, WaitedTime);
          //当前状态:申请任务
          FCurState := tcsGetting;
          //如果等待时间过短,则检查工作线程是否足够
          if WaitedTime < 5 then
            FPool.CheckThreadsForGrow;
          //从线程池的任务队列中得到任务
          FPool.GetRequest(FProcessingDataObject);
          //开始处理的时间戳
          uProcessingStart := GetTickCount;
          //当前状态:执行任务
          FCurState := tcsProcessing;
          try
{$IFNDEF NOLOGS}
            WriteLog('Processing: ' + FProcessingDataObject.TextForLog, 2);
{$ENDIF}
            //执行任务
            FPool.DoProcessRequest(FProcessingDataObject, Self);
          except
            on e: Exception do
              WriteLog(
                'OnProcessRequest for ' + FProcessingDataObject.TextForLog +
                #13#10'raise Exception: ' + e.Message,
                8
                );
          end;

          //释放任务对象
          csProcessingDataObject.Enter;
          try
            FProcessingDataObject.Free;
            FProcessingDataObject := nil;
          finally
            csProcessingDataObject.Leave;
          end;
          //重新计算
          FAverageProcessing := NewAverage(FAverageProcessing, GetTickCount -
            uProcessingStart);
          //当前状态:执行任务完毕
          FCurState := tcsProcessed;
          //执行线程外事件
          FPool.DoProcessed;

          uWaitingStart := GetTickCount;
        end;
      WAIT_OBJECT_0 + ord(hidCheckPoolDown):
        begin
          // !!! Never called under Win9x
          WriteLog('TProcessorThread.Execute:: CheckPoolDown timer signaled ',
            4);
          //当前状态:线程池停机(检查并清除空闲线程和死线程)
          FCurState := tcsCheckingDown;
          FPool.CheckPoolDown;
        end;
    end;
  end;
  FCurState := tcsTerminating;

  FPool.DoThreadFinalizing(Self);
end; { TProcessorThread.Execute }

function TProcessorThread.IamCurrentlyProcess(DataObj: TWorkItem): Boolean;
begin
  csProcessingDataObject.Enter;
  try
    Result := (FProcessingDataObject <> nil) and
      DataObj.IsTheSame(FProcessingDataObject);
  finally
    csProcessingDataObject.Leave;
  end;
end; { TProcessorThread.IamCurrentlyProcess }

function TProcessorThread.InfoText: string;

const
  ThreadStateNames: array[TThreadState] of string =
  (
    'tcsInitializing',
    'tcsWaiting',
    'tcsGetting',
    'tcsProcessing',
    'tcsProcessed',
    'tcsTerminating',
    'tcsCheckingDown'
    );

begin
{$IFNDEF NOLOGS}
  Result := Format(
    '%5d: %15s, AverageWaitingTime=%6d, AverageProcessingTime=%6d',
    [ThreadID, ThreadStateNames[FCurState], AverageWaitingTime,
    AverageProcessingTime]
      );
  case FCurState of
    tcsWaiting:
      Result := Result + ', WaitingTime=' + IntToStr(GetTickCount -
        uWaitingStart);
    tcsProcessing:
      Result := Result + ', ProcessingTime=' + IntToStr(GetTickCount -
        uProcessingStart);
  end;

  csProcessingDataObject.Enter;
  try
    if FProcessingDataObject <> nil then
      Result := Result + ' ' + FProcessingDataObject.TextForLog;
  finally
    csProcessingDataObject.Leave;
  end;
{$ENDIF}
end; { TProcessorThread.InfoText }

function TProcessorThread.IsDead: Boolean;
begin
  Result :=
    Terminated or
    (FPool.ThreadDeadTimeout > 0) and (FCurState = tcsProcessing) and
    (GetTickCount - uProcessingStart > FPool.ThreadDeadTimeout);
  if Result then
    WriteLog('Thread dead', 5);
end; { TProcessorThread.IsDead }

function TProcessorThread.isFinished: Boolean;
begin
  Result := WaitForSingleObject(Handle, 0) = WAIT_OBJECT_0;
end; { TProcessorThread.isFinished }

function TProcessorThread.isIdle: Boolean;
begin
  // 如果线程状态是 tcsWaiting, tcsCheckingDown
  // 并且 空间时间 > 100ms,
  // 并且 平均等候任务时间大于平均工作时间的 50%
  // 则视为空闲。
  Result :=
    (FCurState in [tcsWaiting, tcsCheckingDown]) and
    (AverageWaitingTime > 100) and
    (AverageWaitingTime * 2 > AverageProcessingTime);
end; { TProcessorThread.isIdle }

function TProcessorThread.NewAverage(OldAvg, NewVal: Integer): Integer;
begin
  Result := (OldAvg * 2 + NewVal) div 3;
end; { TProcessorThread.NewAverage }

procedure TProcessorThread.Terminate;
begin
  WriteLog('TProcessorThread.Terminate', 5);
  inherited Terminate;
  SetEvent(hThreadTerminated);
end; { TProcessorThread.Terminate }

procedure TProcessorThread.WriteLog(const Str: string; Level: Integer = 0);
begin
{$IFNDEF NOLOGS}
  uThreadPool.WriteLog(Str, ThreadID, Level);
{$ENDIF}
end; { TProcessorThread.WriteLog }

{
******************************* TCriticalSection *******************************
}

constructor TCriticalSection.Create;
begin
  InitializeCriticalSection(FSection);
end; { TCriticalSection.Create }

destructor TCriticalSection.Destroy;
begin
  DeleteCriticalSection(FSection);
end; { TCriticalSection.Destroy }

procedure TCriticalSection.Enter;
begin
  EnterCriticalSection(FSection);
end; { TCriticalSection.Enter }

procedure TCriticalSection.Leave;
begin
  LeaveCriticalSection(FSection);
end; { TCriticalSection.Leave }

function TCriticalSection.TryEnter: Boolean;
begin
  Result := TryEnterCriticalSection(FSection);
end; { TCriticalSection.TryEnter }

procedure NoLogs(const Str: string; LogID: Integer = 0; Level: Integer = 0);
begin
end;

initialization
  WriteLog := NoLogs;
end.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -