⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 iocpsvr.pas

📁 iocp小程序
💻 PAS
📖 第 1 页 / 共 2 页
字号:
{*******************************************************}
{                                                       }
{       高性能服务器,这个是一个演示DEMO                }
{                                                       }
{       联系邮箱:fansheng_hx@163.com                   }
{                                                       }
{*******************************************************}

unit IOCPSvr;

interface

uses
 Windows, Messages, WinSock2, Classes, SysUtils, SyncObjs;

const
  {* Number of elements in the per-operation data buffer (TIOCPStruct.Buffer)
     used for each send and receive *}
  MAX_BUFSIZE = 4096;
  {* Window message used to notify the server that a client is closing;
     handled by TServerSocket.WMClientClose *}
  WM_CLIENTSOCKET = WM_USER + $2000;

type
  {* Windows socket message record; this is the parameter type of the
     WM_CLIENTSOCKET handler (TServerSocket.WMClientClose).
     NOTE(review): the field layout presumably mirrors a WSAAsyncSelect-style
     message (socket in wParam, event/error words in lParam) - confirm. *}
  TCMSocketMessage = packed record
    Msg: Cardinal;
    Socket: TSocket;
    SelectEvent: Word;
    SelectError: Word;
    Result: Longint;
  end;

  {* Trace of the IOCP server's activity: each value names a stage that is
     reported through TOnLog; the same type is stored in TIOCPStruct.Event
     to tag an operation as a read or a write *}
  TSocketEvent = (seInitIOPort, seUninitIOPort, seInitThread, seUninitThread,
    seInitSocket, seUninitSocket, seConnect, seDisconnect, seListen, seAccept, seWrite, seRead);
const
  {* Display name for every TSocketEvent value, indexed by the enum itself;
     the entries are listed in the same order as the enum declaration *}
  CSSocketEvent: array[TSocketEvent] of string = ('InitIOPort', 'UninitIOPort', 'InitThread', 'UninitThread',
    'InitSocket', 'UninitSocket', 'Connect', 'Disconnect', 'Listen', 'Accept', 'Write', 'Read');
type
  {* Category of the operation that produced an error *}
  TErrorEvent = (eeGeneral, eeSend, eeReceive, eeConnect, eeDisconnect, eeAccept);

  {* Structure handed to the completion port for each I/O operation.
     NOTE(review): Overlapped is the first field, presumably so that the
     overlapped pointer returned by the completion port can be cast back to
     PIOCPStruct - confirm against the worker thread code.
     NOTE(review): Buffer is declared as array of Char; on Unicode-enabled
     compilers Char is two bytes, which would make the buffer
     2 * MAX_BUFSIZE bytes - confirm the intended element type. *}
  TIOCPStruct = packed record
    Overlapped: OVERLAPPED;
    wsaBuffer: TWSABUF;
    Event: TSocketEvent; // read or write
    Buffer: array [0..MAX_BUFSIZE - 1] of Char;
    Assigned: Boolean;  // True once the block has been handed out to a client
    Active: Boolean;    // used internally by the client: block is currently in use
  end;
  PIOCPStruct = ^TIOCPStruct;

  {* Exception class used by TMemoryManager for invalid block indexes *}
  EMemoryBuffer = class(Exception);
  {* Exception class raised by TServerSocket when socket start-up fails *}
  ESocketError = class(Exception);

  TMemoryManager = class;
  TServerSocket = class;
  TSymmetricalSocket = class;

  {* Pool of TIOCPStruct blocks. Blocks are created once and then handed out
     (Allocate) and returned (Release) by toggling their Assigned flag; they
     are only freed when the manager is destroyed. *}
  TMemoryManager = class
  private
    {* Tracks every block owned by the pool (items are PIOCPStruct) *}
    FList: TList;
    {* Lock taken while allocating and releasing blocks *}
    FLock: TCriticalSection;
    {* Owning server *}
    FServerSocket: TServerSocket;
    function GetCount: Integer;
    function GetIOCPStruct(AIndex: Integer): PIOCPStruct;
  public
    {* Creates the pool with ACount pre-allocated blocks *}
    constructor Create(AServerSocket: TServerSocket; ACount: Integer); overload;
    {* Creates the pool with the default number of pre-allocated blocks *}
    constructor Create(AServerSocket: TServerSocket); overload;
    destructor Destroy; override;

    {* Hands out the right to use a block *}
    function Allocate: PIOCPStruct;
    {* Gives back the right to use a block *}
    procedure Release(AValue: PIOCPStruct);
    property Server: TServerSocket read FServerSocket;
    {* Total number of blocks in the pool, assigned or not *}
    property Count: Integer read GetCount;
    property Item[AIndex: Integer]: PIOCPStruct read GetIOCPStruct;
  end;

  {* Fired when a client connects to the server; to refuse the connection the
     handler is meant to set AConnect := False.
     NOTE(review): AConnect is passed by value, so an assignment made inside
     the handler cannot reach the caller - it probably needs to be a var
     parameter; confirm against DoConnect before changing the signature. *}
  TOnBeforeConnect = procedure(ASymmIP: string; AConnect: Boolean) of object;
  {* Fired after a connection has been established *}
  TOnAfterConnect = procedure(ASymmetricalSocket: TSymmetricalSocket) of object;
  {* Fired when a connection is closed *}
  TOnAfterDisconnect = procedure(ASymmetricalSocket: TSymmetricalSocket) of object;
  {* Fired when data has been received *}
  TOnDataEvent = procedure(ASymmetricalSocket: TSymmetricalSocket; AData: Pointer;
    ACount: Integer) of object;
  {* Fired when an error occurs *}
  TOnErrorEvent = procedure(AError: Integer; AErrorString: string; AInfo: string; var AHandleError: Boolean) of object;
  {* Server activity log callback *}
  TOnLog = procedure (ASocketEvent: TSocketEvent; AInfo: string) of object;

  {* The server: creates the completion port, manages the memory pool and the
     connected clients, and owns the window used for the socket message loop *}
  TServerSocket = class
  private
    {* Memory pool *}
    FMemory: TMemoryManager;
    {* Listening port *}
    FPort: Integer;
    {* Listening socket *}
    FSocket: TSocket;
    {* Completion port handle *}
    FIOCPHandle: THandle;
    {* Handle of the window that receives messages (see WndProc) *}
    FHandle: THandle;
    {* Peer objects, one per connected client *}
    FClients: TList;
    {* Worker threads of the server *}
    FThreads: TList;
    {* Listening (accept) thread *}
    FAcceptThread: TThread;
    {* Whether the server is active *}
    FActive: Boolean;
    {* Lock *}
    FLock: TCriticalSection;
    {* Error event handler *}
    FOnError: TOnErrorEvent;
    {* Log event handler *}
    FOnLog: TOnLog;
    {* Handler fired for an incoming connection *}
    FOnBeforeConnect: TOnBeforeConnect;
    {* Handler fired after a connection succeeded *}
    FOnAfterConnect: TOnAfterConnect;
    {* Handler fired on disconnect *}
    FOnAfterDisconnect: TOnAfterDisconnect;
    {* Handler fired when data is received *}
    FOnRead: TOnDataEvent;

    {* Window procedure of the window allocated in the constructor *}
    procedure WndProc(var AMsg: TMessage);
    {* Activates the server *}
    procedure Open;
    {* Shuts the server down *}
    procedure Close;
    {* Switches between active and closed *}
    procedure SetActive(AValue: Boolean);
    {* Triggers the error event *}
    function CheckError(AErrorCode: Integer = -1; AInfo: string = ''): Boolean;
    {* Triggers the log event *}
    procedure DoLog(ASocketEvent: TSocketEvent; AInfo: string = '');
    {* Sets the port *}
    procedure SetPort(AValue: Integer);
    {* Registers a client; called from another thread, so locking is required *}
    procedure RegisterClient(ASocket: TSymmetricalSocket);
    {* Unregisters a client; called from another thread, so locking is required *}
    procedure UnRegisterClient(ASocket: TSymmetricalSocket);
    {* Finds the peer TSymmetricalSocket for a socket handle *}
    function FindSymmClient(ASocket: TSocket): TSymmetricalSocket;
    {* Handles the client-close message *}
    procedure WMClientClose(var AMsg: TCMSocketMessage); message WM_CLIENTSOCKET;
    {* Triggers the event fired while a client is connecting *}
    function DoConnect(ASocket: TSocket): Boolean;
    {* Triggers the event fired after a connection is complete *}
    procedure DoAfterConnect(ASymSocket: TSymmetricalSocket);
    {* Triggers the event fired when a connection is dropped *}
    procedure DoDisConnect(ASymSocket: TSymmetricalSocket);
    {* Triggers the event fired when data is received *}
    procedure DoRead(ASymmetricalSocket: TSymmetricalSocket; AData: Pointer;
      ACount: Integer);
    {* Returns the number of clients *}
    function GetClientCount: Integer;
    function GetClient(const AIndex: Integer): TSymmetricalSocket;
  public
    constructor Create;
    destructor Destroy; override;
    {* Accepts one client; called by the accept thread *}
    procedure AcceptClient;

    property Port: Integer read FPort write SetPort;
    property Socket: TSocket read FSocket;
    property Handle: THandle read FHandle;
    property Active: Boolean read FActive write SetActive;
    property MemoryManager: TMemoryManager read FMemory;
    {* Events *}
    property OnError: TOnErrorEvent read FOnError write FOnError;
    property OnLog: TOnLog read FOnLog write FOnLog;
    property OnRead: TOnDataEvent read FOnRead write FOnRead;
    property OnBeforeConnect: TOnBeforeConnect read FOnBeforeConnect write FOnBeforeConnect;
    property OnAfterConnect: TOnAfterConnect read FOnAfterConnect write FOnAfterConnect;
    property OnAfterDisConnect: TOnAfterDisconnect read FOnAfterDisconnect write FOnAfterDisconnect;
    property ClientCount: Integer read GetClientCount;
    property Client[const AIndex: Integer]: TSymmetricalSocket read GetClient;
  end;

  {* Server-side peer of one client connection: receives data, sends data and
     manages the memory blocks assigned to it *}
  TSymmetricalSocket = class
  private
    FSocket: TSocket;
    FServer: TServerSocket;
    FAssignMemory: TList;
    FRemoteAddress, FRemoteHost: string;
    FRemotePort: Integer;

    {* Prepares to receive data *}
    function PrepareRecv(AIOCPStruct: PIOCPStruct = nil): Boolean;
    {* Obtains the right to use a completion-port memory block *}
    function Allocate: PIOCPStruct;
    {* Processes received data *}
    function WorkBlock(AIOCPStruct: PIOCPStruct; ACount: DWORD): Integer;
    {* Returns the remote IP address *}
    function GetRemoteIP: string;
    {* Returns the remote machine name *}
    function GetRemoteHost: string;
    {* Returns the remote port *}
    function GetRemotePort: Integer;
  public
    constructor Create(ASvrSocket: TServerSocket; ASocket: TSocket);
    destructor Destroy; override;
    {* Sends data *}
    function Write(var ABuf; ACount: Integer): Integer;
    function WriteString(const AValue: string): Integer;

    property Socket: TSocket read FSocket;
    property RemoteAddress: string read GetRemoteIP;
    property RemoteHost: string read GetRemoteHost;
    property RemotePort: Integer read GetRemotePort;
  end;

  {* Common base for the server's threads: holds a reference to the server
     passed to the constructor *}
  TSocketThread = class(TThread)
  private
    FServer: TServerSocket;
  public
    constructor Create(AServer: TServerSocket);
  end;

  {* Thread class instantiated by TServerSocket.Open as the accept thread.
     NOTE(review): Execute is implemented outside this view; presumably it
     calls TServerSocket.AcceptClient - confirm. *}
  TAcceptThread = class(TSocketThread)
  protected
    procedure Execute; override;
  end;

  {* Worker thread class; TServerSocket.Open creates two of these per
     processor. Execute is implemented outside this view. *}
  TWorkThread = class(TSocketThread)
  protected
    procedure Execute; override;
  end;

implementation

uses
  RTLConsts;

const
 {* Sentinel posted by TServerSocket.Close, cast to a pointer, as the
    overlapped argument of PostQueuedCompletionStatus - once per worker thread.
    NOTE(review): presumably the worker threads treat it as a stop signal -
    confirm in TWorkThread.Execute. *}
 SHUTDOWN_FLAG = $FFFFFFFF;

{ TMemoryManager }

{* Creates the pool and pre-allocates ACount zero-filled, unassigned blocks.

   AServerSocket - owning server, exposed through the Server property.
   ACount        - number of blocks to pre-allocate; values <= 0 create an
                   empty pool (Allocate grows the pool on demand). *}
constructor TMemoryManager.Create(AServerSocket: TServerSocket;
  ACount: Integer);
var
  i: Integer;
  pIOCPData: PIOCPStruct;
begin
  inherited Create;
  // Remember the owner; without this the Server property always returned nil.
  FServerSocket := AServerSocket;
  FList := TList.Create;
  FLock := TCriticalSection.Create;
  for i := 1 to ACount do
  begin
    New(pIOCPData);
    // Zero the whole record. SizeOf must be taken of the record type, not of
    // the pointer type, otherwise only the first pointer-sized bytes are
    // cleared. This also leaves Assigned = False and Active = False.
    FillChar(pIOCPData^, SizeOf(TIOCPStruct), 0);
    FList.Add(pIOCPData);
  end;
end;

{* Hands out a block from the pool.

   The first block whose Assigned flag is clear is reused; when every block
   is taken (or the pool is empty) a new block is created and appended to the
   pool. The returned block is zero-filled, marked Assigned and not Active.
   The whole operation runs under FLock. *}
function TMemoryManager.Allocate: PIOCPStruct;
var
  idx: Integer;
  candidate: PIOCPStruct;
begin
  FLock.Enter;
  try
    Result := nil;
    idx := 0;
    // Stop at the first free block.
    while (Result = nil) and (idx < FList.Count) do
    begin
      candidate := FList[idx];
      if not candidate^.Assigned then
        Result := candidate;
      Inc(idx);
    end;

    // Nothing free: grow the pool by one block.
    if Result = nil then
    begin
      New(Result);
      FList.Add(Result);
    end;

    FillChar(Result^, SizeOf(TIOCPStruct), 0);
    Result^.Assigned := True;
    Result^.Active := False;
  finally
    FLock.Leave;
  end;
end;

{* Convenience constructor: delegates to the two-argument overload with a
   default of 200 pre-allocated blocks *}
constructor TMemoryManager.Create(AServerSocket: TServerSocket);
begin
  Create(AServerSocket, 200);
end;

{* Frees every block owned by the pool, then the list and the lock.

   Blocks are created with New, so they are released with the matching
   Dispose rather than FreeMem. The FList check keeps the destructor safe
   when it runs on a partially constructed instance (a constructor that
   raised before FList was created). *}
destructor TMemoryManager.Destroy;
var
  i: Integer;
  pBlock: PIOCPStruct;
begin
  if Assigned(FList) then
  begin
    for i := 0 to FList.Count - 1 do
    begin
      pBlock := FList[i];
      Dispose(pBlock);
    end;
    FList.Clear;
    FList.Free;
  end;
  FLock.Free;
  inherited;
end;

{* Returns the total number of blocks in the pool, assigned or not.
   Reads FList.Count without taking FLock. *}
function TMemoryManager.GetCount: Integer;
begin
  Result := FList.Count;
end;

{* Returns the block stored at AIndex.

   Raises EMemoryBuffer (with the RTL "list index" message) when AIndex is
   outside 0..Count-1. The original constructed the exception object without
   raising it, which leaked the object and silently returned nil. *}
function TMemoryManager.GetIOCPStruct(AIndex: Integer): PIOCPStruct;
begin
  if (AIndex < 0) or (AIndex >= FList.Count) then
    raise EMemoryBuffer.CreateFmt(SListIndexError, [AIndex]);
  Result := FList[AIndex];
end;

{* Returns a block to the pool by clearing its Assigned and Active flags
   under FLock. The block's memory is kept for reuse and is not freed here.
   AValue must not be nil. *}
procedure TMemoryManager.Release(AValue: PIOCPStruct);
begin
  FLock.Acquire;
  try
    AValue^.Assigned := False;
    AValue^.Active := False;
  finally
    FLock.Release;
  end;
end;

{ TServerSocket }

{* Builds an inactive server: default port 6666, no listening socket, no
   completion port and no accept thread yet. Creates the memory pool, the
   client and thread lists, the lock, and the window whose messages are
   routed to WndProc. *}
constructor TServerSocket.Create;
begin
  // Plain state: nothing is open yet.
  FIOCPHandle := 0;
  FAcceptThread := nil;
  FSocket := INVALID_SOCKET;
  FPort := 6666;

  // Owned helper objects.
  FLock := TCriticalSection.Create;
  FThreads := TList.Create;
  FClients := TList.Create;
  FMemory := TMemoryManager.Create(Self);

  // Allocated last, once every field WndProc could touch is initialised.
  FHandle := AllocateHWnd(WndProc);
end;

{* Deactivates the server, then frees the thread and client lists, the
   message window, the memory pool and the lock, in that order.
   NOTE(review): only the lists themselves are freed here; whether the
   objects they contain are freed depends on SetActive/Close - confirm. *}
destructor TServerSocket.Destroy;
begin
  // Shut down the completion port (via SetActive(False))
  SetActive(False);
  FThreads.Free;
  FClients.Free;
  DeallocateHWnd(FHandle);
  FMemory.Free;
  FLock.Free;
  inherited;
end;

{* Brings the server up: starts Winsock 2.2, creates the completion port,
   starts two worker threads per processor, creates the overlapped listening
   socket, binds it to FPort on all local addresses, starts listening and
   finally starts the accept thread. Each stage is reported through DoLog.

   Any exception raised on the way is caught here: the thread's last-error
   value is captured first, the server is closed again, and the failure is
   reported through CheckError together with the exception message. The code
   must be captured before Close runs, because the API calls made by Close
   overwrite the thread's last-error value. *}
procedure TServerSocket.Open;
var
  SystemInfo: TSystemInfo;
  i: Integer;
  Thread: TThread;
  Addr: TSockAddr;
  WSData: TWSAData;
  LastErr: Integer;
begin
  try
    if WSAStartup($0202, WSData) <> 0 then
    begin
      raise ESocketError.Create('WSAStartup');
    end;
    DoLog(seInitIOPort);  // create the completion port
    FIOCPHandle := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
    if FIOCPHandle = 0 then
      CheckError;

    DoLog(seInitThread); // start the worker threads
    GetSystemInfo(SystemInfo);
    for i := 0 to SystemInfo.dwNumberOfProcessors * 2 -1 do
    begin
      Thread := TWorkThread.Create(Self);
      FThreads.Add(Thread);
    end;

    DoLog(seInitSocket); // create the listening socket
    FSocket := WSASocket(PF_INET, SOCK_STREAM, 0, nil, 0, WSA_FLAG_OVERLAPPED);
    if FSocket = INVALID_SOCKET then CheckError;

    FillChar(Addr, SizeOf(TSockAddr), 0);
    Addr.sin_family := AF_INET;
    Addr.sin_port := htons(FPort);
    Addr.sin_addr.S_addr := htonl(INADDR_ANY);
    CheckError(bind(FSocket, @Addr, SizeOf(TSockAddr)), 'bind');

    DoLog(seListen);  // start listening
    CheckError(listen(FSocket, 5), 'listen');
    FAcceptThread := TAcceptThread.Create(Self);
  except
    on E: Exception do
    begin
      // Capture the error code before Close clobbers it.
      LastErr := Integer(GetLastError);
      Close;
      CheckError(LastErr, E.Message);
    end;
  end;
end;

procedure TServerSocket.Close;
var
  i: Integer;
  Thread: TThread;
begin
  try
    WSACleanup;
    DoLog(seUninitSocket);
    FAcceptThread.Terminate;
    if FSocket <> INVALID_SOCKET then
    begin
      closesocket(FSocket);
      FSocket := INVALID_SOCKET;
    end;

    DoLog(seUninitThread);
    for i := FThreads.Count - 1 downto 0 do
    begin
      Thread := FThreads[i];
      Thread.Terminate;
      PostQueuedCompletionStatus(FIOCPHandle, 0, 0, Pointer(SHUTDOWN_FLAG))
    end;
    FThreads.Clear;

    for i := FClients.Count - 1 downto 0 do
    begin
      TSymmetricalSocket(FClients[i]).Free;
    end;
    FClients.Clear;

    DoLog(seUninitIOPort);
    CloseHandle(FIOCPHandle);
    FIOCPHandle := 0;
  except
    on E: Exception do
    begin
      Close;
      CheckError(-1, E.Message);
    end;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -