⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 iocpsocket.pas

📁 IOCP Socket Server控件
💻 PAS
字号:
{****************************************************

           IOCP Socket Server

          主要参考自CodeProject下载源码,
          特别感谢张无忌的帮助

          这个版本还很粗糙,很多地方没有处理好,最近
          比较忙,没有精力和时间来完善它,大家如果感
          兴趣,可以接着完善,或者干脆重写。

          如果是从这份代码改写,写完了别忘了发给我一份
          wjjxm@etang.com


          write by: softdog
          2003.2

****************************************************}
unit IOCPSocket;

interface

uses
  SysUtils, Windows, WinSock2, SyncObjs, Classes;

const
  BUFFER_SIZE = 2048;

type
  TSocketStatus = (ssAccept, ssSend, ssRecv, ssClose, ssStop);

  PPerHandleData = ^TPerHandleData;
  TPerHandleData = record
    Overlapped: OVERLAPPED;
    wsaBuffer: WSABUF;
    Statu: TSocketStatus;
    Socket: TSocket;
    Buffer: array[0..BUFFER_SIZE - 1] of char;
  end;

  TIOCPProcThread = class;
  TIOCPServer = class;

  TErrorLogProc = procedure(Msg: string; param: array of const) of object;
  TOnClientConnect = procedure(NewSocket: TSocket) of object;
  TOnClientDisConnect = procedure(ASocket: TSocket) of object;
  TOnDataRecive = procedure(IOData: PPerHandleData; var OperPosted: boolean) of object;
  TOnDataSend = procedure(IOData: PPerHandleData; var OperPosted: boolean) of object;

  //服务端Socket类
  TIOCPServer = class(TComponent)
  private
    FActive: boolean;
    FLocalAddress: string;
    FLocalPort: Word;
    FListenPort: THandle;
    FListenSocket: TSocket;
    FAcceptThreads: word;
    FAcceptThreadPool: TList;
    FProcPool: TList;
    FClientList: TStringList;
    FThreadCacheSize: Word;
    FErrorLogProc: TErrorLogProc;
    FLogProc: TErrorLogProc;
    FOnClientConnect: TOnClientConnect;
    FOnClientDisConnect: TOnClientDisConnect;
    FOnDataRecive: TOnDataRecive;
    FOnDataSend: TOnDataSend;
    FCount: integer;
    procedure SetActive(const Value: boolean);
    procedure SetLocalAddress(const Value: string);
    procedure SetLocalPort(const Value: word);
    procedure SetThreadCacheSize(const Value: Word);
  protected
    procedure ErrorLog(Msg: string; param: array of const);
    procedure Log(Msg: string; param: array of const);
    procedure DoClientClose(SocketInfo: TSocket);
  public
    constructor Create(Owner: TComponent); override;
    destructor Destroy; override;
    function Start: boolean;
    function Stop: boolean;
    function SendData(IOData: TPerHandleData): integer;
    function PostRead(SocketInfo: TSocket): boolean;
  published
    property Active: boolean read FActive write SetActive;
    property LocalAddress: string read FLocalAddress write SetLocalAddress;
    property LocalPort: word read FLocalPort write SetLocalPort;
    property AcceptThreads: word read FAcceptThreads write FAcceptThreads;
    property ThreadCacheSize: Word read FThreadCacheSize write SetThreadCacheSize;
    property OnErrorLog: TErrorLogProc read FErrorLogProc write FErrorLogProc;
    property OnLog: TErrorLogProc read FLogProc write FLogProc;
    property OnClientConnect: TOnClientConnect read FOnClientConnect write FOnClientConnect;
    property OnClientDisConnect: TOnClientDisConnect read FOnClientDisConnect write FOnClientDisConnect;
    property OnDataRecive: TOnDataRecive read FOnDataRecive write FOnDataRecive;
    property OnDataSend: TOnDataSend read FOnDataSend write FOnDataSend;
  end;

  //监听线程类
  TIOCPAcceptThread = class(TThread)
  private
    FServerComm: TIOCPServer;
    FLogMsg: string;
  protected
    procedure Log;
    procedure Errlog;
    procedure Execute; override;
  public
    constructor Create(AServerComm: TIOCPServer);
  end;

  //服务端处理线程
  TIOCPProcThread = class(TThread)
  private
    FServerComm: TIOCPServer;
    FLogMsg: string;
  protected
    procedure Log;
    procedure Errlog;
    procedure Execute; override;
  public
    constructor Create(AServerComm: TIOCPServer);
    destructor Destroy; override;
    property ServerComm: TIOCPServer read FServerComm;
  end;

procedure InitSocket;
procedure DoneSocket;
procedure Register;

var
  GlobalLock: TCriticalSection;

implementation

var
  bCanLog: boolean;

procedure Register;
begin
  RegisterComponents('IOCP', [TIOCPServer]);
end;

procedure InitSocket;
var
  wsaData: TWSAData;
begin
  WSAStartup(makeword(2, 0), wsaData);
end;

procedure DoneSocket;
begin
  WSACleanup;
end;

{ TIOCPServer }

constructor TIOCPServer.Create(Owner: TComponent);
begin
  inherited Create(Owner);

  FAcceptThreads := 5;
  FThreadCacheSize := 5;
  FAcceptThreadPool := TList.Create;
  FClientList := TStringList.Create;
  FProcPool := TList.Create;
end;

destructor TIOCPServer.Destroy;
begin
  bCanLog := false;
  Stop;
  FAcceptThreadPool.Free;
  FClientList.Free;
  FProcPool.Free;

  inherited;
end;

procedure TIOCPServer.DoClientClose(SocketInfo: TSocket);
var
  idx: integer;
begin
  idx := FClientList.IndexOf(inttostr(SocketInfo));
  if idx >= 0 then
  begin
    closesocket(SocketInfo);
    FClientList.Delete(idx);
  end;

  if Assigned(FOnClientDisConnect) then
    FOnClientDisConnect(SocketInfo);
end;

procedure TIOCPServer.ErrorLog(Msg: string; param: array of const);
begin
  if Assigned(FErrorLogProc) and bCanLog then
    FErrorLogProc(Msg, Param);
end;

procedure TIOCPServer.Log(Msg: string; param: array of const);
begin
  if Assigned(FLogProc) and bCanLog then
    FLogProc(Msg, Param);
end;

function TIOCPServer.PostRead(SocketInfo: TSocket): boolean;
var
  byteRecv, Flags: DWORD;
  HandleData: PPerHandleData;
begin
  Flags := 0;
  New(HandleData);
  FillChar(HandleData.Overlapped, Sizeof(Overlapped), 0);
  FillChar(HandleData.Buffer, BUFFER_SIZE, 0);
  HandleData.wsaBuffer.buf := HandleData.Buffer;
  HandleData.wsaBuffer.len := BUFFER_SIZE;
  HandleData.Statu := ssRecv;
  HandleData.Socket := SocketInfo;

  Result := (WSARecv(SocketInfo, @(HandleData.wsaBuffer), 1,
    byteRecv,
    Flags,
    @HandleData.Overlapped,
    nil)
    <> SOCKET_ERROR);
end;

function TIOCPServer.SendData(IOData: TPerHandleData): integer;
var
  byteSend, Flags: DWORD;
  NewData: PPerHandleData;
begin
  new(NewData);
  NewData^ := IOData;
  Flags := 0;
  FillChar(NewData^.Overlapped, Sizeof(Overlapped), 0);
  NewData^.Statu := ssSend;

  if WSASend(NewData^.Socket, @(NewData^.wsaBuffer), 1,
    byteSend,
    Flags,
    @(NewData^.Overlapped),
    nil) = SOCKET_ERROR then
    if WSAGetLastError <> ERROR_IO_PENDING then
    begin
      Result := -1;
      exit;
    end;

  Result := byteSend;
end;

procedure TIOCPServer.SetActive(const Value: boolean);
begin
  if FActive = Value then
    exit;

  if Value then
    Start
  else
    Stop;

  FActive := Value;
end;

procedure TIOCPServer.SetLocalAddress(const Value: string);
begin
  if FActive then
  begin
    ErrorLog('改变参数前必须停止监听!', []);
    exit;
  end;

  FLocalAddress := Value;
end;

procedure TIOCPServer.SetLocalPort(const Value: word);
begin
  if FActive then
  begin
    ErrorLog('改变参数前必须停止监听!', []);
    exit;
  end;

  FLocalPort := Value;
end;

procedure TIOCPServer.SetThreadCacheSize(const Value: Word);
begin
  FThreadCacheSize := Value;

  if FProcPool = nil then
    FProcPool := TList.Create;
end;

function TIOCPServer.Start: boolean;
var
  addr: TSockAddr;
  i: integer;
begin
  if FActive then
  begin
    ErrorLog('服务已经启动!', []);
    result := false;
    exit;
  end;

  FListenPort := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);

  //创建IOCP处理线程
  for i := 1 to FThreadCacheSize do
  begin
    FProcPool.Add(TIOCPProcThread.Create(self));
    inc(FCount);
  end;

  //创建Overlapped Socket
  FListenSocket := WSASocket(AF_INET, SOCK_STREAM, 0, nil, 0, WSA_FLAG_OVERLAPPED);

  if FListenSocket = INVALID_SOCKET then
  begin
    ErrorLog('创建套接字错误:%d', [WSAGetLastError]);
    Result := false;
    exit;
  end;

  addr.sin_family := AF_INET;
  addr.sin_port := htons(FLocalPort);
  addr.sin_addr.S_addr := INADDR_ANY;

  if Winsock2.bind(FListenSocket, @addr, sizeof(addr)) <> 0 then
  begin
    ErrorLog('绑定套接字错误:%d', [WSAGetLastError]);
    Result := false;
    exit;
  end;

  if Winsock2.listen(FListenSocket, 5) <> 0 then
  begin
    ErrorLog('监听套接字错误:%d', [WSAGetLastError]);
    Result := false;
    exit;
  end;

  //创建监听线程
  for i := 1 to AcceptThreads do
    FAcceptThreadPool.Add(TIOCPAcceptThread.Create(self));

  Result := true;
  FActive := true;

  Log('服务已经启动在端口: %d', [LocalPort]);
end;

function TIOCPServer.Stop: boolean;
var
  i: integer;
  Closelapped: PPerHandleData;
begin
  if not FActive then
  begin
    result := false;
    exit;
  end;

  //关闭监听线程
  for i := FAcceptThreadPool.Count downto 1 do
  begin
    TIOCPAcceptThread(FAcceptThreadPool.Items[i - 1]).Terminate;
    FAcceptThreadPool.Delete(i - 1);
  end;

  //关闭监听Socket
  if FListenSocket <> INVALID_SOCKET then
  begin
    closesocket(FListenSocket);
    FListenSocket := INVALID_SOCKET;
  end;

  //断开所有客户端连接
  for i := FClientList.Count - 1 downto 0 do
    DoClientClose(strtoint(FClientList.Strings[i]));

  //发送关闭处理线程信号
  for i := 1 to FProcPool.Count do
  begin
    New(Closelapped);
    Closelapped.Statu := ssStop;
    PostQueuedCompletionStatus(FListenPort, 1, 0, @(Closelapped.Overlapped));
  end;
  FProcPool.Clear;

  //关闭完成端口句柄
  if FListenPort <> 0 then
    CloseHandle(FListenPort);

  Result := true;
  FActive := false;

  Log('服务已经停止.', []);
end;

{ TIOCPAcceptThread }

constructor TIOCPAcceptThread.Create(AServerComm: TIOCPServer);
begin
  inherited Create(true);
  FServerComm := AServerComm;
  FreeOnTerminate := true;
  Resume;
end;

procedure TIOCPAcceptThread.Errlog;
begin
  FServerComm.ErrorLog(FLogMsg, []);
end;

procedure TIOCPAcceptThread.Execute;
var
  Addr: TSockAddr;
  AddrLen: integer;
  sNew: TSocket;
  NewPort: THandle;
begin
  AddrLen := SizeOf(Addr);
  while not Terminated do
  begin
    sNew := Winsock2.accept(FServerComm.FListenSocket, Addr, AddrLen);
    if sNew <> INVALID_SOCKET then
    begin
      GlobalLock.Enter;

      //绑定完成端口
      NewPort := CreateIoCompletionPort(sNew, FServerComm.FListenPort, 0, 0);
      if NewPort = 0 then
        Continue
      else
      begin
        FServerComm.FClientList.Add(inttostr(sNew));

        //发送读取请求
        FServerComm.PostRead(sNew);

        if Assigned(FServerComm.OnClientConnect) then
          FServerComm.OnClientConnect(sNew);

        FLogMsg := Format('客户端连接:%d', [sNew]);
        Synchronize(Log);

        GlobalLock.Leave;
      end;
    end else
      if WSAGetLastError = WSAEINTR then
      begin
        FLogMsg := '监听线程终止.';
        Synchronize(Log);
        Terminate;
      end;
  end;
end;

procedure TIOCPAcceptThread.Log;
begin
  FServerComm.Log(FLogMsg, []);
end;

{ TIOCPProcThread }

constructor TIOCPProcThread.Create(AServerComm: TIOCPServer);
begin
  FServerComm := AServerComm;
  FreeOnTerminate := true;

  inherited Create(false);
end;

destructor TIOCPProcThread.Destroy;
begin
  inherited;
end;

procedure TIOCPProcThread.Errlog;
begin
  FServerComm.ErrorLog(FLogMsg, []);
end;

procedure TIOCPProcThread.Execute;
var
  HandleData: PPerHandleData;
  byteRece, Key: DWORD;
  OperPosted: boolean;
  errCode: integer;
begin
  while not Terminated do
  begin
    if not GetQueuedCompletionStatus(FServerComm.FListenPort, byteRece, Key, POverlapped(HandleData), INFINITE) then
    begin
      errCode := GetLastError;
        
      if errcode = 6 then
        Terminate;
      Continue;
    end;

    GlobalLock.Enter;

    //客户端断开连接
    if byteRece = 0 then
    begin
      FLogMsg := Format('客户端断开连接:%d', [HandleData.Socket]);
      Synchronize(Log);
      FServerComm.DoClientClose(HandleData.Socket);
      Dispose(HandleData);
      GlobalLock.Leave;
      Continue;
    end;

    case HandleData.Statu of
      ssRecv:
        begin
          OperPosted := false;
          if Assigned(FServerComm.FOnDataRecive) then
            FServerComm.FOnDataRecive(HandleData, OperPosted);
          if not OperPosted then
            FServerComm.PostRead(HandleData.Socket);
        end;
      ssSend:
        begin
          OperPosted := false;
          if Assigned(FServerComm.FOnDataSend) then
            FServerComm.FOnDataSend(HandleData, OperPosted);
          if not OperPosted then
            FServerComm.PostRead(HandleData.Socket);
        end;
      ssClose:
        begin
          FServerComm.DoClientClose(HandleData.Socket);
        end;
      ssStop:
        begin
          Dec(FServerComm.FCount);
          Terminate;
        end;
    end; // Case

    Dispose(HandleData);
    GlobalLock.Leave;
  end;
end;

procedure TIOCPProcThread.Log;
begin
  FServerComm.Log(FLogMsg, []);
end;

initialization
  InitSocket;
  GlobalLock := TCriticalSection.Create;
  bCanLog := true;
finalization
  DoneSocket;
  GlobalLock.Free;
end.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -