📄 uciocpserver.pas
字号:
unit uCIOCPServer;
interface
uses
Windows, Classes, Sysutils, Contnrs, WinSock2, uCriticalSection, uHashTable,
uDIProtocol, uIOCPRTL, uCIOCPBuffer;
type
TCIOCPServer = class;
//Per-thread parameter record.
//NOTE: the field list was collapsed onto a single line, so the first "//"
//comment swallowed the remaining fields and the closing "end;". The
//declaration is restored to one field per line.
PThreadParam = ^ThreadParam;
ThreadParam = packed record
  nThread: DWORD;              //thread index
  pCIOCPServer: TCIOCPServer;  //pointer to the IOCP server
  hEvent: Thandle;             //event handle
end;
//Heartbeat record.
//NOTE: nBeginTime and the closing "end;" were previously hidden behind a
//"//" comment on the same line, leaving the record unterminated.
PHeartBeatParam = ^HeartBeatParam;
HeartBeatParam = packed record
  nHeartBeatCount: DWORD;  //count
  nBeginTime: DWORD;       //time
end;
//IOCP based socket server. Holds the listening socket, the completion port
//handle, the free-context queue and the client-context hash table declared
//below, together with the locks guarding them.
TCIOCPServer = Class
private
AcceptBuffer: TAbstractBuffer; //buffer used for AcceptEx
m_ThreadParams: array of ThreadParam; //worker thread array
private
m_nPort: DWORD; //listening port number
m_sListen: TSocket; //listening socket
m_iWSAInitResult: Integer; //return value of loading the WinSock library
m_bServerStarted: Boolean; //whether the server has been started
m_bAcceptConnections: Boolean; //whether client connections are allowed
m_bShutDown: Boolean; //whether the service is being stopped
m_bOneIPConnections: Boolean; //whether each IP is limited to a single connection
m_iNumberOfActiveConnections: DWORD; //current number of client connections
m_iMaxNumConnections: DWORD; //maximum number of client connections allowed
m_nIOWorkers: DWORD; //number of I/O threads currently running
m_iMaxIOCPIOWorkers: DWORD; //number of I/O threads allowed to run
m_iMaxJobWorkers: DWORD; //maximum number of job worker threads
m_iMaxNumberOfFreeContext: DWORD; //maximum size allowed for the free context pool
m_iIOCPMaxPostAccepts: DWORD; //maximum number of posted Accepts (peak)
m_iIOCPInitPostAccepts: DWORD; //number of Accepts posted initially
m_iIOCPPostAccepts: DWORD; //number of Accepts to post after a client connects
private
m_FreeContextQueueLock: TCriticalSection; //lock for the free client Context pool
FFreeContextQueue: TQueue; //FIFO queue used as the free Context pool
m_ClientContextTListLock: TCriticalSection; //lock for the client Context collection
FClientContext: THashTable; //client hash table
m_LogLock: TCriticalSection; //log lock
private
m_hShutdownEvent: Thandle; //kernel event object for stopping the service
m_hCompletionPort: Thandle; //completion port kernel object
private
function GetHostIPAddr: String;
function CreateCompletionPort: Boolean; //create the completion port
//associate a socket with the completion port
function AssociateSocketWithCompletionPort(socket: TSocket; hCompletionPort: THandle; dwCompletionKey: DWORD): Boolean;
protected
procedure AppendMsgLog(pMsg: String);
function AllocateFreeContextFromPool: TAbstractContext;
procedure ReleaseClientContextToFreePool(absContext: TAbstractContext);
procedure AddContextToClientHasTable(absContext: TAbstractContext);
procedure DelContextToClientHasTable(absContext: TAbstractContext);
public
function ListnerStart: Boolean; //start listening
function ChangeSocketModeAccept(m_Socket: TSocket): Boolean; //put the AcceptEx socket into the ListenSocket state
function ProcessClientWithContext(m_Socket: TSocket): Boolean; //create the client context (posts a Recv I/O operation)
//post an AcceptEx I/O
function PostWSAAcceptEx: Boolean;
//post a Recv I/O
function PostWSARecvProcess(absContext: TAbstractContext): Boolean;
//post a CloseSocket message
function PostWSACloseSocketProcess(absContext: TAbstractContext): Boolean;
//handle a completed AcceptEx I/O
function OnWSAAcceptEx(AbsBuffer: TAbstractBuffer): Boolean;
//handle a completed Recv I/O
function OnWSARecv(absContext: TAbstractContext; AbsBuffer: TAbstractBuffer; dwIoSize: DWORD): Boolean;
function ProcessIOMessage(absContext: TAbstractContext; AbsBuffer: TAbstractBuffer; dwIoSize: DWORD): Boolean;
public
//Stores the supplied settings and calls Startup; returns False when the
//server is already started.
function Start( nPort: DWORD;
iMaxNumConnections: DWORD;
iMaxIOCPIOWorkers: DWORD;
iMaxJobWorkers: DWORD;
iMaxNumberOfFreeContext: DWORD ): Boolean;
function Startup: Boolean;
procedure ShutDown();
public
constructor Create;
destructor Destroy; override;
end;
//Worker thread bound to a TCIOCPServer. Execute waits on the server's
//completion port and hands each dequeued completion to
//TCIOCPServer.ProcessIOMessage.
TCIOCPWorkerThread = class (TThread)
protected
FCIOCPServer: TCIOCPServer; //server whose completion port this thread services
procedure Execute; override;
public
//Creates the thread suspended, stores the server reference, then calls Run.
constructor Create(_CIOCPServer: TCIOCPServer);
destructor Destroy; override;
//Resumes the thread.
procedure Run;
end;
implementation
uses Unit1;
//TCIOCPWorkerThread
constructor TCIOCPWorkerThread.Create(_CIOCPServer: TCIOCPServer);
begin
  // Start suspended so the server reference is stored before Execute runs.
  inherited Create(True);
  Self.FCIOCPServer := _CIOCPServer;
  // Everything Execute needs is in place: let the thread go.
  Self.Run;
end;
destructor TCIOCPWorkerThread.Destroy;
begin
  // No resources of its own to release; hand over to TThread.
  inherited;
end;
procedure TCIOCPWorkerThread.Run;
begin
  // Wake the thread that the constructor created in the suspended state.
  Resume;
end;
// Worker loop: dequeues completion packets from the server's completion port
// and forwards them to TCIOCPServer.ProcessIOMessage.
//
// The loop ends when
//   * the thread has been asked to terminate,
//   * the all-zero packet (size 0, nil key, nil overlapped) is dequeued, or
//   * the completion port handle has been closed or is invalid (previously
//     this case made the loop spin forever at full CPU).
procedure TCIOCPWorkerThread.Execute;
const
  // ERROR_ABANDONED_WAIT_0: the port handle was closed while a thread was
  // waiting on it. Declared locally because older Windows.pas versions may
  // not provide the constant.
  ERROR_PORT_CLOSED = 735;
var
  dwIOSize: DWORD;
  AbsContext: TAbstractContext;
  AbsBuffer: TAbstractBuffer;
  pHandleData: PPerHandleData;
  bIORet: LongBool;
  dwLastErr: DWORD;
begin
  while not Terminated do
  begin
    AbsContext := nil;
    pHandleData := nil;
    bIORet := GetQueuedCompletionStatus( FCIOCPServer.m_hCompletionPort,
                                         dwIOSize,
                                         DWORD(AbsContext),
                                         POVERLAPPED(pHandleData),
                                         INFINITE);
    if bIORet then
    begin
      if pHandleData = nil then
      begin
        // Exit request: all three values are zero/nil.
        if (dwIOSize = 0) and (AbsContext = nil) then
          Exit;
        // Any other packet without an overlapped structure carries no
        // per-I/O buffer; skip it instead of dereferencing nil.
        Continue;
      end;
      AbsBuffer := TAbstractBuffer(pHandleData.pPerIOBuf);
      FCIOCPServer.ProcessIOMessage(AbsContext, AbsBuffer, dwIOSize);
    end
    else
    begin
      // Read the error code before anything else can overwrite it.
      dwLastErr := GetLastError;
      if pHandleData = nil then
      begin
        // No packet was dequeued. If the port itself is gone there is nothing
        // left to wait for, so leave the loop rather than busy-spin.
        if (dwLastErr = ERROR_INVALID_HANDLE) or (dwLastErr = ERROR_PORT_CLOSED) then
          Exit;
      end;
      // A failed I/O that does carry an overlapped structure is still
      // ignored, as before.
      // NOTE(review): the context/buffer of such a failed operation is not
      // released here - confirm whether it is cleaned up elsewhere.
    end;
  end;
end;
//TCIOCPServer
// Initialises WinSock, sets the default configuration values and creates the
// accept buffer, the context containers and their locks.
// m_hShutdownEvent and m_hCompletionPort are not assigned here.
constructor TCIOCPServer.Create;
var
  WinsockInfo: TWSAData;
begin
  inherited Create;
  m_iWSAInitResult := WSAStartUp(MAKEWORD(2,2), WinsockInfo);

  // --- state flags ---
  m_bServerStarted := False;       // server not started yet
  m_bAcceptConnections := False;   // client connections not allowed yet
  m_bShutDown := False;            // service is not being stopped
  m_bOneIPConnections := False;    // no one-connection-per-IP restriction

  // --- listener ---
  m_sListen := INVALID_SOCKET;     // no listening socket yet
  m_nPort := 8988;                 // default port number

  // --- connection and thread limits ---
  m_iMaxNumConnections := 10000;   // maximum number of client connections
  m_iNumberOfActiveConnections := 0;
  m_iMaxIOCPIOWorkers := 2;        // maximum number of I/O threads (dual core)
  m_nIOWorkers := 0;               // no I/O threads running yet

  // --- AcceptEx posting counters ---
  m_iIOCPInitPostAccepts := 10;    // Accepts posted initially
  m_iIOCPMaxPostAccepts := 100;    // peak number of posted Accepts
  m_iIOCPPostAccepts := 0;         // Accepts to post for newly connected clients

  // --- owned objects ---
  AcceptBuffer := TAbstractBuffer.Create;
  m_FreeContextQueueLock := TCriticalSection.Create;
  FFreeContextQueue := TQueue.Create;
  m_ClientContextTListLock := TCriticalSection.Create;
  FClientContext := THashTable.Create;
  m_LogLock := TCriticalSection.Create;
end;
// Releases everything the constructor created and balances WSAStartup.
//
// Fixes over the previous version:
//   * WSACleanup was only called when WSAStartup had FAILED; it must be
//     called when WSAStartup succeeded (returned 0).
//   * The free-context queue, the client hash table and the three locks
//     created in the constructor were never freed.
//   * The unused local variable was removed.
destructor TCIOCPServer.Destroy;
begin
  AcceptBuffer.Free;

  // Containers first, then the locks that guard them. Free is nil-safe, so
  // a partially constructed instance is handled as well.
  // NOTE(review): contexts still stored in the queue / hash table are not
  // released by this destructor - confirm that ShutDown empties them.
  FFreeContextQueue.Free;
  FClientContext.Free;
  m_FreeContextQueueLock.Free;
  m_ClientContextTListLock.Free;
  m_LogLock.Free;

  // Pair the successful WSAStartup from the constructor with one WSACleanup.
  if m_iWSAInitResult = NO_ERROR then
    WSACleanup();

  inherited Destroy;
end;
// Starts the service with the given settings.
// Returns False without touching any setting when the server is already
// started; otherwise stores the parameters, sizes the client hash table to
// the connection limit and returns the result of Startup.
function TCIOCPServer.Start(nPort: DWORD;
                            iMaxNumConnections: DWORD;
                            iMaxIOCPIOWorkers: DWORD;
                            iMaxJobWorkers: DWORD;
                            iMaxNumberOfFreeContext: DWORD): Boolean;
begin
  Result := False;
  if not m_bServerStarted then
  begin
    m_bShutDown := False;

    // Take over the caller's configuration.
    m_nPort := nPort;
    m_iMaxNumConnections := iMaxNumConnections;
    m_iMaxIOCPIOWorkers := iMaxIOCPIOWorkers;
    m_iMaxJobWorkers := iMaxJobWorkers;
    m_iMaxNumberOfFreeContext := iMaxNumberOfFreeContext;

    // Size the client table to the maximum number of connections.
    FClientContext.SetSize(m_iMaxNumConnections);

    Result := Startup();
  end;
end;
// Returns the first IPv4 address registered for the local host name in
// dotted notation, or '127.0.0.1' when it cannot be determined.
//
// Compared with the previous version the GetHostName result is checked (an
// uninitialised buffer is no longer passed to GetHostByName) and an empty
// address list is no longer dereferenced.
function TCIOCPServer.GetHostIPAddr: String;
var
  pHostE: PHostEnt;
  Buffer: array[0..100] of char;
begin
  Result := '127.0.0.1';

  // Start from a zeroed buffer so it is always terminated.
  FillChar(Buffer, SizeOf(Buffer), 0);
  // gethostname returns zero on success.
  if GetHostName(Buffer, Sizeof(Buffer)) <> 0 then
    Exit;

  pHostE := GetHostByName(Buffer);
  // Relies on short-circuit evaluation (the compiler default).
  if (pHostE <> nil) and (pHostE^.h_addr_list <> nil) and (pHostE^.h_addr_list^ <> nil) then
    Result := StrPas(inet_ntoa(PInAddr(pHostE^.h_addr_list^)^));
end;
// Appends pMsg to the list box on Form1, serialised by m_LogLock.
//
// The lock is now released in a finally block so an exception raised while
// adding the item can no longer leave m_LogLock held.
//
// NOTE(review): if this is reached from a worker thread, the list box is
// modified outside the main thread; the lock only serialises callers against
// each other. Confirm the calling threads and marshal to the main thread if
// needed.
procedure TCIOCPServer.AppendMsgLog(pMsg: String);
begin
  m_LogLock.Lock;
  try
    Form1.ListBox1.Items.Add(pMsg);
  finally
    m_LogLock.UnLock;
  end;
end;
// Creates the I/O completion port and stores its handle in m_hCompletionPort.
// A temporary socket is created, passed to CreateIoCompletionPort to obtain a
// new port, and closed again.
// Returns True on success; on failure logs the error code and returns False.
//
// Fix: the temporary socket used to leak when CreateIoCompletionPort failed;
// it is now closed on every path. The port failure is reported with
// GetLastError, which is where CreateIoCompletionPort reports its error.
function TCIOCPServer.CreateCompletionPort: Boolean;
var
  s: TSocket;
begin
  Result := FALSE;

  s := Winsock2.socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
  if (s = Winsock2.INVALID_SOCKET) then
  begin
    AppendMsgLog(Format('创建Socket错误: %d.', [WSAGetLastError()]));
    Exit;
  end;

  try
    m_hCompletionPort := CreateIOCompletionPort(s, 0, 0, 0);
    if (m_hCompletionPort = 0) then
    begin
      AppendMsgLog(Format('创建CreateIoCompletionPort错误: %d.', [GetLastError()]));
      Exit; // the finally block still closes the socket
    end;
    Result := TRUE;
  finally
    Winsock2.closesocket(s);
  end;
end;
// Associates the socket with the given completion port under the given
// completion key. Returns True when CreateIoCompletionPort hands back the
// same port handle that was passed in.
function TCIOCPServer.AssociateSocketWithCompletionPort(socket: TSocket; hCompletionPort: THandle; dwCompletionKey: DWORD): Boolean;
begin
  Result := CreateIOCompletionPort(THandle(socket),
                                   hCompletionPort,
                                   dwCompletionKey,
                                   0) = hCompletionPort;
end;
// Posts one AcceptEx operation on the listening socket.
// Creates a fresh overlapped socket, prepares AcceptBuffer for an accept
// operation and calls AcceptEx with no receive data.
// Returns True when AcceptEx succeeded or is pending; otherwise logs the
// error code and returns False.
//
// Fixes: the accept socket is now closed when AcceptEx fails (it used to
// leak), and the WinSock error code is read once and reused for both the
// pending check and the log message.
//
// NOTE(review): every call reuses the single AcceptBuffer instance. If more
// than one AcceptEx is outstanding at a time they would share one overlapped
// structure and address buffer - confirm against the callers.
function TCIOCPServer.PostWSAAcceptEx: Boolean;
var
  FAcceptSocket: TSocket;
  FAcceptReceived: Cardinal;
  nRetVal: LongBool;
  nErr: Integer;
begin
  Result := FALSE;

  FAcceptSocket := Winsock2.WSASocketA(AF_INET, SOCK_STREAM, 0, Nil, 0, WSA_FLAG_OVERLAPPED);
  if FAcceptSocket = INVALID_SOCKET then
  begin
    AppendMsgLog(Format('PostAcceptProcess创建监听套按字失败: %d.', [WSAGetLastError()]));
    Exit;
  end;

  // Mark the buffer as an accept operation and attach the new socket.
  AcceptBuffer.SetOperation(IOWSAAcceptEx);
  AcceptBuffer.SetupRead;
  AcceptBuffer.FSocket := FAcceptSocket;

  FAcceptReceived := 0;
  nRetVal := AcceptEx( m_sListen,
                       FAcceptSocket,
                       @AcceptBuffer.FPerHandleData.Buffer,
                       0,
                       sizeof(TSockAddrIn)+16,
                       sizeof(TSockAddrIn)+16,
                       FAcceptReceived,
                       POverlapped(@AcceptBuffer.FPerHandleData));
  if not nRetVal then
  begin
    // Capture the code immediately; later calls may overwrite it.
    nErr := WSAGetLastError();
    if nErr <> WSA_IO_PENDING then
    begin
      // Do not leave a closed handle in the buffer, and do not leak the socket.
      AcceptBuffer.FSocket := INVALID_SOCKET;
      Winsock2.closesocket(FAcceptSocket);
      AppendMsgLog(Format('PostAcceptProcess投递Accept I/O操作失败: %d.', [nErr]));
      Exit;
    end;
  end;

  Result := TRUE;
end;
function TCIOCPServer.ListnerStart: Boolean; //启动监听
var
bVal: LongBool;
nRet: Integer;
sock_Addr: TSockAddrIn;
begin
//创建socket
m_sListen := Winsock2.WSASocket(AF_INET, SOCK_STREAM, 0, Nil, 0, WSA_FLAG_OVERLAPPED);
if m_sListen = INVALID_SOCKET then begin
AppendMsgLog(Format('创建 Socket监听套按字失败: %d.', [WSAGetLastError()]));
Result := FALSE;
Exit;
end;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -