{ File: iocpsocket.pas }
{****************************************************
IOCP Socket Server
主要参考自CodeProject下载源码,
特别感谢张无忌的帮助
这个版本还很粗糙,很多地方没有处理好,最近
比较忙,没有精力和时间来完善它,大家如果感
兴趣,可以接着完善,或者干脆重写。
如果是从这份代码改写,写完了别忘了发给我一份
wjjxm@etang.com
write by: softdog
2003.2
****************************************************}
unit IOCPSocket;
interface
uses
SysUtils, Windows, WinSock2, SyncObjs, Classes;
const
BUFFER_SIZE = 2048;
type
// Kind of operation an overlapped request represents. The worker thread
// dispatches on this value when it dequeues a completion.
TSocketStatus = (ssAccept, ssSend, ssRecv, ssClose, ssStop);
PPerHandleData = ^TPerHandleData;
// Per-operation context allocated for every overlapped request posted by
// this unit.
TPerHandleData = record
// Kept as the first field: the worker thread receives the dequeued
// overlapped pointer directly into a PPerHandleData variable, which relies
// on the record and this field sharing the same address.
Overlapped: OVERLAPPED;
// Buffer descriptor handed to WSARecv / WSASend.
wsaBuffer: WSABUF;
// Operation kind for this request.
Statu: TSocketStatus;
// Socket the operation was issued on.
Socket: TSocket;
// Inline data buffer with BUFFER_SIZE elements.
Buffer: array[0..BUFFER_SIZE - 1] of char;
end;
// Forward declarations; the full class declarations follow in this type
// section.
TIOCPProcThread = class;
TIOCPServer = class;
// Logging callback: receives a message and an open array of arguments.
TErrorLogProc = procedure(Msg: string; param: array of const) of object;
// Called by the accept thread after a new client socket has been registered.
TOnClientConnect = procedure(NewSocket: TSocket) of object;
// Called from TIOCPServer.DoClientClose with the socket being closed.
TOnClientDisConnect = procedure(ASocket: TSocket) of object;
// Called by the worker thread for a completed receive. Set OperPosted to
// True to suppress the PostRead the worker otherwise issues afterwards.
TOnDataRecive = procedure(IOData: PPerHandleData; var OperPosted: boolean) of object;
// Called by the worker thread for a completed send. Set OperPosted to True
// to suppress the PostRead the worker otherwise issues afterwards.
TOnDataSend = procedure(IOData: PPerHandleData; var OperPosted: boolean) of object;
// Server-side socket component.
// Owns the listening socket, the completion port, the accept threads and the
// worker threads, and exposes events for connect, disconnect, receive and
// send completions.
TIOCPServer = class(TComponent)
private
// True between a successful Start and the matching Stop.
FActive: boolean;
// Stored by SetLocalAddress; Start binds to INADDR_ANY and does not read it.
FLocalAddress: string;
// Port the listening socket is bound to.
FLocalPort: Word;
// Handle of the I/O completion port created in Start.
FListenPort: THandle;
// Listening socket created in Start.
FListenSocket: TSocket;
// Number of accept threads Start creates.
FAcceptThreads: word;
// Accept thread instances created by Start.
FAcceptThreadPool: TList;
// Worker thread instances created by Start.
FProcPool: TList;
// Connected client sockets, stored as their decimal string form.
FClientList: TStringList;
// Number of worker threads Start creates.
FThreadCacheSize: Word;
FErrorLogProc: TErrorLogProc;
FLogProc: TErrorLogProc;
FOnClientConnect: TOnClientConnect;
FOnClientDisConnect: TOnClientDisConnect;
FOnDataRecive: TOnDataRecive;
FOnDataSend: TOnDataSend;
// Incremented for each worker thread created in Start and decremented by a
// worker when it handles a stop packet.
FCount: integer;
procedure SetActive(const Value: boolean);
procedure SetLocalAddress(const Value: string);
procedure SetLocalPort(const Value: word);
procedure SetThreadCacheSize(const Value: Word);
protected
// Forward a message to OnErrorLog / OnLog when logging is enabled.
procedure ErrorLog(Msg: string; param: array of const);
procedure Log(Msg: string; param: array of const);
// Closes and unregisters a client socket, then raises OnClientDisConnect.
procedure DoClientClose(SocketInfo: TSocket);
public
constructor Create(Owner: TComponent); override;
destructor Destroy; override;
// Start / stop listening; both return False when already in that state.
function Start: boolean;
function Stop: boolean;
// Posts an overlapped send using a copy of IOData; returns -1 on failure.
function SendData(IOData: TPerHandleData): integer;
// Posts an overlapped receive on SocketInfo.
function PostRead(SocketInfo: TSocket): boolean;
published
property Active: boolean read FActive write SetActive;
property LocalAddress: string read FLocalAddress write SetLocalAddress;
property LocalPort: word read FLocalPort write SetLocalPort;
property AcceptThreads: word read FAcceptThreads write FAcceptThreads;
property ThreadCacheSize: Word read FThreadCacheSize write SetThreadCacheSize;
property OnErrorLog: TErrorLogProc read FErrorLogProc write FErrorLogProc;
property OnLog: TErrorLogProc read FLogProc write FLogProc;
property OnClientConnect: TOnClientConnect read FOnClientConnect write FOnClientConnect;
property OnClientDisConnect: TOnClientDisConnect read FOnClientDisConnect write FOnClientDisConnect;
property OnDataRecive: TOnDataRecive read FOnDataRecive write FOnDataRecive;
property OnDataSend: TOnDataSend read FOnDataSend write FOnDataSend;
end;
// Listening (accept) thread class.
// Each instance loops on accept() against the server's listening socket and
// registers the accepted sockets with the server's completion port.
TIOCPAcceptThread = class(TThread)
private
// Server that owns this thread.
FServerComm: TIOCPServer;
// Message passed to the server by Log / Errlog.
FLogMsg: string;
protected
// Forward FLogMsg to the server's Log / ErrorLog. Execute invokes Log
// through Synchronize.
procedure Log;
procedure Errlog;
procedure Execute; override;
public
constructor Create(AServerComm: TIOCPServer);
end;
// Server-side processing (worker) thread.
// Each instance dequeues completions from the server's completion port and
// dispatches them to the server's events.
TIOCPProcThread = class(TThread)
private
// Server that owns this thread.
FServerComm: TIOCPServer;
// Message passed to the server by Log / Errlog.
FLogMsg: string;
protected
// Forward FLogMsg to the server's Log / ErrorLog. Execute invokes Log
// through Synchronize.
procedure Log;
procedure Errlog;
procedure Execute; override;
public
constructor Create(AServerComm: TIOCPServer);
destructor Destroy; override;
property ServerComm: TIOCPServer read FServerComm;
end;
// Winsock start-up and clean-up helpers; this unit calls them from its
// initialization and finalization sections.
procedure InitSocket;
procedure DoneSocket;
// Registers TIOCPServer on the 'IOCP' component palette page.
procedure Register;
var
// Unit-wide critical section entered by the accept and worker threads while
// they update the server's client list and raise its events.
GlobalLock: TCriticalSection;
implementation
var
// Gates TIOCPServer.ErrorLog / Log. Set to True at unit initialization and
// to False by TIOCPServer.Destroy.
bCanLog: boolean;
{ Installs TIOCPServer on the 'IOCP' page of the component palette. }
procedure Register;
begin
  RegisterComponents('IOCP', [TIOCPServer]);
end;
{ Starts Winsock, requesting version 2.0. The value returned by WSAStartup
  is not inspected. }
procedure InitSocket;
var
  StartupInfo: TWSAData;
begin
  WSAStartup(MakeWord(2, 0), StartupInfo);
end;
{ Counterpart of InitSocket: calls WSACleanup. }
procedure DoneSocket;
begin
  WSACleanup;
end;
{ TIOCPServer }

{ Creates the component with five accept threads and five worker threads
  configured, and allocates the lists used for thread and client
  bookkeeping. No sockets or threads are created until Start is called. }
constructor TIOCPServer.Create(Owner: TComponent);
const
  DEFAULT_THREAD_COUNT = 5;
begin
  inherited Create(Owner);
  FProcPool := TList.Create;
  FClientList := TStringList.Create;
  FAcceptThreadPool := TList.Create;
  FThreadCacheSize := DEFAULT_THREAD_COUNT;
  FAcceptThreads := DEFAULT_THREAD_COUNT;
end;
{ Disables logging for the unit, stops the server if it is running and
  releases the bookkeeping lists. }
destructor TIOCPServer.Destroy;
begin
  // Logging is switched off first so that Stop does not call back into
  // OnLog / OnErrorLog while the component is being torn down.
  bCanLog := False;
  Stop;
  FProcPool.Free;
  FClientList.Free;
  FAcceptThreadPool.Free;
  inherited Destroy;
end;
{ Closes SocketInfo and removes it from the client list when it is still
  registered there, then raises OnClientDisConnect. The event is raised
  whether or not the socket was found in the list. }
procedure TIOCPServer.DoClientClose(SocketInfo: TSocket);
var
  ListPos: integer;
begin
  ListPos := FClientList.IndexOf(IntToStr(SocketInfo));
  if ListPos > -1 then
  begin
    closesocket(SocketInfo);
    FClientList.Delete(ListPos);
  end;

  if Assigned(FOnClientDisConnect) then
    FOnClientDisConnect(SocketInfo);
end;
{ Passes Msg and param to the OnErrorLog handler. Nothing happens when
  logging is disabled for the unit or no handler is assigned. }
procedure TIOCPServer.ErrorLog(Msg: string; param: array of const);
begin
  if not bCanLog then
    Exit;
  if Assigned(FErrorLogProc) then
    FErrorLogProc(Msg, param);
end;
{ Passes Msg and param to the OnLog handler. Nothing happens when logging
  is disabled for the unit or no handler is assigned. }
procedure TIOCPServer.Log(Msg: string; param: array of const);
begin
  if not bCanLog then
    Exit;
  if Assigned(FLogProc) then
    FLogProc(Msg, param);
end;
{ Allocates a receive context for SocketInfo and posts an overlapped
  WSARecv into the context's inline buffer.

  Returns True when the receive completed immediately or was queued
  (WSARecv reported an I/O-pending error); in both cases the context is
  released later by the worker thread that dequeues the completion.
  Returns False when the receive could not be posted; the context is freed
  here because no completion will ever be delivered for it. }
function TIOCPServer.PostRead(SocketInfo: TSocket): boolean;
var
  byteRecv, Flags: DWORD;
  HandleData: PPerHandleData;
begin
  Flags := 0;
  byteRecv := 0;
  New(HandleData);
  FillChar(HandleData.Overlapped, SizeOf(HandleData.Overlapped), 0);
  FillChar(HandleData.Buffer, BUFFER_SIZE, 0);
  HandleData.wsaBuffer.buf := HandleData.Buffer;
  HandleData.wsaBuffer.len := BUFFER_SIZE;
  HandleData.Statu := ssRecv;
  HandleData.Socket := SocketInfo;

  Result := (WSARecv(SocketInfo, @(HandleData.wsaBuffer), 1,
    byteRecv,
    Flags,
    @HandleData.Overlapped,
    nil)
    <> SOCKET_ERROR);

  if not Result then
  begin
    // An overlapped receive that has merely been queued reports
    // SOCKET_ERROR with an I/O-pending code; that is the normal case and
    // must not be treated as a failure.
    Result := (WSAGetLastError = ERROR_IO_PENDING);
    if not Result then
      Dispose(HandleData);
  end;
end;
{ Posts an overlapped WSASend described by IOData.

  IOData is copied into a freshly allocated context whose Statu is set to
  ssSend. When the payload fits into the context's inline buffer it is
  copied there and wsaBuffer.buf is redirected to the copy, so the send no
  longer refers to the caller's storage, which may be released before the
  asynchronous send completes. Larger payloads are sent from the caller's
  buffer unchanged and must stay valid until the send completes.

  Returns the byte count reported by WSASend (0 when the send is still
  pending), or -1 when the send could not be posted; in that case the
  context is freed here because no completion will be delivered for it. }
function TIOCPServer.SendData(IOData: TPerHandleData): integer;
var
  byteSend, Flags: DWORD;
  NewData: PPerHandleData;
begin
  New(NewData);
  NewData^ := IOData;
  Flags := 0;
  byteSend := 0;
  FillChar(NewData^.Overlapped, SizeOf(NewData^.Overlapped), 0);
  NewData^.Statu := ssSend;

  // Give the pending send its own copy of the payload whenever it fits.
  if (NewData^.wsaBuffer.buf <> nil) and
    (NewData^.wsaBuffer.len <= DWORD(SizeOf(NewData^.Buffer))) then
  begin
    Move(NewData^.wsaBuffer.buf^, NewData^.Buffer, Integer(NewData^.wsaBuffer.len));
    NewData^.wsaBuffer.buf := NewData^.Buffer;
  end;

  if WSASend(NewData^.Socket, @(NewData^.wsaBuffer), 1,
    byteSend,
    Flags,
    @(NewData^.Overlapped),
    nil) = SOCKET_ERROR then
    if WSAGetLastError <> ERROR_IO_PENDING then
    begin
      Dispose(NewData);
      Result := -1;
      Exit;
    end;

  Result := byteSend;
end;
{ Property setter for Active: starts or stops the server when the requested
  state differs from the current one.

  FActive is maintained by Start and Stop themselves, so it is not assigned
  here; Active therefore stays False when Start fails instead of reporting
  a server that never began listening. }
procedure TIOCPServer.SetActive(const Value: boolean);
begin
  if FActive = Value then
    Exit;
  if Value then
    Start
  else
    Stop;
end;
{ Property setter for LocalAddress. The value can only be changed while the
  server is stopped; otherwise an error is logged and the value is kept. }
procedure TIOCPServer.SetLocalAddress(const Value: string);
begin
  if not FActive then
    FLocalAddress := Value
  else
    ErrorLog('改变参数前必须停止监听!', []);
end;
{ Property setter for LocalPort. The value can only be changed while the
  server is stopped; otherwise an error is logged and the value is kept. }
procedure TIOCPServer.SetLocalPort(const Value: word);
begin
  if not FActive then
    FLocalPort := Value
  else
    ErrorLog('改变参数前必须停止监听!', []);
end;
{ Property setter for ThreadCacheSize: stores the worker thread count used
  by the next Start and makes sure the worker list exists. }
procedure TIOCPServer.SetThreadCacheSize(const Value: Word);
begin
  if not Assigned(FProcPool) then
    FProcPool := TList.Create;
  FThreadCacheSize := Value;
end;
{ Starts the server: creates the listening socket, binds it to LocalPort on
  INADDR_ANY, starts listening, creates the completion port, then launches
  ThreadCacheSize worker threads and AcceptThreads accept threads.

  Returns True on success. Returns False, after logging through ErrorLog,
  when the server is already active or when any start-up step fails.

  The socket is set up before the completion port and the threads are
  created, and it is closed again on every failure path, so a failed Start
  leaves no running threads, open port or open socket behind. }
function TIOCPServer.Start: boolean;
var
  addr: TSockAddr;
  i: integer;

  // Releases the listening socket after a failed start-up step.
  procedure CloseListenSocket;
  begin
    closesocket(FListenSocket);
    FListenSocket := INVALID_SOCKET;
  end;

begin
  Result := False;
  if FActive then
  begin
    ErrorLog('服务已经启动!', []);
    Exit;
  end;

  // Create the overlapped listening socket.
  FListenSocket := WSASocket(AF_INET, SOCK_STREAM, 0, nil, 0, WSA_FLAG_OVERLAPPED);
  if FListenSocket = INVALID_SOCKET then
  begin
    ErrorLog('创建套接字错误:%d', [WSAGetLastError]);
    Exit;
  end;

  // Zero the address first so fields that are not assigned below do not
  // carry stack garbage into bind.
  FillChar(addr, SizeOf(addr), 0);
  addr.sin_family := AF_INET;
  addr.sin_port := htons(FLocalPort);
  addr.sin_addr.S_addr := INADDR_ANY;
  if Winsock2.bind(FListenSocket, @addr, sizeof(addr)) <> 0 then
  begin
    // Log before closing so the reported error code is the one from bind.
    ErrorLog('绑定套接字错误:%d', [WSAGetLastError]);
    CloseListenSocket;
    Exit;
  end;

  if Winsock2.listen(FListenSocket, 5) <> 0 then
  begin
    ErrorLog('监听套接字错误:%d', [WSAGetLastError]);
    CloseListenSocket;
    Exit;
  end;

  // Create the completion port the worker threads wait on.
  FListenPort := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
  if FListenPort = 0 then
  begin
    ErrorLog('创建完成端口错误:%d', [GetLastError]);
    CloseListenSocket;
    Exit;
  end;

  // Create the IOCP worker threads.
  for i := 1 to FThreadCacheSize do
  begin
    FProcPool.Add(TIOCPProcThread.Create(self));
    Inc(FCount);
  end;

  // Create the accept threads.
  for i := 1 to AcceptThreads do
    FAcceptThreadPool.Add(TIOCPAcceptThread.Create(self));

  Result := True;
  FActive := True;
  Log('服务已经启动在端口: %d', [LocalPort]);
end;
{ Stops the server. Returns False when it is not active, True otherwise.

  Shutdown order: signal and forget the accept threads, close the listening
  socket, close every registered client, post one stop packet per worker
  thread, forget the workers, then close the completion port handle. }
function TIOCPServer.Stop: boolean;
var
  idx: integer;
  StopPacket: PPerHandleData;
begin
  Result := FActive;
  if not Result then
    Exit;

  // Ask every accept thread to terminate and drop it from the pool.
  for idx := FAcceptThreadPool.Count - 1 downto 0 do
  begin
    TIOCPAcceptThread(FAcceptThreadPool[idx]).Terminate;
    FAcceptThreadPool.Delete(idx);
  end;

  // Close the listening socket.
  if FListenSocket <> INVALID_SOCKET then
  begin
    closesocket(FListenSocket);
    FListenSocket := INVALID_SOCKET;
  end;

  // Disconnect all clients.
  for idx := FClientList.Count - 1 downto 0 do
    DoClientClose(StrToInt(FClientList[idx]));

  // Post one stop packet per worker thread. The byte count of 1 keeps the
  // worker from taking its zero-byte "client disconnected" branch.
  for idx := 0 to FProcPool.Count - 1 do
  begin
    New(StopPacket);
    StopPacket^.Statu := ssStop;
    PostQueuedCompletionStatus(FListenPort, 1, 0, @(StopPacket^.Overlapped));
  end;
  FProcPool.Clear;

  // Close the completion port handle.
  if FListenPort <> 0 then
    CloseHandle(FListenPort);

  FActive := False;
  Log('服务已经停止.', []);
end;
{ TIOCPAcceptThread }

{ Creates the accept thread suspended, attaches it to AServerComm, marks it
  to free itself on termination and only then lets it run. }
constructor TIOCPAcceptThread.Create(AServerComm: TIOCPServer);
begin
  inherited Create(True);
  FreeOnTerminate := True;
  FServerComm := AServerComm;
  Resume;
end;
{ Hands the pending message in FLogMsg to the server's error log. }
procedure TIOCPAcceptThread.Errlog;
begin
  Self.FServerComm.ErrorLog(Self.FLogMsg, []);
end;
{ Accept loop. Blocks in accept() on the server's listening socket; each
  accepted socket is associated with the server's completion port, added to
  the client list, given an initial receive and announced through
  OnClientConnect and the log.

  When the association with the completion port fails the accepted socket
  is closed and the loop carries on. GlobalLock is released on every path,
  including exceptions raised by the OnClientConnect handler. The thread
  terminates itself when accept() is interrupted (WSAEINTR). }
procedure TIOCPAcceptThread.Execute;
var
  Addr: TSockAddr;
  AddrLen: integer;
  sNew: TSocket;
  NewPort: THandle;
begin
  while not Terminated do
  begin
    // accept() overwrites AddrLen with the length it actually wrote, so it
    // has to be reset before every call.
    AddrLen := SizeOf(Addr);
    sNew := Winsock2.accept(FServerComm.FListenSocket, Addr, AddrLen);
    if sNew <> INVALID_SOCKET then
    begin
      GlobalLock.Enter;
      try
        // Associate the new socket with the completion port.
        NewPort := CreateIoCompletionPort(sNew, FServerComm.FListenPort, 0, 0);
        if NewPort = 0 then
          // The socket cannot be serviced without the association; close it
          // rather than leaking the handle.
          closesocket(sNew)
        else
        begin
          FServerComm.FClientList.Add(IntToStr(sNew));
          // Post the initial receive.
          FServerComm.PostRead(sNew);
          if Assigned(FServerComm.OnClientConnect) then
            FServerComm.OnClientConnect(sNew);
          FLogMsg := Format('客户端连接:%d', [sNew]);
          Synchronize(Log);
        end;
      finally
        GlobalLock.Leave;
      end;
    end
    else if WSAGetLastError = WSAEINTR then
    begin
      FLogMsg := '监听线程终止.';
      Synchronize(Log);
      Terminate;
    end;
  end;
end;
{ Hands the pending message in FLogMsg to the server's log. }
procedure TIOCPAcceptThread.Log;
begin
  Self.FServerComm.Log(Self.FLogMsg, []);
end;
{ TIOCPProcThread }
// Stores the owning server and enables FreeOnTerminate, then calls the
// inherited constructor with CreateSuspended = False.
// NOTE(review): the fields are presumably assigned before the inherited
// call so that they are already set if the thread starts running during
// construction - confirm before reordering these statements.
constructor TIOCPProcThread.Create(AServerComm: TIOCPServer);
begin
FServerComm := AServerComm;
FreeOnTerminate := true;
inherited Create(false);
end;
{ Adds no clean-up of its own; only chains to the inherited destructor. }
destructor TIOCPProcThread.Destroy;
begin
  inherited Destroy;
end;
{ Hands the pending message in FLogMsg to the server's error log. }
procedure TIOCPProcThread.Errlog;
begin
  Self.FServerComm.ErrorLog(Self.FLogMsg, []);
end;
{ Worker loop. Dequeues completions from the server's completion port and
  dispatches them:

  - a failed completion that still carries a context (for example an
    aborted or reset connection) closes the client if it is still
    registered, and frees the context;
  - a failed wait without a context terminates the thread when the port
    handle has become invalid (error 6), otherwise the wait is retried;
  - a zero-byte completion is treated as a client disconnect;
  - ssRecv / ssSend raise OnDataRecive / OnDataSend and post a new receive
    unless the handler set OperPosted;
  - ssClose closes the client; ssStop terminates the thread.

  The context is freed and GlobalLock released on every path, including
  exceptions raised by an event handler. }
procedure TIOCPProcThread.Execute;
var
  HandleData: PPerHandleData;
  byteRece, Key: DWORD;
  OperPosted: boolean;
  errCode: integer;
begin
  while not Terminated do
  begin
    // Cleared so a failed wait that dequeued nothing can be told apart from
    // a failed I/O completion that did hand back a context.
    HandleData := nil;
    if not GetQueuedCompletionStatus(FServerComm.FListenPort, byteRece, Key, POverlapped(HandleData), INFINITE) then
    begin
      errCode := GetLastError;
      if HandleData <> nil then
      begin
        // A posted operation completed with an error.
        GlobalLock.Enter;
        try
          // Only announce and close clients that are still registered.
          // Operations aborted by an earlier close arrive here too and must
          // not raise a second disconnect event.
          if FServerComm.FClientList.IndexOf(IntToStr(HandleData.Socket)) >= 0 then
          begin
            FLogMsg := Format('客户端断开连接:%d', [HandleData.Socket]);
            Synchronize(Log);
            FServerComm.DoClientClose(HandleData.Socket);
          end;
        finally
          Dispose(HandleData);
          GlobalLock.Leave;
        end;
      end
      else if errCode = 6 then
        // Error 6 (invalid handle): the completion port has been closed.
        Terminate;
      Continue;
    end;

    GlobalLock.Enter;
    try
      if byteRece = 0 then
      begin
        // The client disconnected.
        FLogMsg := Format('客户端断开连接:%d', [HandleData.Socket]);
        Synchronize(Log);
        FServerComm.DoClientClose(HandleData.Socket);
      end
      else
        case HandleData.Statu of
          ssRecv:
            begin
              OperPosted := False;
              if Assigned(FServerComm.FOnDataRecive) then
                FServerComm.FOnDataRecive(HandleData, OperPosted);
              if not OperPosted then
                FServerComm.PostRead(HandleData.Socket);
            end;
          ssSend:
            begin
              OperPosted := False;
              if Assigned(FServerComm.FOnDataSend) then
                FServerComm.FOnDataSend(HandleData, OperPosted);
              if not OperPosted then
                FServerComm.PostRead(HandleData.Socket);
            end;
          ssClose:
            begin
              FServerComm.DoClientClose(HandleData.Socket);
            end;
          ssStop:
            begin
              Dec(FServerComm.FCount);
              Terminate;
            end;
        end; // case
    finally
      Dispose(HandleData);
      GlobalLock.Leave;
    end;
  end;
end;
{ Hands the pending message in FLogMsg to the server's log. }
procedure TIOCPProcThread.Log;
begin
  Self.FServerComm.Log(Self.FLogMsg, []);
end;
initialization
  // Unit start-up: bring up Winsock, create the unit-wide lock and enable
  // logging.
  InitSocket;
  GlobalLock := TCriticalSection.Create;
  bCanLog := True;

finalization
  // Unit shutdown: release Winsock, then the unit-wide lock.
  DoneSocket;
  GlobalLock.Free;

end.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -