⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 uciocpserver.pas

📁 楠楠写的DBiocp例子都是源码
💻 PAS
📖 第 1 页 / 共 2 页
字号:
unit uCIOCPServer;

interface

uses
  Windows, Classes, Sysutils, Contnrs, WinSock2, uCriticalSection, uHashTable,
  uDIProtocol, uIOCPRTL, uCIOCPBuffer;

type      
  TCIOCPServer = class;

  //线程结构
  PThreadParam = ^ThreadParam;  ThreadParam = packed record    nThread: DWORD;                   //索引    pCIOCPServer: TCIOCPServer;       //指向IOCPServer的指针    hEvent: Thandle;                  //事件  end;

  //心跳结构
  PHeartBeatParam = ^HeartBeatParam;
  HeartBeatParam = packed record
    nHeartBeatCount: DWORD;           //次数    nBeginTime: DWORD;                //时间  end;

  TCIOCPServer = Class
  private
    AcceptBuffer: TAbstractBuffer;          //AcceptEX 缓冲
    m_ThreadParams: array of ThreadParam;   //工作者线程数组
  private
    m_nPort: DWORD;                         //侦听端口号
    m_sListen: TSocket;                     //Socket监听端口
    m_iWSAInitResult: Integer;              //加载WinSock库返回值

    m_bServerStarted: Boolean;              //服务器是否启动
    m_bAcceptConnections: Boolean;          //是否允许客户端连接
    m_bShutDown: Boolean;                   //是否停止服务
    m_bOneIPConnections: Boolean;           //是否限制一个IP只允许一个连接

    m_iNumberOfActiveConnections: DWORD;    //当前客户端连接个数
    m_iMaxNumConnections: DWORD;            //允许最多客户端连接个数
    m_nIOWorkers: DWORD;                    //当前运行的I/O线程数
    m_iMaxIOCPIOWorkers: DWORD;             //允许运行的I/O线程数
    m_iMaxJobWorkers: DWORD;                //最大工作线程数据
    m_iMaxNumberOfFreeContext: DWORD;       //空闲上下文池允许的最大数

    m_iIOCPMaxPostAccepts: DWORD;           //最大投递Accept的个数(峰值)
    m_iIOCPInitPostAccepts: DWORD;          //初次投递Accept的个数
    m_iIOCPPostAccepts: DWORD;              //客户端连接后需投递Accept的数量

  private
    m_FreeContextQueueLock: TCriticalSection;     	  //客户端Context空闲池锁
    FFreeContextQueue: TQueue;              //队列先进先出Context空闲池

    m_ClientContextTListLock: TCriticalSection;       //客户端Context队列锁
    FClientContext: THashTable;             //客户端哈索表

    m_LogLock: TCriticalSection;                      //日志锁

  private
    m_hShutdownEvent: Thandle;                        //停止服务事件内核对象
    m_hCompletionPort: Thandle;           	          //完成端口内核对象

  private
    function GetHostIPAddr: String;
    function CreateCompletionPort: Boolean;           //创建完成端口
    //绑定完成端口
    function AssociateSocketWithCompletionPort(socket: TSocket; hCompletionPort: THandle; dwCompletionKey: DWORD): Boolean;

  protected
    procedure AppendMsgLog(pMsg: String);
    function AllocateFreeContextFromPool: TAbstractContext;
    procedure ReleaseClientContextToFreePool(absContext: TAbstractContext);

    procedure AddContextToClientHasTable(absContext: TAbstractContext);
    procedure DelContextToClientHasTable(absContext: TAbstractContext);
  public
    function ListnerStart: Boolean;                  //启动监听
    
    function ChangeSocketModeAccept(m_Socket: TSocket): Boolean;    //设置AcceptEx的Socket为ListenSocket状态。
    function ProcessClientWithContext(m_Socket: TSocket): Boolean;  //生成客户端上下文(投递Recv I/O操作)

    //投递AcceptEx I/O
    function PostWSAAcceptEx: Boolean;
    //投递Recv I/O
    function PostWSARecvProcess(absContext: TAbstractContext): Boolean;   
    //投递CloseSocket消息
    function PostWSACloseSocketProcess(absContext: TAbstractContext): Boolean;

    //处理投递的AcceptEx I/O
    function OnWSAAcceptEx(AbsBuffer: TAbstractBuffer): Boolean;
    //处理投递的Recv I/O
    function OnWSARecv(absContext: TAbstractContext; AbsBuffer: TAbstractBuffer; dwIoSize: DWORD): Boolean;

    function ProcessIOMessage(absContext: TAbstractContext; AbsBuffer: TAbstractBuffer; dwIoSize: DWORD): Boolean;
  public
    function Start( nPort: DWORD;
                    iMaxNumConnections: DWORD;
						        iMaxIOCPIOWorkers: DWORD;
						        iMaxJobWorkers: DWORD;
					 	        iMaxNumberOfFreeContext: DWORD ): Boolean;

    function Startup: Boolean;
    procedure ShutDown();
  public
    constructor Create;
    destructor Destroy; override;
  end;

  TCIOCPWorkerThread = class (TThread)
  protected
    FCIOCPServer: TCIOCPServer;
    procedure Execute; override;
  public
    constructor Create(_CIOCPServer: TCIOCPServer);
    destructor  Destroy; override;
    procedure   Run;
  end;
  
implementation
  uses Unit1;

//TCIOCPWorkerThread
constructor TCIOCPWorkerThread.Create(_CIOCPServer: TCIOCPServer);
begin
  inherited Create(True);
  FCIOCPServer := _CIOCPServer;
  Run;
end;

destructor TCIOCPWorkerThread.Destroy;
begin
  inherited Destroy;
end;

procedure TCIOCPWorkerThread.Run;
begin
  Self.Resume;
end;

procedure TCIOCPWorkerThread.Execute;
var
  dwIOSize: DWORD;
  AbsContext: TAbstractContext;
  AbsBuffer: TAbstractBuffer;
  pHandleData: PPerHandleData;
  bIORet: LongBool;
begin
  while True do
  begin
    AbsContext := nil;
    pHandleData := nil;
    bIORet := GetQueuedCompletionStatus( FCIOCPServer.m_hCompletionPort,
                                          dwIOSize,
                                          DWORD(AbsContext),
                                          POVERLAPPED(pHandleData),
                                          INFINITE);
    if bIORet then
    begin
      //收到退出线程消息
      if (dwIOSize = 0) and (AbsContext = Nil) and (pHandleData = Nil) then
        Exit
      else
      begin
        AbsBuffer := TAbstractBuffer(pHandleData.pPerIOBuf);
        FCIOCPServer.ProcessIOMessage(AbsContext, AbsBuffer, dwIOSize);
      end;
    end
    else //error trapped
    begin
      {
      if Overlapped = Nil then
         //win32 kernel error? just ignore it 8)
      else
        ParseIOError(Transferred, Key, Overlapped);
      }
    end;
    
  end;
end;

//TCIOCPServer
constructor TCIOCPServer.Create;
var
  WSData: TWSAData;
begin
  inherited Create;
  m_iWSAInitResult := WSAStartUp(MAKEWORD(2,2), WSData);

  m_bAcceptConnections := FALSE;		    //是否允许客户端连接
	m_bServerStarted := FALSE;			      //是否启动
	m_bShutDown := FALSE;				          //是否停止服务
  m_bOneIPConnections	:= FALSE;         //是否限制一个IP只允许一个连接
	m_nPort := 8988;						          //默认端口号8988
	m_sListen := INVALID_SOCKET;		      //Socket监听端口

	m_iNumberOfActiveConnections := 0;	  //当前客户端连接个数
	m_iMaxNumConnections := 10000;			  //允许客户端最大连接数
	m_iMaxIOCPIOWorkers := 2;			        //允许最大I/O线程数(双核)
	m_nIOWorkers := 0;					          //当前运行的I/O线程数
	m_iIOCPMaxPostAccepts := 100;		      //最大投递Accept的个数(峰值)
	m_iIOCPInitPostAccepts := 10;		      //首次投递I/OAccept的个数
	m_iIOCPPostAccepts := 0;				      //新连接客户端需投递Accept的数量

  AcceptBuffer := TAbstractBuffer.Create;
  m_FreeContextQueueLock := TCriticalSection.Create;
  FFreeContextQueue := TQueue.Create;
  m_ClientContextTListLock := TCriticalSection.Create;
  FClientContext := THashTable.Create;
  m_LogLock := TCriticalSection.Create;
  {
	m_pListenThread := NULL;				      //侦听线程内核对象
	m_hListenThreadEvent := NULL;		      //侦听线程事件内核对象
	m_hPostAcceptEvent := NULL;			      //投递Accept I/O事件内核对象
	m_hShutdownEvent := NULL;			        //停止服务事件
	m_hCompletionPort := NULL;			      //完成端口内核对象
	m_lpfnAcceptEx := NULL;				        //AcceptEx函数地址
  }
end;

destructor TCIOCPServer.Destroy;
var
  i: Integer;
begin
	if (m_iWSAInitResult <> NO_ERROR) then WSACleanup();
  AcceptBuffer.Free;
  {
  m_FreeContextQueueLock.Free;
  FFreeContextQueue.Free;

  m_ClientContextMapLock.Free;
  FContextQueue.Free;  }

  inherited Destroy;
end;

function TCIOCPServer.Start( nPort: DWORD;        //启动服务
                             iMaxNumConnections: DWORD;
                             iMaxIOCPIOWorkers: DWORD;
                             iMaxJobWorkers: DWORD;
                             iMaxNumberOfFreeContext: DWORD): Boolean;
begin
  if (m_bServerStarted) then begin
    Result := False;
    Exit;
  end;

	m_bShutDown := FALSE;
	m_nPort := nPort;
	m_iMaxNumConnections := iMaxNumConnections;
  m_iMaxIOCPIOWorkers := iMaxIOCPIOWorkers;
  m_iMaxJobWorkers := iMaxJobWorkers;
	m_iMaxNumberOfFreeContext := iMaxNumberOfFreeContext;
  FClientContext.SetSize(m_iMaxNumConnections);  //设置数最

	Result :=  Startup();
end;

function TCIOCPServer.GetHostIPAddr: String;
var
  pHostE: PHostEnt;
  Buffer: array[0..100] of char;
begin
  Result := '127.0.0.1';
  GetHostName(Buffer, Sizeof(Buffer));
  pHostE := GetHostByName(Buffer);
  if pHostE<> nil then
    Result := StrPas(inet_ntoa(PInAddr(pHostE^.h_addr_list^)^));
end;

procedure TCIOCPServer.AppendMsgLog(pMsg: String);
begin
  m_LogLock.Lock;
  Form1.ListBox1.Items.Add(pMsg);
  m_LogLock.UnLock;
end;

function TCIOCPServer.CreateCompletionPort: Boolean;
var
  s: TSocket;
begin
  s := Winsock2.socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
  if (s = Winsock2.INVALID_SOCKET) then begin
    AppendMsgLog(Format('创建Socket错误: %d.', [WSAGetLastError()]));
		Result := FALSE;
    Exit;
  end;

  m_hCompletionPort := CreateIOCompletionPort(s, 0, 0, 0);
  if  (m_hCompletionPort = 0) then begin
    AppendMsgLog(Format('创建CreateIoCompletionPort错误: %d.', [WSAGetLastError()]));
		Result := FALSE;
    Exit;
  end;
  
  Winsock2.closesocket(s);
  Result := TRUE;
end;

function TCIOCPServer.AssociateSocketWithCompletionPort(socket: TSocket; hCompletionPort: THandle; dwCompletionKey: DWORD): Boolean;
var
  h: THandle;
begin
  h := CreateIOCompletionPort(THandle(socket), hCompletionPort, dwCompletionKey, 0);
  Result := h = hCompletionPort;
end;

function TCIOCPServer.PostWSAAcceptEx: Boolean;
var
  FAcceptSocket: TSocket;
  FAcceptReceived:  Cardinal;
  nRetVal: LongBool;
begin
  FAcceptSocket := Winsock2.WSASocketA(AF_INET, SOCK_STREAM, 0, Nil, 0, WSA_FLAG_OVERLAPPED);

  if FAcceptSocket = INVALID_SOCKET then begin
    AppendMsgLog(Format('PostAcceptProcess创建监听套按字失败: %d.', [WSAGetLastError()]));
    Result := FALSE;
    Exit;
  end;

  //设置标志
  AcceptBuffer.SetOperation(IOWSAAcceptEx);
  AcceptBuffer.SetupRead;
  AcceptBuffer.FSocket := FAcceptSocket;

  nRetVal := AcceptEx( m_sListen,
                       FAcceptSocket,
                       @AcceptBuffer.FPerHandleData.Buffer,
                       0,
                       sizeof(TSockAddrIn)+16,
                       sizeof(TSockAddrIn)+16,
                       FAcceptReceived,
                       POverlapped(@AcceptBuffer.FPerHandleData));

  if( (nRetVal=FALSE) and (WSAGetLastError()<>WSA_IO_PENDING)) then
  begin
    AppendMsgLog(Format('PostAcceptProcess投递Accept I/O操作失败: %d.', [WSAGetLastError()]));
    Result := FALSE;
    Exit;
  end;

  Result := TRUE;
end;

function TCIOCPServer.ListnerStart: Boolean;         //启动监听
var
  bVal: LongBool;
  nRet: Integer;
  sock_Addr: TSockAddrIn;
begin
  //创建socket
  m_sListen := Winsock2.WSASocket(AF_INET, SOCK_STREAM, 0, Nil, 0, WSA_FLAG_OVERLAPPED);
  if m_sListen = INVALID_SOCKET then begin
    AppendMsgLog(Format('创建 Socket监听套按字失败: %d.', [WSAGetLastError()]));
    Result := FALSE;
    Exit;
  end;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -