⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 udithreadpool.pas

📁 楠楠写的DBiocp例子都是源码
💻 PAS
字号:
unit uDIThreadPool;

interface

uses
  Windows, SysUtils, Classes, WinSock2, uCriticalSection, uDIProtocol, uDIBuffer,
  uDIClientContext, uDISocketServer, uDIThread;
  {$I IOCP.inc}

const
  ERROR_INVALID_PARAMETER = 87;
  ERROR_NETNAME_DELETED   = 64;

type

  //线程结构
  PThreadParam = ^ThreadParam;
  ThreadParam = packed record
    m_bRun:         Boolean;                    //是否运行
    nThread:        DWORD;                      //索引
    FSocketServer:  TDISocketServer;            //指向IOCPServer的指针
    FDIThread:      TDIThread;                  //指向线程
    hEvent:         Thandle;                    //事件
  end;

  TDIWorkerThread = class(TDIThread)
  protected
    FThreadID: Integer;
    FSocketServer: TDISocketServer;
    procedure ThreadRoutine; override;
  public
    constructor Create(SocketServer: TDISocketServer; m_nThreadID: Integer);
    destructor Destroy; override;
  end;

  TDIThreadPool = class
  private
    m_bStarted:         Boolean;                //是否启动
    m_ThreadsList:      TList;
    FSocketServer:      TDISocketServer;
    FThreadPoolLock:    TCriticalSection;
  private
    m_nIOMinWorkThreads: Integer;
    m_nIOMaxWorkThreads: Integer;
    m_nIOWorkerThreads:  Integer;
    procedure SetMinWorkThreads(iMinWorkThreads: Integer);
    procedure SetMaxWorkThreads(iMaxWorkThreads: Integer);
    procedure SetIOWorkersThread(iWorkerThreads: Integer);
  published
    property FStarted: Boolean read m_bStarted;
    property FMinWorkThreads: Integer read m_nIOMinWorkThreads write SetMinWorkThreads;
    property FMaxWorkThreads: Integer read m_nIOMaxWorkThreads write SetMaxWorkThreads;
    property FIOWorkThreads: Integer read m_nIOWorkerThreads write SetIOWorkersThread;
  public
    function StartThreadPool: Boolean;
		procedure StopThreadPool;
  public
    constructor Create(SocketServer: TDISocketServer);
    destructor Destroy; override;
  end;

implementation
  uses uDIMonitor, uFileLogger;

//TDIWorkerThread
constructor TDIWorkerThread.Create(SocketServer: TDISocketServer; m_nThreadID: Integer);
begin
  inherited Create;
  FThreadID := m_nThreadID;
  FSocketServer := SocketServer;
end;

destructor TDIWorkerThread.Destroy;
begin
  inherited Destroy;
end;

procedure TDIWorkerThread.ThreadRoutine;
var
  dwIOSize: DWORD;
  bIORet: LongBool;
  dwIOError: DWORD;
  FBuffer: TDIBuffer;
  pHandleData: PPerHandleData;
  FClientContext: TDIClientContext;
  FIOEventType: IOEventType;
begin
  while (TRUE) do
  begin
    pHandleData := nil;
    FClientContext := nil;
    FBuffer := nil;
    bIORet := FSocketServer.FIOCompletionPort.GetIOCompletionStatus( FClientContext,
                                                                     pHandleData,
                                                                     dwIOSize );
    if (dwIOSize = $FFFFFFFF) then begin

      {$IFDEF _ICOP_DEBUG}
          FSocketServer.AppendLogMessage('收到PostIOCompletionStatus FFFFFFFF消息, Work线程退出');
      {$ENDIF}
      Exit;
    end;

    if (pHandleData<> nil) and
       (pHandleData.pDIBuffer<> nil) then  begin
      FIOEventType := IOEventType(pHandleData.m_overlap.Offset);
      FBuffer := TDIBuffer(pHandleData.pDIBuffer);
    end
    else
    begin

      {$IFDEF _ICOP_DEBUGERR}
          FSocketServer.AppendErrorLogMessage('FBuffer为空, IOCP异常, Work线程继续');
      {$ENDIF}
      Continue;
    end;

    //计数减一
    if (FClientContext<>nil) then begin
      FClientContext.FContextLock.Lock;
      FClientContext.ExitIOLoop(FIOEventType);
      FClientContext.FContextLock.UnLock;
    end;

    {$IFDEF _IOCP_MONITOR}
      if FIOEventType = IOWSARecv then _DIMonitor.SubIOCPRecv;
      if FIOEventType = IOWSASend then _DIMonitor.SubIOCPSend;
      if FIOEventType = IOWSACloseSocket then _DIMonitor.SubIOCPCloseSocke;
      if FIOEventType = IOWSAAcceptEx then _DIMonitor.SubIOCPAcceptEx;
    {$ENDIF}

    if bIORet then
    begin
      //一.收到用户自定义退出消息
      if (FIOEventType = IOWSACloseSocket) and
         (FClientContext<>nil) then begin
        FSocketServer.ProcessIOError(FClientContext, FIOEventType, IOErrorCustom);
        continue;
      end;

      //二.得到正常断开的通知
			//GetQueuedCompletionStatus返回true,并且dwIOSize返回0 且 FClientContext, pHandleData不为空
      if ( (dwIOSize=0) and
           (FBuffer<>Nil) and
           (FClientContext<>nil) ) then begin
        FSocketServer.ProcessIOError(FClientContext, FIOEventType, IOErrorNoramal);
        continue;
      end;

      //消息分发
      if (FBuffer<>Nil) then
        FSocketServer.ProcessIOMessage(FIOEventType, FClientContext, FBuffer, dwIOSize)
      else
      begin
      
        {$IFDEF _ICOP_DEBUGERR}
            FSocketServer.AppendErrorLogMessage('消息分发时FBuffer为空, IOCP异常, Work线程继续');
        {$ENDIF}
      end;
    end
    else 
    begin
      //三.异常断开(未得到正常断开的通知)如果得到自定义断开,不处理
      //GetQueuedCompletionStatus   返回false
      //GetLastError返回ERROR_NETNAME_DELETED或者ERROR_INVALID_PARAMETER
      dwIOError := GetLastError();
      if ( dwIOError<>WAIT_TIMEOUT ) then begin
        if (FClientContext<>nil) then begin
          //if (dwIOError = ERROR_NETNAME_DELETED) or (dwIOError = ERROR_INVALID_PARAMETER)
          //then
           FSocketServer.ProcessIOError(FClientContext, FIOEventType, IOErrorAbnormalNoramal);
           continue;
        end;
      end;
    end;
  end;
end;

//TDIThreadPool
constructor TDIThreadPool.Create(SocketServer: TDISocketServer);
begin
  inherited Create;
  FThreadPoolLock := TCriticalSection.Create;
  m_ThreadsList := TList.Create;
  FSocketServer := SocketServer;
  m_nIOMaxWorkThreads := 10;
  m_nIOMinWorkThreads := 2;
  m_nIOWorkerThreads := 4;
  m_bStarted := False;
end;

destructor TDIThreadPool.Destroy;
begin
  if m_bStarted then StopThreadPool;
  FreeAndNil(FThreadPoolLock);
  FreeAndNil(m_ThreadsList);
  inherited Destroy;
end;

procedure TDIThreadPool.SetMinWorkThreads(iMinWorkThreads: Integer);
begin
  if ( (iMinWorkThreads > m_nIOMaxWorkThreads) or
       (iMinWorkThreads < 0) ) then Exit;

  FThreadPoolLock.Lock;
  m_nIOMinWorkThreads := iMinWorkThreads;
  FThreadPoolLock.UnLock;
end;

procedure TDIThreadPool.SetMaxWorkThreads(iMaxWorkThreads: Integer);
begin
  if ( (iMaxWorkThreads < m_nIOMinWorkThreads) or
       (iMaxWorkThreads < 0) ) then Exit;

  FThreadPoolLock.Lock;
  m_nIOMaxWorkThreads := iMaxWorkThreads;
  FThreadPoolLock.UnLock;
end;

procedure TDIThreadPool.SetIOWorkersThread(iWorkerThreads: Integer);
begin
  if ( (iWorkerThreads < m_nIOMinWorkThreads) or
      (iWorkerThreads > m_nIOMaxWorkThreads) ) then Exit;

  FThreadPoolLock.Lock;
  m_nIOWorkerThreads := iWorkerThreads;
  FThreadPoolLock.UnLock;
end;
 
function TDIThreadPool.StartThreadPool: Boolean;
var
  i: Integer;
  p_Threads: PThreadParam;
  WorkerThread: TDIWorkerThread;
begin
  if (m_bStarted) then begin
    Result := False;
    Exit;
  end;

  //创建服务器端启动工作线程
  for i := 0 to m_nIOWorkerThreads - 1 do
  begin
    WorkerThread := TDIWorkerThread.Create(FSocketServer, i+1);
    New(p_Threads);
    p_Threads^.nThread := i+1;
    p_Threads^.FSocketServer := FSocketServer;
    p_Threads^.FDIThread := WorkerThread;
    p_Threads^.hEvent := WorkerThread.Handle;
    m_ThreadsList.Add(p_Threads);
    WorkerThread.Run;
  end;

  {$IFDEF _ICOP_DEBUG}
      FSocketServer.AppendLogMessage('线程池工作线程启动.');
  {$ENDIF}
  
  m_bStarted := TRUE;
  Result := TRUE;
end;

procedure TDIThreadPool.StopThreadPool;
var
  i: Integer;
begin
  if (m_bStarted) then begin
    //关闭所有连接, 侦听套接字
    FSocketServer.StopServer;
    {$IFDEF _ICOP_DEBUG}
        FSocketServer.AppendLogMessage('收到FSocketServer.StopServer消息');
    {$ENDIF}

    //退出线程
    for i:=0 to m_nIOWorkerThreads - 1 do
      FSocketServer.FIOCompletionPort.PostIOCompletionStatus(0, nil, $FFFFFFFF);
    {$IFDEF _ICOP_DEBUG}
        FSocketServer.AppendLogMessage('PostIOCompletionStatus投递工作线程退出消息.');
    {$ENDIF}

    for i:=0 to m_nIOWorkerThreads - 1 do
      WaitForSingleObject(PThreadParam(m_ThreadsList.Items[i]).hEvent, INFINITE);
    {$IFDEF _ICOP_DEBUG}
        FSocketServer.AppendLogMessage('WaitForSingleObject成功等待所有工作线程退出.');
    {$ENDIF}

    for i:=0 to m_nIOWorkerThreads - 1 do begin
      FreeAndNil(PThreadParam(m_ThreadsList.Items[i]).FDIThread);
      PThreadParam(m_ThreadsList.Items[i]).hEvent := 0;
      Dispose(PThreadParam(m_ThreadsList.Items[i]));
    end;
    m_ThreadsList.Clear;

    //回收资源
    FSocketServer.ProcessFreeMemory;
    {$IFDEF _ICOP_DEBUG}
        FSocketServer.AppendLogMessage('IOCP服务器成功释放所有内存,工作线程池停止.');
    {$ENDIF}

    m_bStarted := FALSE;
  end;
end;


end.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -