📄 sockutils.pas
字号:
Exit;
end;
end;
Result := False;
end;
function GetPeerSite(Socket: Cardinal; var HostName, Address: string): Boolean; overload;
var
addrIn: TSockAddrIn;
Size: Integer;
begin
Size := SizeOf(addrIn);
if getpeername(Socket, addrIn, Size) = SOCKET_ERROR then
LogWinSockError('getpeername')
else if GetPeerSite(AddrIn, HostName, Address) then
begin
Result := True;
Exit;
end;
Result := False;
end;
procedure EnumSites(Sites: TSiteInfos);
var
E: THandle;
Cnt, Size: Cardinal;
Buf: Pointer;
Res: PNetResource;
I, OK: Integer;
Name: string;
begin
OK := WNetOpenEnum(RESOURCE_CONTEXT, 0, 0, nil, E);
if OK = NO_ERROR then
begin
Size := 16384;
_NewAlloc(Size, Buf);
Cnt := $ffffffff;
repeat
OK := WNetEnumResource(E, Cnt, Buf, Size);
Res := Buf;
for I := 0 to Pred(Cnt) do
begin
if Res.dwUsage = RESOURCEUSAGE_CONTAINER then
if Res.dwType = RESOURCETYPE_ANY then
if Res.lpRemoteName <> nil then
begin
SetString(Name, Res.lpRemoteName, StrLen(Res.lpRemoteName));
if Copy(Name, 1, 2) = '\\' then
Delete(Name, 1, 2);
Sites.InsertSite(Name);
end;
Res := Pointer(Integer(Res) + SizeOf(TNetResource));
end;
until (OK <> ERROR_MORE_DATA);
_Free(Buf);
WNetCloseEnum(E);
end;
end;
procedure _ClientChange(Sender: PSiteInfo; Event: TSiteEvent);
begin
if Event = seConnected then
Jet.Sites.ConnectAsClient(Sender)
else
Jet.Sites.DisconnectAsClient(Sender);
Service.ClientChange(Sender, Event);
end;
procedure _ServerChange(Sender: PSiteInfo; Event: TSiteEvent);
begin
if Event = seConnected then
Jet.Sites.ConnectAsServer(Sender)
else
Jet.Sites.DisconnectAsServer(Sender);
Service.ServerChange(Sender, Event);
end;
// 完成端口工作者线程函数
function ServerWorkerThreadProc(Param: Pointer): Cardinal;
var
Svr: PServerInfo;
CompletionPort: THandle;
Key, Bytes: Cardinal;
Overlapped: POverlapped;
Site: PSiteInfo;
PIOData: PPerHandleIOData;
Action: TAsyncAction;
begin
LogMessage('Worker Thread Start...');
Svr := Param;
InterlockedIncrement(Svr^.WorkerThreads);
CompletionPort := Svr.CompletionPort;
while True do
if GetQueuedCompletionStatus(CompletionPort, Bytes, Key, Overlapped, INFINITE) then
begin
if Key = 0 then
// 线程正常退出
Break
else
if Overlapped <> nil then
begin
PIOData := PPerHandleIOData(Overlapped);
Action := Actions.Find(PIOData^.Key);
if Bytes = 0 then
begin
// 客户端断开
Site := Jet.Sites.FindSite(Key);
if Site <> nil then
begin
Jet.Sites.DisconnectAsClient(Site);
_ClientChange(Site, seDisconnected);
if Action <> nil then
Action.Abandon;
end
else
begin
// 无效 Key 值
LogError('Invalid completion key value');
Break;
end;
end
else
if Action <> nil then
// 传输完成
Action.Complete(Bytes)
else
begin
// 空的异步活动
LogError('An asynchronous action not found!');
Break;
end;
end
else
// 空的重叠结构
LogError('No overlapped structure entry!');
end
else
begin
// 查询完成状态失败
LogWindowsError('GetQueuedCompletionStatus');
Break;
end;
LogMessage('Worker Thread Terminated');
if InterlockedDecrement(Svr^.WorkerThreads) = 0 then
SetEvent(Svr^.Waiter);
Result := 0;
end;
// 监听线程函数
function AcceptThreadProc(Param: Pointer): Cardinal;
var
Svr: PServerInfo;
ListenSocket, AcceptSocket: TSocket;
SockAddr: TSockAddr;
Size: Integer;
ErrCode: Integer;
Host, Addr: string;
SiteInfo: PSiteInfo;
I: Integer;
begin
LogMessage('Listen Thread Start...');
Svr := Param;
InterlockedIncrement(Svr^.WorkerThreads);
ListenSocket := Svr.ListenSocket;
while True do
begin
Size := SizeOf(SockAddr);
// AcceptSocket := WSAAccept(ListenSocket, SockAddr, @Size, nil, 0);
AcceptSocket := accept(ListenSocket, SockAddr, Size);
if AcceptSocket = INVALID_SOCKET then
begin
ErrCode := WSAGetLastError;
if ErrCode <> WSAEINTR then
LogWinSockError('WSAAccept', ErrCode); // WSAAccept
Break;
end
else
if not GetPeerSite(SockAddr, Host, Addr) then
Break
else
begin
SiteInfo := Jet.Sites.AddSite(Host, Addr);
if CreateIOCompletionPort(AcceptSocket, Svr^.CompletionPort, SiteInfo^.Key, 0) = 0 then
begin
LogWindowsError('CreateIOCompletionPort');
I := Jet.Sites.Count;
while I > 0 do
begin
Dec(I);
closesocket(Jet.Sites[I]^.AsClient);
Jet.Sites[I]^.AsClient := INVALID_SOCKET;
end;
Break;
end
else
begin
SiteInfo^.AsClient := AcceptSocket;
_ClientChange(SiteInfo, seConnected);
Actions.GetAction(TAcceptAction, SiteInfo).Queueing;
end;
end;
end;
LogMessage('Listen Thread Terminated');
if InterlockedDecrement(Svr^.WorkerThreads) = 0 then
SetEvent(Svr^.Waiter);
Result := 0;
end;
// 客户端重叠端口完成例程
procedure ClientCompletionRoutine(Error: DWORD; Transferred: DWORD;
Overlapped: POverlapped; Flags: DWORD); stdcall;
var
PIOData: PPerHandleIOData;
Action: TAsyncAction;
Site: PSiteInfo;
begin
// LogMessage('Client [%d] Completion Routine Start...', [Integer(Overlapped)]);
if Overlapped <> nil then
begin
PIOData := PPerHandleIOData(Overlapped);
Action := Actions.Find(PIOData^.Key);
if Action <> nil then
if Transferred = 0 then
begin
Site := Action.FSite;
Action.Abandon;
// LogMessage('GRACEFUL DISCONNECTED FROM SERVER %s[%s]', [Site^.HostName, Site^.Address]);
_ServerChange(Site, seDisconnected);
end
else
Action.Complete(Transferred)
else
// 空的异步活动
LogError('No asynchronous action!');
end
else
// 空的重叠结构
LogError('No overlapped structure entry!');
// LogMessage('Client [%d] Completion Routine Terminated...', [Integer(Overlapped)]);
end;
// 创建工作者线程
function CreateWorkerThreads(Server: PServerInfo): Boolean;
var
SysInfo: _System_Info;
I, X: Integer;
Handle, Id: THandle;
begin
Server^.WorkerThreads := 0;
GetSystemInfo(SysInfo);
I := SysInfo.dwNumberOfProcessors * 2;
while I > 0 do
begin
Dec(I);
Handle := BeginThread(nil, 0, @ServerWorkerThreadProc, Server, 0, Id);
if Handle = 0 then
begin
LogWindowsError('CreateThread');
X := Server^.WorkerThreads;
while X > 0 do
begin
Dec(X);
PostQueuedCompletionStatus(Server^.CompletionPort, 0, 0, nil);
end;
WaitForSingleObject(Server^.Waiter, INFINITE);
Result := False;
Exit;
end
else
CloseHandle(Handle);
end;
Result := True;
end;
function StartSockServer(AService: TCustomService; var Server: PServerInfo): Boolean;
var
addr: TSockAddr;
begin
Service := AService;
_NewAlloc(SizeOf(TServerInfo), Pointer(Server));
Server^.WorkerThreads := 0;
Server^.Waiter := CreateEvent(nil, False, False, nil);
Server^.ListenSocket := WSASocket(AF_INET, SOCK_STREAM, 0, nil, 0, WSA_FLAG_OVERLAPPED);
if Server^.ListenSocket = INVALID_SOCKET then
LogWinSockError('WSASocket')
else
begin
FillChar(addr, SizeOf(addr), 0);
addr.sin_family := AF_INET;
addr.sin_port := htons(Service.Port);
addr.sin_addr.S_addr := htonl(INADDR_ANY);
if bind(Server^.ListenSocket, @addr, Sizeof(addr)) > 0 then
LogWinSockError('bind')
else
begin
Server^.CompletionPort := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
if Server^.CompletionPort <= 0 then
LogWindowsError('CreateIoCompletionPort')
else
begin
if CreateWorkerThreads(Server) then
if listen(Server^.ListenSocket, 5) = SOCKET_ERROR then
LogWinSockError('listen')
else
if StartThread(@AcceptThreadProc, Server) then
begin
Result := True;
Exit;
end;
CloseHandle(Server^.CompletionPort);
end;
end;
closesocket(Server^.ListenSocket);
end;
if InterlockedExchangeAdd(Server^.WorkerThreads, 0) > 0 then
WaitForSingleObject(Server^.Waiter, INFINITE);
CloseHandle(Server^.Waiter);
_Free(Pointer(Server));
Result := False;
end;
procedure StopSockServer(var ServerInfo: PServerInfo);
var
X: Integer;
begin
if ServerInfo <> nil then
begin
X := InterlockedExchangeAdd(ServerInfo^.WorkerThreads, 0);
while X > 0 do
begin
Dec(X);
PostQueuedCompletionStatus(ServerInfo^.CompletionPort, 0, 0, nil);
end;
closesocket(ServerInfo^.ListenSocket);
WaitForSingleObject(ServerInfo^.Waiter, INFINITE);
CloseHandle(ServerInfo^.CompletionPort);
CloseHandle(ServerInfo^.Waiter);
_Free(Pointer(ServerInfo));
end;
end;
function ConnectServer(Port: Integer; Site: PSiteInfo): Boolean;
var
addr: TSockAddr;
ClientSocket: TSocket;
HostEnt: PHostEnt;
begin
Result := False;
addr.sin_family := AF_INET;
addr.sin_port := htons(Port);
addr.sin_addr.S_addr := inet_addr(Site^.Address);
if addr.sin_addr.S_addr = INADDR_NONE then
begin
HostEnt := gethostbyname(Site^.HostName);
if HostEnt = nil then
begin
LogWinSockError(Site, 'gethostbyname');
Exit;
end
else
with addr.sin_addr, HostEnt^ do
begin
S_un_b.s_b1 := Ord(h_addr^[0]);
S_un_b.s_b2 := Ord(h_addr^[1]);
S_un_b.s_b3 := Ord(h_addr^[2]);
S_un_b.s_b4 := Ord(h_addr^[3]);
end;
end;
ClientSocket := socket(AF_INET, SOCK_STREAM, 0);
if connect(ClientSocket, @addr, SizeOf(addr)) = SOCKET_ERROR then
begin
LogWinSockError(Site, 'connect');
closesocket(ClientSocket);
end
else
begin
Site^.AsServer := ClientSocket;
_ServerChange(Site, seConnected);
Result := True;
end;
end;
function DisconnectServer(Site: PSiteInfo): Boolean;
begin
LogMessage('SHUTDOWN TO %s[%s]', [Site^.HostName, Site^.Address]);
if shutdown(Site^.AsServer, SD_SEND) = SOCKET_ERROR then
Result := LogWinSockError(Site, 'shutdown')
else
Result := True;
end;
function ReceiveFromClient(Site: PSiteInfo; Data: PPerHandleIOData;
Key: Cardinal; Buffer: Pointer; Size: Integer): Boolean;
var
Bytes: Cardinal;
Flags: DWORD;
Buf: PWSABUF;
Overlapped: PWSAOverlapped;
begin
Result := True;
FillChar(Data^.Overlapped, SizeOf(TOverlapped), 0);
Data^.WSABuffer.buf := Buffer;
Data^.WSABuffer.len := Size;
Data^.Key := Key;
Flags := 0;
Buf := @Data^.WSABuffer;
Overlapped := PWSAOverlapped(Data);
if WSARecv(Site.AsClient, Buf, 1, Bytes, Flags, Overlapped, nil) = SOCKET_ERROR then
if LogWinSockError('WSARecv') then
Result := False;
end;
function SendToClient(Site: PSiteInfo; Data: PPerHandleIOData; Key: Cardinal;
Buffer: Pointer; Size: Integer): Boolean;
var
Bytes: Cardinal;
Flags: DWORD;
Buf: PWSABUF;
Overlapped: PWSAOverlapped;
begin
Result := True;
FillChar(Data^.Overlapped, SizeOf(TOverlapped), 0);
Data^.WSABuffer.buf := Buffer;
Data^.WSABuffer.len := Size;
Data^.Key := Key;
Flags := 0;
Buf := @Data^.WSABuffer;
Overlapped := PWSAOverlapped(Data);
if WSASend(Site.AsClient, Buf, 1, Bytes, Flags, Overlapped, nil) = SOCKET_ERROR then
if LogWinSockError('WSASend') then
Result := False;
end;
function ReceiveFromServer(Site: PSiteInfo; Data: PPerHandleIOData;
Key: Cardinal; Buffer: Pointer; Size: Integer): Boolean;
var
Bytes: Cardinal;
Flags: DWORD;
Buf: PWSABUF;
Overlapped: PWSAOverlapped;
begin
Result := True;
FillChar(Data^.Overlapped, SizeOf(TOverlapped), 0);
Data^.WSABuffer.buf := Buffer;
Data^.WSABuffer.len := Size;
Data^.Key := Key;
Flags := 0;
Buf := @Data^.WSABuffer;
Overlapped := PWSAOverlapped(Data);
if WSARecv(Site.AsServer, Buf, 1, Bytes, Flags, Overlapped,
@ClientCompletionRoutine) = SOCKET_ERROR then
if LogWinSockError(Site, 'WSARecv') then
Result := False;
end;
function SendToServer(Site: PSiteInfo; Data: PPerHandleIOData; Key: Cardinal;
Buffer: Pointer; Size: Integer): Boolean;
var
Bytes: Cardinal;
Flags: DWORD;
Buf: PWSABUF;
Overlapped: PWSAOverlapped;
begin
Result := True;
FillChar(Data^.Overlapped, SizeOf(TOverlapped), 0);
Data^.WSABuffer.buf := Buffer;
Data^.WSABuffer.len := Size;
Data^.Key := Key;
Flags := 0;
Buf := @Data^.WSABuffer;
Overlapped := PWSAOverlapped(Data);
if WSASend(Site.AsServer, Buf, 1, Bytes, Flags, Overlapped,
@ClientCompletionRoutine) = SOCKET_ERROR then
if LogWinSockError(Site, 'WSARecv') then
Result := False;
end;
{ TSiteInfos }
constructor TSiteInfos.Create;
begin
FCriticalSection := TCriticalSection.Create;
FLastKey := 0;
FList := TList.Create;
FServers := TList.Create;
FClients := TList.Create;
end;
destructor TSiteInfos.Destroy;
var
I: Integer;
P: Pointer;
begin
FServers.Free;
FClients.Free;
I := Count;
while I > 0 do
begin
Dec(I);
P := Items[I];
_Free(P);
end;
FList.Free;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -