⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 udiiocpsocketserver.pas

📁 楠楠写的DBiocp例子都是源码
💻 PAS
📖 第 1 页 / 共 3 页
字号:
unit uDIIocpSocketServer;

interface

uses
  Windows, Sysutils, WinSock2, uDIBuffer, uDIMapBuffer, uDIClientChannel, uDISocketServer,
  uDIProtocol, uDIThread;

type
  TDIIocpSocketServer = class;
  TDIListenThread = class;
  
  // Per-thread parameter record (and its pointer type).
  PThreadWorkParam = ^ThreadWorkParam;
  ThreadWorkParam = packed record
    nThread:        DWORD;                      // index of the thread
    FSocketServer:  TDIIocpSocketServer;        // reference to the IOCP server
    FDIThread:      TDIThread;                  // the thread object
    hEvent:         Thandle;                    // event handle
  end;

  // Method-pointer type for notifications that carry only the client channel.
  TOnSocketEvent = procedure(FClientChannel: TDIClientChannel) of object;
  // Method-pointer type for notifications that carry the client channel and the I/O buffer involved.
  TOnSocketIOEvent = procedure(FClientChannel: TDIClientChannel; FDIBuffer: TDIBuffer) of object;

  // Socket server built on TDISocketServer that keeps a set of AcceptEx
  // requests outstanding on the listen socket and dispatches completed I/O.
  TDIIocpSocketServer = class(TDISocketServer)
  protected
    m_iIOCPPostAccepts:             Integer;                // number of AcceptEx requests to post on the next round
    m_nInitPostAcceptEx:            DWORD;                  // number of AcceptEx requests posted initially
    m_DIMapAcceptExBufferList:      TDIMapBuffer;           // buffers that currently have an AcceptEx request posted

    m_WrokThreads: array of ThreadWorkParam;                // worker thread array
    m_hPostAcceptEvent: Thandle;          // event signalled to post more Accept I/O (keeps the AcceptEx count topped up as users connect)
    m_hListenThreadEvent: Thandle;        // listen-thread event; Accept events are handled through the WSAEventSelect I/O model
    m_hWaitEventList: array of Thandle;   // list of kernel objects the waiting (AcceptEx) thread waits on
    m_hShutdownEvent: Thandle;            // stop-service event
    FDIListenThread: TDIListenThread;
  published
    // Read/write access to m_nInitPostAcceptEx.
    property FInitPostAcceptEx: DWORD read m_nInitPostAcceptEx write m_nInitPostAcceptEx;
  private
    // Overrides the inherited Startup (implementation not shown in this part of the unit).
    function Startup: Boolean; override;
  public
    constructor Create;
    destructor Destroy; override;
    
  private
    FOnNewSocketEvent:      TOnSocketEvent;     // client connected
    FOnCloseSocketEvent:    TOnSocketEvent;     // client closed
    FOnReadCompletedEvent:  TOnSocketIOEvent;   // message received from a client
    FOnWriteCompletedEvent: TOnSocketIOEvent;   // message sent to a client
  published
    property OnNewSocketEvent: TOnSocketEvent read FOnNewSocketEvent write FOnNewSocketEvent;
    property OnCloseSocketEvent: TOnSocketEvent read FOnCloseSocketEvent write FOnCloseSocketEvent;
    property OnReadCompletedEvent: TOnSocketIOEvent read FOnReadCompletedEvent write FOnReadCompletedEvent;
    property OnWriteCompletedEvent: TOnSocketIOEvent read FOnWriteCompletedEvent write FOnWriteCompletedEvent;
  private
    {
    procedure DisconnectClient(FClientChannel: TDIClientChannel);
    procedure DisconnectAllClient;    }
  protected
    // Start the listening service.
    function SetupListner: Boolean;
    // Body of the listen thread (called from TDIListenThread.ThreadRoutine).
    procedure ListnerThreadProc;
    function SetupIOWorkers: Boolean;
    // Body of a worker thread (called from TDIWorkerThread.ThreadRoutine).
    procedure IOWorkerThreadProc;

     // Create the client context (and post a Recv I/O operation) - currently disabled.
    //function AssociateIncomingClientWithContext(pDIBuffer: TDIBuffer): Boolean;


    // Message dispatch.
    procedure ProcessIOMessage(pClientChannel: TDIClientChannel; pDIBuffer: TDIBuffer; dwIoSize: DWORD); override;
    // Message handling functions.
    function PostWSAAcceptEx(pDIBuffer: TDIBuffer): Boolean;
    function OnWSAAcceptEx(pDIBuffer: TDIBuffer): Boolean;
    function PostWSACloseSocket(pClientChannel: TDIClientChannel;  pDIBuffer: TDIBuffer): Boolean;

    {function PostWSARecv(pClientChannel: TDIClientChannel; pDIBuffer: TDIBuffer): Boolean;
    function PostWSASend(pClientChannel: TDIClientChannel; pDIBuffer: TDIBuffer): Boolean;
    

    
    function OnWSARecv(pClientChannel: TDIClientChannel; pDIBuffer: TDIBuffer; dwIoSize: DWORD): Boolean;
    function OnWSASend(pClientChannel: TDIClientChannel; pDIBuffer: TDIBuffer; dwIoSize: DWORD): Boolean;
    }
  public
		procedure ShutDown; override;

    {
    procedure FreeAllClientChannel; override;
    procedure ParseIOError(FClientChannel: TDIClientChannel; FBuffer: TDIBuffer; dwIoSize: DWORD); override;
      }
  end;

  // Thread whose routine runs the server's ListnerThreadProc.
  TDIListenThread = class(TDIThread)
  protected
    FIOCPServer: TDIIocpSocketServer;   // server whose listen procedure this thread executes
    procedure ThreadRoutine; override;
  public
    // SocketServer is stored in FIOCPServer and used by ThreadRoutine.
    constructor Create(SocketServer: TDIIocpSocketServer);
  end;

  // Thread whose routine runs the server's IOWorkerThreadProc.
  TDIWorkerThread = class(TDIThread)
  protected
    FIOCPServer: TDIIocpSocketServer;   // server whose worker procedure this thread executes
    procedure ThreadRoutine; override;
  public
    // SocketServer is stored in FIOCPServer and used by ThreadRoutine.
    constructor Create(SocketServer: TDIIocpSocketServer);
  end;

 

implementation
  uses Dialogs;
  
//TDIListenThread
{ Builds the listen thread and remembers the server it works for.
  SocketServer: server whose ListnerThreadProc is run by ThreadRoutine. }
constructor TDIListenThread.Create(SocketServer: TDIIocpSocketServer);
begin
  inherited Create;
  // Keep the owning server so ThreadRoutine can delegate to it.
  Self.FIOCPServer := SocketServer;
end;

{ Thread body: hands control to the server's listen procedure. }
procedure TDIListenThread.ThreadRoutine;
begin
  Self.FIOCPServer.ListnerThreadProc;
end;

//TDIWorkerThread
{ Builds a worker thread and remembers the server it works for.
  SocketServer: server whose IOWorkerThreadProc is run by ThreadRoutine. }
constructor TDIWorkerThread.Create(SocketServer: TDIIocpSocketServer);
begin
  inherited Create;
  // Keep the owning server so ThreadRoutine can delegate to it.
  Self.FIOCPServer := SocketServer;
end;

{ Thread body: hands control to the server's I/O worker procedure. }
procedure TDIWorkerThread.ThreadRoutine;
begin
  Self.FIOCPServer.IOWorkerThreadProc;
end;

{ Creates the server: runs the inherited constructor, allocates the AcceptEx
  buffer list and sets the accept counters to their starting values. }
constructor TDIIocpSocketServer.Create;
begin
  inherited Create;

  // List of buffers that have an AcceptEx request posted.
  m_DIMapAcceptExBufferList := TDIMapBuffer.Create;

  // 100 AcceptEx requests are posted when the listen thread starts;
  // nothing is pending for re-posting yet.
  m_nInitPostAcceptEx := 100;
  m_iIOCPPostAccepts := 0;
end;

{ Frees the AcceptEx buffer list created in the constructor, then runs the
  inherited destructor. }
destructor TDIIocpSocketServer.Destroy;
begin
  // Release our own list before the ancestor tears down its state.
  FreeAndNil(m_DIMapAcceptExBufferList);
  inherited Destroy;
end;

{ Posts one overlapped AcceptEx request on the listen socket.

  A fresh overlapped TCP socket is created, stored in pDIBuffer.m_Socket and
  handed to AcceptEx together with the buffer's memory and OVERLAPPED block.

  pDIBuffer: buffer that carries the request; must not be nil.
  Returns True when the request completed immediately or is pending
  (WSA_IO_PENDING); False on any failure. On an AcceptEx failure the new
  socket is closed and pDIBuffer.m_Socket is reset to INVALID_SOCKET, so the
  handle is not leaked and the buffer does not keep a dead socket. }
function TDIIocpSocketServer.PostWSAAcceptEx(pDIBuffer: TDIBuffer): Boolean;
var
  sAccept: TSocket;
  dwIoSize: Cardinal;
  nRetVal: LongBool;
  nLastError: Integer;
begin
  Result := False;

  if pDIBuffer = nil then begin
    AppendErrorMessage('PostWSAAcceptEx投递Accept I/O操作失败, pDIBuffer is NULL.');
    Exit;
  end;

  // Socket that will become the accepted connection.
  sAccept := Winsock2.WSASocketA(AF_INET, SOCK_STREAM, 0, Nil, 0, WSA_FLAG_OVERLAPPED);
  if sAccept = INVALID_SOCKET then begin
    AppendErrorMessage(Format('PostWSAAcceptEx 创建监听套按字失败: %d.', [WSAGetLastError()]));
    Exit;
  end;

  // Mark the buffer as an AcceptEx operation and attach the new socket.
  dwIoSize := 0;
  pDIBuffer.SetOperation(IOWSAAcceptEx);
  pDIBuffer.SetupRead;
  pDIBuffer.m_Socket := sAccept;

  // Receive length 0: no payload is read as part of the accept; only the
  // two address slots (sockaddr + 16 bytes each) are reserved.
  nRetVal := m_lpfnAcceptEx( m_sListenSocket,
                             pDIBuffer.m_Socket,
                             pDIBuffer.GetBuffer,
                             0,
                             sizeof(TSockAddrIn)+16,
                             sizeof(TSockAddrIn)+16,
                             dwIoSize,
                             POverlapped(@pDIBuffer.PerHandleData.m_overlap));

  if not nRetVal then
  begin
    // Read the error once, before any other Winsock call can overwrite it.
    nLastError := WSAGetLastError();
    if nLastError <> WSA_IO_PENDING then
    begin
      AppendErrorMessage(Format('PostAcceptProcess投递Accept I/O操作失败: %d.', [nLastError]));
      // The request was never queued, so nobody else will close this socket.
      Winsock2.closesocket(sAccept);
      pDIBuffer.m_Socket := INVALID_SOCKET;
      Exit;
    end;
  end;

  Result := True;
end;

{ Handles a completed AcceptEx request.

  When the buffer carries a valid socket, ChangeSocketModeAccept is applied to
  it. In every case the buffer is removed from the AcceptEx list and returned
  to the free pool, the re-post counter is incremented and m_hPostAcceptEvent
  is signalled so that a replacement AcceptEx gets posted.

  pDIBuffer: buffer of the completed request.
  Returns True when the buffer carried a valid socket; False when pDIBuffer
  is nil or its socket is INVALID_SOCKET. }
function TDIIocpSocketServer.OnWSAAcceptEx(pDIBuffer: TDIBuffer): Boolean;
begin
  Result := False;

  if pDIBuffer = nil then begin
    AppendErrorMessage('OnWSAAcceptEx处理Accept I/O操作失败, pDIBuffer is NULL.');
    Exit;
  end;

  if pDIBuffer.m_Socket = INVALID_SOCKET then
  begin
    // Nothing to close or configure: there is no socket behind this buffer.
    // The buffer is still recycled below so the accept pool does not shrink.
    AppendErrorMessage('OnWSAAcceptEx接受到dwTrans为零, 客户端关闭Socket连接.');
  end
  else
  begin
    // 1. Update the accepted socket via ChangeSocketModeAccept.
    // 2. Create the client context (post a Recv I/O operation) - disabled.
    if ChangeSocketModeAccept(pDIBuffer.m_Socket) then
    begin
      //if not AssociateIncomingClientWithContext(pDIBuffer) then
      //  AppendErrorMessage('AssociateIncomingClientWithContext is Failer.');
    end;
    Result := True;
  end;

  // The accept request is finished: take it out of the AcceptEx list ...
  m_DIMapAcceptExBufferList.RemoveDIBuffer(pDIBuffer);
  // ... and give the buffer back to the pool.
  m_FreeDIBufferList.ReleaseBufferToPool(pDIBuffer);

  // Ask the listen thread to post one more AcceptEx request.
  InterlockedIncrement(m_iIOCPPostAccepts);
  SetEvent(m_hPostAcceptEvent);
end;

//侦听线程
{ Body of the listen thread.

  Posts m_nInitPostAcceptEx AcceptEx requests, then loops until
  m_hShutdownEvent is signalled. Each round waits on the first two handles of
  m_hWaitEventList:
    index 0 - re-post request: posts as many AcceptEx requests as were
              accumulated in m_iIOCPPostAccepts;
    index 1 - network event on the listen socket: on FD_ACCEPT without error
              (and while accepting is enabled) posts a further batch.
  NOTE(review): which handle sits at which index is taken from the original
  comments; the code that fills m_hWaitEventList is not visible here.

  The procedure returns when shutdown is signalled, when a wait or
  WSAEnumNetworkEvents fails, or when a buffer cannot be allocated or posted. }
procedure TDIIocpSocketServer.ListnerThreadProc;
const
  // Upper bound for one wait, so the shutdown event is polled regularly even
  // when neither of the two events fires.
  cWaitSliceMs = 1000;
  // Number of extra AcceptEx requests posted per FD_ACCEPT notification.
  cAcceptBurst = 50;
var
  nRet: Integer;          // Integer, so the comparison with SOCKET_ERROR (-1) can succeed
  nIndex: DWORD;
  nPostAccepts: Integer;
  Events: TWSANetworkEvents;

  // Allocates nCount buffers and posts one AcceptEx request on each.
  // Returns False (after logging) as soon as one allocation or post fails.
  function PostAcceptBatch(nCount: Integer): Boolean;
  var
    i: Integer;
    pBuffer: TDIBuffer;
  begin
    Result := False;
    for i := 1 to nCount do begin
      pBuffer := m_FreeDIBufferList.AllocateFreeBufferFromPool;
      if pBuffer = nil then begin
        AppendErrorMessage('AllocateFreeBufferFromPool错误, ListnerThreadProc线程退出.');
        Exit;
      end;

      m_DIMapAcceptExBufferList.AddDIBuffer(pBuffer);
      if not PostWSAAcceptEx(pBuffer) then begin
        AppendErrorMessage('PostWSAAcceptEx投递Accept I/O错误, ListnerThreadProc线程退出.');
        // No request is outstanding on this buffer, so undo the bookkeeping
        // instead of leaving it stranded in the AcceptEx list.
        m_DIMapAcceptExBufferList.RemoveDIBuffer(pBuffer);
        m_FreeDIBufferList.ReleaseBufferToPool(pBuffer);
        Exit;
      end;
    end;
    Result := True;
  end;

begin
  // Post the initial set of AcceptEx requests.
  if not PostAcceptBatch(Integer(m_nInitPostAcceptEx)) then
    Exit;

  // The wait below reads two handles starting at element 0.
  if Length(m_hWaitEventList) < 2 then begin
    AppendErrorMessage('m_hWaitEventList未初始化, ListnerThreadProc线程退出.');
    Exit;
  end;

  // Timeout 0: only test the shutdown event. An INFINITE wait here would
  // block until shutdown and the loop body would never run.
  while (WaitForSingleObject(m_hShutdownEvent, 0) <> WAIT_OBJECT_0) do
  begin
    // Start every round from zero so a round that posts nothing cannot reuse
    // the previous round's count.
    nPostAccepts := 0;

    // Pass the address of the first element: @ on the dynamic-array variable
    // itself would be the address of the array reference, not of the handles.
    nIndex := WSAWaitForMultipleEvents(2, @m_hWaitEventList[0], FALSE, cWaitSliceMs, FALSE);
    if (nIndex = WSA_WAIT_FAILED) then begin
      AppendErrorMessage('WSA_WAIT_FAILED错误, ListnerThreadProc线程退出.');
      Break;
    end;

    // Nothing signalled within the slice: go back and re-check shutdown.
    if (nIndex = WAIT_TIMEOUT) then
      Continue;

    nIndex := nIndex - WAIT_OBJECT_0;
    if (nIndex = 0) then begin
      // Re-post request from the I/O side: take the accumulated count and reset it.
      nPostAccepts := InterlockedExchange(m_iIOCPPostAccepts, 0);
    end
    else if (nIndex = 1) then begin
      nRet := WSAEnumNetworkEvents( m_sListenSocket,
                                    m_hWaitEventList[nIndex],
                                    @Events );
      if (nRet = SOCKET_ERROR) then begin
        AppendErrorMessage(Format('WSAEnumNetworkEvents错误%d, ListnerThreadProc线程退出.', [WSAGetLastError()]));
        Break;
      end;

      // lNetworkEvents is a bit mask, so test the FD_ACCEPT bit.
      if ((Events.lNetworkEvents and FD_ACCEPT) <> 0) then begin
        // Re-check the shutdown state here as well so the thread can leave.
        if ( (Events.iErrorCode[FD_ACCEPT_BIT] = 0) and
             (m_bAcceptConnections) and
             (not m_bShutDown) ) then begin
          nPostAccepts := cAcceptBurst;
        end
        else
        begin
          AppendErrorMessage(Format('异常WSAEnumNetworkEvents事件错误%d, ListnerThreadProc线程退出.', [WSAGetLastError()]));
          Break;
        end;
      end;
    end;

    // Post the AcceptEx requests decided on above (none when the count is 0).
    if not PostAcceptBatch(nPostAccepts) then
      Exit;
  end;
end;

function TDIIocpSocketServer.SetupListner: Boolean;
var
  bVal: LongBool;
  nRet: Integer;
  dwBytesReceived: DWORD;
  dwThreadID: DWORD;
  sock_Addr: TSockAddrIn;
begin
  //创建socket
  m_sListenSocket := Winsock2.WSASocket(AF_INET, SOCK_STREAM, 0, Nil, 0, WSA_FLAG_OVERLAPPED);
  if m_sListenSocket = INVALID_SOCKET then begin
    AppendErrorMessage(Format('创建 Socket监听套按字失败: %d.', [WSAGetLastError()]));
    Result := FALSE;
    Exit;
  end;

  //设置端口复用
  bVal := True;
  nRet := SetSockOpt(m_sListenSocket, SOL_SOCKET, SO_REUSEADDR, PChar(@bVal), SizeOf(bVal));
  if (nRet = SOCKET_ERROR) then begin
    AppendErrorMessage(Format('setsockopt(SO_EXCLUSIVEADDRUSE) 创建失败: %d.', [WSAGetLastError()]));
		Winsock2.closesocket(m_sListenSocket);
    Result := FALSE;
    Exit;
  end;
  
  //设置Wait线程内核对象列表(AcceptEx线程)
  SetLength(m_hWaitEventList, 2);

  //投递Accept I/O事件内核对象
	m_hPostAcceptEvent := CreateEvent(nil, FALSE, FALSE, nil);
  if (m_hPostAcceptEvent = WSA_INVALID_EVENT) then begin
    AppendDisplayMsg(Format('WSACreateEvent() 创建失败: %d.', [WSAGetLastError()]));
    WSACloseEvent(m_hShutdownEvent);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -