📄 iocpcomp.pas
字号:
closesocket(ClientWinSocket);
ErrCode := GetLastError;
Error(eeAccept, ErrCode);
Exit;
end;
Ret := CreateIoCompletionPort(ClientWinSocket, ACompletionPort, DWORD(ClientSocket), 0);
if Ret = 0 then
ClientSocket.Free;
end;
end;
{ Creates an inactive server socket.
  The inherited socket handle starts as INVALID_SOCKET; no completion port,
  worker thread or accept thread exists until the server is opened. }
constructor TServerSocket.Create;
begin
  inherited Create(INVALID_SOCKET);
  // Block pool; client sockets take a reference to it in their constructor.
  FBuffer := TMemoryBuffer.Create(Self);
  // Bookkeeping lists: worker thread handles and connected client sockets.
  FThreads := TList.Create;
  FClients := TList.Create;
  // Nothing is running yet.
  FCompletionPort := 0;
  FAcceptThread := 0;
  // Default listening port.
  FPort := 211;
  // Threads are started with the raw CreateThread API, so the RTL has to be
  // told explicitly that the process is multi-threaded.
  IsMultiThread := True;
  // Hidden window whose messages are routed to WndProc.
  FHandle := Classes.AllocateHWnd(WndProc);
end;
{ Shuts the server down and releases what the constructor allocated. }
destructor TServerSocket.Destroy;
begin
// Deactivate first: when the server is active this runs InternalClose, which
// frees the client sockets and closes the thread/port handles while the
// lists below still exist.
SetActive(False);
FThreads.Free;
FClients.Free;
// Destroy the hidden window that was created with AllocateHWnd.
Classes.DeallocateHWnd(FHandle);
FBuffer.Free;
inherited Destroy;
end;
{ Looks up the registered client whose SocketHandle equals ASocket.
  Returns the matching client, or nil when no registered client uses that
  handle. The client list is scanned while holding the server lock. }
function TServerSocket.FindClientSocket(ASocket: TSocket): TCustomSocket;
var
  Idx: Integer;
  Candidate: TCustomSocket;
begin
  Result := nil;
  Lock;
  try
    for Idx := 0 to FClients.Count - 1 do
    begin
      Candidate := FClients[Idx];
      if Candidate.SocketHandle = ASocket then
      begin
        Result := Candidate;
        Break;
      end;
    end;
  finally
    UnLock;
  end;
end;
{ Returns the number of entries currently in the client list.
  The count is read without taking the server lock. }
function TServerSocket.GetClientCount: Integer;
begin
Result := FClients.Count;
end;
{ Returns the client stored at position Index of the client list.
  No bounds check is made here; an invalid Index is handled by the list. }
function TServerSocket.GetClients(const Index: Integer): TServerClientSocket;
begin
  Result := TServerClientSocket(FClients[Index]);
end;
{ Tears down everything InternalOpen set up, in this order: worker threads,
  client sockets, the listening socket, the accept thread handle and finally
  the completion port. Runs entirely under the server lock. }
procedure TServerSocket.InternalClose;

  { Closes a kernel handle and resets the variable to 0; does nothing when
    the variable is already 0. With ServiceThread set, one shutdown packet is
    posted to the completion port before the handle is closed. }
  procedure CloseObject(var Handle: THandle; ServiceThread: Boolean = False);
  begin
    if Handle = 0 then
      Exit;
    if ServiceThread then
      PostQueuedCompletionStatus(FCompletionPort, 0, 0, Pointer(SHUTDOWN_FLAG));
    CloseHandle(Handle);
    Handle := 0;
  end;

var
  Idx: Integer;
  ThreadHandle: THandle;
begin
  Lock;
  try
    // One shutdown packet per worker thread, newest thread first. The thread
    // handle is closed right away; the thread itself is not waited for.
    Idx := FThreads.Count;
    while Idx > 0 do
    begin
      Dec(Idx);
      ThreadHandle := THandle(FThreads[Idx]);
      CloseObject(ThreadHandle, True);
    end;
    FThreads.Clear;

    // Free the clients back to front: each destructor removes its own entry
    // from FClients, so the indices still to be visited stay valid.
    for Idx := FClients.Count - 1 downto 0 do
      TObject(FClients[Idx]).Free;
    FClients.Clear;

    // Close the listening socket, announcing the disconnect first.
    if FSocket <> INVALID_SOCKET then
    begin
      Event(seDisconnect);
      closesocket(FSocket);
      FSocket := INVALID_SOCKET;
    end;

    CloseObject(FAcceptThread);
    CloseObject(FCompletionPort);
  finally
    UnLock;
  end;
end;
// Forward declaration: InternalOpen takes the address of the worker routine,
// whose body appears later in the unit.
function WorkerThreadProc(AServer: TServerSocket): DWORD; stdcall; forward;

{ Brings the server up: creates the completion port, starts two worker
  threads per processor, creates, binds and listens on the server socket and
  finally starts the accept thread.
  Any previous state is closed first. If a step fails, everything opened so
  far is closed again and the exception is re-raised.
  Raises ESocketError when the port, a thread or the socket cannot be
  created. }
procedure TServerSocket.InternalOpen;
var
  WorkerNo: Integer;
  WorkerHandle: THandle;
  ThreadID: DWORD;
  SysInfo: TSystemInfo;
begin
  Lock;
  try
    InternalClose;
    try
      // A fresh completion port not yet associated with any handle.
      FCompletionPort := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
      if FCompletionPort = 0 then
        raise ESocketError.Create(SysErrorMessage(GetLastError));
      Event(seInitIOPort);

      // Worker pool: two threads per processor, handles kept in FThreads.
      GetSystemInfo(SysInfo);
      for WorkerNo := 1 to SysInfo.dwNumberOfProcessors * 2 do
      begin
        WorkerHandle := CreateThread(nil, 0, @WorkerThreadProc, Self, 0, ThreadID);
        if WorkerHandle = 0 then
          raise ESocketError.Create(SysErrorMessage(GetLastError));
        FThreads.Add(Pointer(WorkerHandle));
      end;

      // Overlapped TCP socket used for listening.
      FSocket := WSASocket(PF_INET, SOCK_STREAM, 0, nil, 0, WSA_FLAG_OVERLAPPED);
      if FSocket = INVALID_SOCKET then
        raise ESocketError.Create(SysErrorMessage(GetLastError));
      Event(seInitSocket);

      // Bind to FPort on every local interface.
      FillChar(FAddr, SizeOf(FAddr), 0);
      FAddr.sin_family := AF_INET;
      FAddr.sin_port := htons(FPort);
      FAddr.sin_addr.S_addr := INADDR_ANY;
      CheckError(bind(FSocket, @FAddr, SizeOf(FAddr)), 'bind');

      // The listen event is raised before listen() itself is called.
      Event(seListen);
      CheckError(listen(FSocket, SOMAXCONN), 'listen');

      FAcceptThread := CreateThread(nil, 0, @AcceptThreadProc, Self, 0, ThreadID);
      if FAcceptThread = 0 then
        raise ESocketError.Create(SysErrorMessage(GetLastError));
    except
      // Undo the partial start-up before passing the error on.
      InternalClose;
      raise;
    end;
  finally
    UnLock;
  end;
end;
{ Always returns True; the Socket argument is not examined.
  NOTE(review): presumably a hook for accepting or rejecting an incoming
  connection - the caller is not visible in this part of the file, confirm. }
function TServerSocket.IsAccept(Socket: TSocket): Boolean;
begin
Result := True;
end;
{ Adds ASocket to the client list unless it is already there and asks
  Winsock to post WM_CLIENTSOCKET to the server's hidden window when that
  socket is closed (FD_CLOSE). Runs under the server lock. }
procedure TServerSocket.RegisterClient(ASocket: TCustomSocket);
begin
  Lock;
  try
    // Already registered: nothing to do.
    if FClients.IndexOf(ASocket) >= 0 then
      Exit;
    FClients.Add(ASocket);
    WSAAsyncSelect(ASocket.SocketHandle, FHandle, WM_CLIENTSOCKET, FD_CLOSE);
  finally
    UnLock;
  end;
end;
{ Takes ASocket out of the client list; a socket that is not in the list is
  ignored. The socket object itself is not freed. Runs under the server
  lock. }
procedure TServerSocket.RemoveClient(ASocket: TCustomSocket);
begin
  Lock;
  try
    // TList.Remove deletes the first matching entry and does nothing when
    // the item is absent.
    FClients.Remove(ASocket);
  finally
    UnLock;
  end;
end;
{ Opens (Value = True) or closes (Value = False) the server. Does nothing
  when the server is already in the requested state.
  Any exception raised while opening is passed on to the caller and leaves
  the server inactive. }
procedure TServerSocket.SetActive(Value: Boolean);
begin
  if FActive = Value then Exit;
  if Value then
  begin
    // The worker threads started inside InternalOpen loop on
    // "while AServer.Active". The flag therefore has to be True before
    // they are created; otherwise a worker that gets scheduled quickly sees
    // False and terminates at once, leaving the server without workers.
    FActive := True;
    try
      InternalOpen;
    except
      // InternalOpen has already closed whatever it opened.
      FActive := False;
      raise;
    end;
  end
  else
  begin
    // The flag is cleared only after the teardown has finished.
    InternalClose;
    FActive := False;
  end;
end;
{ Changes the listening port. Only allowed while the server is inactive.
  Raises ESocketError when the server is active. }
procedure TServerSocket.SetPort(Value: Integer);
begin
  if not Active then
    FPort := Value
  else
    raise ESocketError.Create('Cann''t change port');
end;
{ Handles the close notification for a client socket: looks up the client
  that owns the socket handle carried by the message and frees it. A handle
  that matches no registered client is ignored. }
procedure TServerSocket.WMClientClose(var Message: TCMSocketMessage);
begin
  // Free is safe on nil, so an unknown handle falls through harmlessly.
  FindClientSocket(Message.Socket).Free;
end;
{ Window procedure of the hidden window allocated in the constructor.
  Routes every message through Dispatch; an exception raised by a handler is
  passed to ApplicationHandleException when that hook is assigned and is
  otherwise swallowed.
  NOTE(review): unhandled messages are not forwarded to DefWindowProc here -
  confirm whether DefaultHandler is overridden elsewhere to do that. }
procedure TServerSocket.WndProc(var Message: TMessage);
begin
try
Dispatch(Message);
except
if Assigned(ApplicationHandleException) then
ApplicationHandleException(Self);
end;
end;
{ TServerClientSocket }
{ Wraps the already-connected socket ASocket as a client of AServerSocket.
  The new client shares the server's block buffer, registers itself with the
  server, copies the server's handlers, posts its first receive and raises
  seConnect. }
constructor TServerClientSocket.Create(AServerSocket: TServerSocket;
ASocket: TSocket);
begin
inherited Create(ASocket);
FServerSocket := AServerSocket;
// Blocks are taken from the server's buffer; FBlock lists the ones this
// client has obtained so far.
FBuffer := FServerSocket.FBuffer;
FBlock := TList.Create;
// Adds this client to the server's list and requests close notification.
FServerSocket.RegisterClient(Self);
// Handlers are copied from the server before the first receive is posted.
FOnRead := FServerSocket.OnClientRead;
OnErrorEvent := FServerSocket.ClientSocketError;
OnEventEvent := FServerSocket.ClientSocketEvent;
// Post the first overlapped receive; its Boolean result is not checked.
PrepareRecv;
Event(seConnect);
end;
{ Unregisters the client from its server, hands every block it obtained back
  to the shared buffer (last obtained first) and frees the block list. }
destructor TServerClientSocket.Destroy;
var
  Idx: Integer;
begin
  FServerSocket.RemoveClient(Self);
  Idx := FBlock.Count;
  while Idx > 0 do
  begin
    Dec(Idx);
    FBuffer.RemoveBlock(FBlock[Idx]);
  end;
  FBlock.Free;
  inherited Destroy;
end;
{ Deactivates the client connection. A client socket can only be closed
  through this setter: asking for True while inactive raises ESocketError
  (the message says that this socket does not support connecting).
  Does nothing when the requested state equals the current one. }
procedure TServerClientSocket.SetActive(Value: Boolean);
var
  LingerOpt: TLinger;
begin
  if Value = FActive then
    Exit;
  if Value then
    raise ESocketError.Create('当前socket不支持连接操作');
  if FSocket <> INVALID_SOCKET then
  begin
    Event(seDisconnect);
    // An all-zero linger structure is applied before the socket is closed.
    FillChar(LingerOpt, SizeOf(LingerOpt), 0);
    setsockopt(FSocket, SOL_SOCKET, SO_LINGER, @LingerOpt, SizeOf(LingerOpt));
    closesocket(FSocket);
    FSocket := INVALID_SOCKET;
  end;
  FActive := False;
end;
{ Returns a block marked as in use.
  A block this client already owns and that is currently unused is preferred
  (first match wins); only when all of them are busy is a new block
  requested from the shared buffer and appended to the client's list. }
function TServerClientSocket.AllocBlock: PBlock;
var
  Idx: Integer;
begin
  for Idx := 0 to FBlock.Count - 1 do
  begin
    Result := FBlock[Idx];
    if Result.Data.IsUse then
      Continue;
    Result.Data.IsUse := True;
    Exit;
  end;
  // Every owned block is busy: take a fresh one from the buffer.
  Result := FBuffer.AllocBlock;
  FBlock.Add(Result);
  Result.Data.IsUse := True;
end;
{ Direct reads are not supported on a server-side client socket: the call
  always raises ESocketError and never returns a value. }
function TServerClientSocket.Read(var Buf; Count: Integer): Integer;
begin
{ Reading is done through OnRead, which is triggered by DoReceive }
raise ESocketError.Create('读操作错误');
end;
{ Starts an overlapped send of Count bytes from Buf.
  Returns Count when the send completed or was queued, 0 when Count is not
  positive, and SOCKET_ERROR when the send could not be started (the error
  is also reported through Error(eeSend, ...)).
  Data that fits into a block buffer is copied, so Buf may be reused or go
  out of scope as soon as the call returns. Larger data is sent straight
  from Buf, as before, and the caller must then keep Buf valid until the
  send has completed. }
function TServerClientSocket.Write(var Buf; Count: Integer): Integer;
var
  Block: PBlock;
  ErrCode: Integer;
  Flags, BytesSend: Cardinal;
  CopyToBlock: Boolean;
begin
  // A negative count would otherwise become a huge unsigned length.
  if Count <= 0 then
  begin
    Result := 0;
    Exit;
  end;
  Result := Count;
  Block := AllocBlock;
  CopyToBlock := Count <= SizeOf(Block^.Data.Buffer);
  with Block^.Data do
  begin
    Flags := 0;
    Event := seWrite;
    // A reused block still carries the OVERLAPPED state of its previous
    // operation; it has to be cleared before every new request.
    FillChar(Overlapped, SizeOf(Overlapped), 0);
    if CopyToBlock then
    begin
      // The send finishes asynchronously, so it must not point at memory
      // owned by the caller (typically a local variable).
      Move(Buf, Buffer, Result);
      wsaBuffer.buf := Buffer;
    end else
      wsaBuffer.buf := @Buf;
    wsaBuffer.len := Result;
    if SOCKET_ERROR = WSASend(FSocket, @wsaBuffer, 1, BytesSend, Flags, @Overlapped, nil) then
    begin
      ErrCode := WSAGetLastError;
      if ErrCode <> ERROR_IO_PENDING then
      begin
        // The request was never queued, so no completion will arrive for
        // this block: hand it back before reporting the failure.
        IsUse := False;
        Result := SOCKET_ERROR;
        Error(eeSend, ErrCode);
      end;
    end;
  end;
end;
{ Posts an overlapped receive of up to MAX_BUFSIZE bytes into a block.
  Block is reused when given; otherwise a block is obtained from AllocBlock.
  Returns True when the receive completed immediately or is pending. On any
  other error the block is marked unused, the error is reported through
  Error(eeReceive, ...) and False is returned. }
function TServerClientSocket.PrepareRecv(Block: PBlock = nil): Boolean;
var
  ErrCode: Integer;
  Flags, Transfer: Cardinal;
begin
  if Block = nil then
    Block := AllocBlock;
  with Block^.Data do
  begin
    Event := seRead;
    Flags := 0;
    // Start from a clean OVERLAPPED structure and an empty data buffer.
    FillChar(Overlapped, SizeOf(Overlapped), 0);
    FillChar(Buffer, SizeOf(Buffer), 0);
    wsaBuffer.len := MAX_BUFSIZE;
    wsaBuffer.buf := Buffer;
    Result := WSARecv(FSocket, @wsaBuffer, 1, Transfer, Flags, @Overlapped, nil) <> SOCKET_ERROR;
    if Result then
      Exit;
    // A pending overlapped operation is not a failure.
    ErrCode := WSAGetLastError;
    Result := ErrCode = ERROR_IO_PENDING;
    if Result then
      Exit;
    IsUse := False;
    Error(eeReceive, ErrCode);
  end;
end;
{ Result codes of TServerClientSocket.WorkBlock, evaluated by
  WorkerThreadProc. }
const
// Not returned by WorkBlock as written; the worker thread frees the client
// when it sees this value.
RESPONSE_UNKNOWN = $0001;
// The block was processed; the worker thread takes no further action.
RESPONSE_SUCCESS = $0002;
// Processing failed or raised an exception; the worker thread frees the
// client.
RESPONSE_FAIL = $FFFF;
{ Handles one completed overlapped operation for this client.
  Block is the block whose operation completed and Transfered the number of
  bytes moved. A completed read is delivered through DoRead and a new
  receive is posted on the same block. A completed write either releases
  the block (Block is then set to nil) or sends the remaining bytes.
  Returns RESPONSE_SUCCESS, or RESPONSE_FAIL when the next receive could not
  be posted or an exception was raised while processing. }
function TServerClientSocket.WorkBlock(var Block: PBlock; Transfered: DWORD): DWORD;
var
  ErrCode: Integer;
  Flag, BytesSend: Cardinal;
begin
  Result := RESPONSE_SUCCESS;
  with Block^.Data do
  try
    case Block^.Data.Event of
      seRead:
        begin
          // Inside the with-scope Event is the block field, so the method
          // has to be qualified with Self.
          Self.Event(seRead);
          DoRead(@Buffer, Transfered);
          if not PrepareRecv(Block) then
            Result := RESPONSE_FAIL;
        end;
      seWrite:
        begin
          Self.Event(seWrite);
          Dec(wsaBuffer.len, Transfered);
          if wsaBuffer.len <= 0 then
          begin
            // Send finished: mark the block unused so AllocBlock can hand
            // it out again.
            Block.Data.IsUse := False;
            Block := nil;
          end else
          begin
            // Only part of the data went out: send the rest.
            Flag := 0;
            Inc(wsaBuffer.buf, Transfered);
            FillChar(Overlapped, SizeOf(Overlapped), 0);
            if SOCKET_ERROR = WSASend(FSocket, @wsaBuffer, 1, BytesSend,
              Flag, @Overlapped, nil) then
            begin
              ErrCode := WSAGetLastError;
              if ErrCode <> ERROR_IO_PENDING then
              begin
                // The resend was never queued, so no completion will ever
                // release this block: release it here, as PrepareRecv does
                // on failure. The result code is deliberately unchanged.
                Block.Data.IsUse := False;
                Error(eeSend, ErrCode);
              end;
            end;
          end;
        end;
    end;
  except
    // Exceptions must not escape into the worker thread; report failure.
    Result := RESPONSE_FAIL;
  end;
end;
{ Body of every worker thread started by TServerSocket.InternalOpen.
  Waits on the server's completion port and hands each completed operation
  to the client socket stored as completion key. The loop ends when the
  server is no longer active, when a shutdown packet arrives or when the
  port itself can no longer be waited on. Always returns 0. }
function WorkerThreadProc(AServer: TServerSocket): DWORD; stdcall;
var
  Block: PBlock;
  Transfered: DWORD;
  ClientSocket: TServerClientSocket;
begin
  Result := 0;
  while AServer.Active do
  begin
    Block := nil;
    Transfered := 0;
    ClientSocket := nil;
    if not GetQueuedCompletionStatus(AServer.CompletionPort, Transfered,
      DWORD(ClientSocket), POverlapped(Block), INFINITE) then
    begin
      // No packet was dequeued at all. With an INFINITE timeout this means
      // the port itself failed (for example it was closed); retrying would
      // only spin, so the thread ends.
      if Block = nil then
        Break;
      // A failed I/O operation was dequeued: drop its client.
      if Assigned(ClientSocket) then
        FreeAndNil(ClientSocket);
      Continue;
    end;
    // Shutdown notification posted by InternalClose.
    if Cardinal(Block) = SHUTDOWN_FLAG then
      Break;
    // Zero bytes transferred: the client disconnected (or possibly timed
    // out) or the I/O failed.
    if Transfered = 0 then
    begin
      FreeAndNil(ClientSocket);
      Continue;
    end;
    case ClientSocket.WorkBlock(Block, Transfered) of
      RESPONSE_UNKNOWN:
        // Original author's note: an unknown operation should be answered
        // to the client rather than closed - kept as is for now.
        FreeAndNil(ClientSocket);
      RESPONSE_FAIL:
        FreeAndNil(ClientSocket);
    end;
  end;
end;
end.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -