iocpsvr.pas

来自「基于socket的文件传输」· PAS 代码 · 共 903 行 · 第 1/2 页

PAS
903
字号
  end;
end;

procedure TServerSocket.SetActive(AValue: Boolean);
begin
  if FActive = AValue then Exit;
  FActive := AValue;
  if FActive then
    Open
  else
    Close; 
end;

procedure TServerSocket.WndProc(var AMsg: TMessage);
begin
  try
    Dispatch(AMsg);
  except
    if Assigned(ApplicationHandleException) then
      ApplicationHandleException(Self);
  end;
end;

function TServerSocket.CheckError(AErrorCode: Integer; AInfo: string): Boolean;
var
  HandleError: Boolean;
begin
  Result := True;
  if AErrorCode = 0 then Exit;
  if AErrorCode = -1 then
    AErrorCode := WSAGetLastError;
  if AErrorCode = -1 then
    AErrorCode := GetLastError;
  if (AErrorCode <> WSAEWOULDBLOCK) and (AErrorCode <> ERROR_IO_PENDING) and
    (AErrorCode <> 0) then
  begin
    if Assigned(FOnError) then
    begin
      HandleError := False;
      FOnError(AErrorCode, SysErrorMessage(AErrorCode), AInfo, HandleError);
      if HandleError then Exit;
    end;
    raise ESocketError.CreateFmt(SWindowsSocketError,
       [SysErrorMessage(AErrorCode), AErrorCode, AInfo]);
  end;
end;

procedure TServerSocket.DoLog(ASocketEvent: TSocketEvent; AInfo: string);
begin
  if Assigned(FOnLog) then FOnLog(ASocketEvent, AInfo);
end;

procedure TServerSocket.DoRead(ASymmetricalSocket: TSymmetricalSocket;
  AData: Pointer; ACount: Integer);
begin
  if Assigned(FOnRead) then
    FOnRead(ASymmetricalSocket, AData, ACount);
end;

procedure TServerSocket.SetPort(AValue: Integer);
begin
  if FActive then
    raise ESocketError.Create('IOCP is acitve, cann''t change port');
  FPort := AValue;
end;

procedure TServerSocket.RegisterClient(ASocket: TSymmetricalSocket);
begin
  FLock.Enter;
  try
    if FClients.IndexOf(ASocket) = -1 then
    begin
      FClients.Add(ASocket);
      DoAfterConnect(ASocket);
      {* 注册关闭通知消息 *}
      WSAAsyncSelect(ASocket.Socket, FHandle, WM_CLIENTSOCKET, FD_CLOSE);
    end;
  finally
    FLock.Leave;
  end;
end;

procedure TServerSocket.UnRegisterClient(ASocket: TSymmetricalSocket);
var
  iIndex: Integer;
begin
  FLock.Enter;
  try
    iIndex := FClients.IndexOf(ASocket);
    if iIndex <> -1 then
    begin
      FClients.Delete(iIndex);
      DoDisConnect(ASocket);
    end;
  finally
    FLock.Leave;
  end;
end;

procedure TServerSocket.AcceptClient;
var
  Addr: TSockAddrIn;
  iAddrLen: Integer;
  ClientWinSocket: TSocket;
  SymmSocket: TSymmetricalSocket;
begin
  iAddrLen := SizeOf(TSockAddrIn);
  ClientWinSocket := WinSock2.WSAAccept(Socket, nil, nil, nil, 0);
  if ClientWinSocket <> INVALID_SOCKET then
  begin
    if (not Active) or (not DoConnect(ClientWinSocket)) then
    begin
      closesocket(ClientWinSocket);
      Exit;
    end;
    try
      DoLog(seAccept);
      SymmSocket := TSymmetricalSocket.Create(Self, ClientWinSocket);
      DoLog(seConnect);
    except
      closesocket(ClientWinSocket);
      CheckError;
      Exit;
    end;
    if CreateIoCompletionPort(ClientWinSocket, FIOCPHandle, DWORD(SymmSocket), 0) = 0 then
    begin
      CheckError(GetLastError, 'CreateIoCompletionPort');
      SymmSocket.Free;
    end
    else
      SymmSocket.PrepareRecv;
  end;
end;

procedure TServerSocket.DoAfterConnect(ASymSocket: TSymmetricalSocket);
begin
  if Assigned(FOnAfterConnect) then FOnAfterConnect(ASymSocket);
end;

function TServerSocket.DoConnect(ASocket: TSocket): Boolean;
var
  SockAddrIn: TSockAddrIn;
  Size: Integer;
begin
  Result := True;
  if Assigned(FOnBeforeConnect) then
  begin
    Size := SizeOf(TSockAddrIn);
    CheckError(getpeername(ASocket, SockAddrIn, Size), 'getpeername');
    FOnBeforeConnect(inet_ntoa(SockAddrIn.sin_addr), Result);
  end;
end;

procedure TServerSocket.DoDisConnect(ASymSocket: TSymmetricalSocket);
begin
  if Assigned(FOnAfterDisconnect) then FOnAfterDisconnect(ASymSocket);
end;

function TServerSocket.FindSymmClient(
  ASocket: TSocket): TSymmetricalSocket;
var
  i: Integer;
begin
  Result := nil;
  FLock.Enter;
  try
    for i := 0 to FClients.Count - 1 do
    begin
      Result := FClients[i];
      if ASocket = Result.Socket then
        Break
      else
        Result := nil;
    end;
  finally
    FLock.Leave;
  end;
end;

function TServerSocket.GetClient(const AIndex: Integer): TSymmetricalSocket;
begin
  Result := FClients[AIndex];
end;

function TServerSocket.GetClientCount: Integer;
begin
  Result := FClients.Count;
end;

procedure TServerSocket.WMClientClose(var AMsg: TCMSocketMessage);
var
  ASymmSocket: TSymmetricalSocket;
begin
  if AMsg.SelectEvent = FD_CLOSE then
  begin
    ASymmSocket := FindSymmClient(AMsg.Socket);
    if Assigned(ASymmSocket) then
      ASymmSocket.Free;
  end;
end;

{ TSocketThread }

constructor TSocketThread.Create(AServer: TServerSocket);
begin
  FServer := AServer;
  inherited Create(False);
  FreeOnTerminate := True;
end;

{ TAcceptThread }

procedure TAcceptThread.Execute;
begin
  inherited;
  while not Terminated and FServer.Active do
  begin
    FServer.AcceptClient;
  end;
end;

{ TWorkThread }

procedure TWorkThread.Execute;
var
  ASymSocket: TSymmetricalSocket;
  AIOCPStruct: PIOCPStruct;
  iWorkCount: Cardinal;
begin
  inherited;
  while (not Terminated) and (FServer.Active) do
  begin
    AIOCPStruct := nil;
    iWorkCount := 0;
    ASymSocket := nil;
    if not GetQueuedCompletionStatus(FServer.FIOCPHandle, iWorkCount,
      DWORD(ASymSocket), POVerlapped(AIOCPStruct), INFINITE) then
    begin
      if Assigned(ASymSocket) then
        FreeAndNil(ASymSocket);
      Continue;
    end;

    if Cardinal(AIOCPStruct) = SHUTDOWN_FLAG then Break; //退出标志
    if not FServer.Active then Break; //退出

    {* 客户可能超时 或是断开连接,I/O失败 应放在通知结束的后面 *}
    if iWorkCount = 0 then
    begin
      //FreeAndNil(ASymSocket);  //不在这儿释放,而是接收释放消息来释放
      Continue;
    end;
    FServer.DoLog(AIOCPStruct.Event);
    if ASymSocket.WorkBlock(AIOCPStruct, iWorkCount) = -1 then
    begin
      FreeAndNil(ASymSocket);
    end;
  end;
end;

{ TSymmetricalSocket }

constructor TSymmetricalSocket.Create(ASvrSocket: TServerSocket;
  ASocket: TSocket);
begin
  FServer := ASvrSocket;
  FSocket := ASocket;
  FAssignMemory := TList.Create;
  FServer.RegisterClient(Self);
  //PrepareRecv;
end;

destructor TSymmetricalSocket.Destroy;
var
  i: Integer;
  Linger: TLinger;
begin
  FServer.UnRegisterClient(Self);
  FillChar(Linger, SizeOf(TLinger), 0);     //优雅关闭
  setsockopt(FSocket, SOL_SOCKET, SO_LINGER, @Linger, Sizeof(Linger));
  closesocket(FSocket);
  for i := FAssignMemory.Count - 1 downto 0 do
    FServer.MemoryManager.Release(FAssignMemory[i]);
  FAssignMemory.Free;
  inherited;
end;

function TSymmetricalSocket.Allocate: PIOCPStruct;
var
  i: Integer;
begin
  for i := 0 to FAssignMemory.Count - 1 do
  begin
    Result := FAssignMemory[i];
    if not Result.Active then
    begin
      Result.Active := True;
      Exit;
    end;
  end;
  Result := FServer.MemoryManager.Allocate;
  FAssignMemory.Add(Result);
  Result.Active := True;
end;

function TSymmetricalSocket.PrepareRecv(AIOCPStruct: PIOCPStruct = nil): Boolean;
var
  iFlags, iTransfer: Cardinal;
  ErrCode: Integer;
begin
  if not Assigned(AIOCPStruct) then
    AIOCPStruct := Allocate;
  iFlags := 0;
  AIOCPStruct.Event := seRead;
  FillChar(AIOCPStruct.Buffer, SizeOf(AIOCPStruct.Buffer), 0);
  FillChar(AIOCPStruct.Overlapped, SizeOf(AIOCPStruct.Overlapped), 0);
  AIOCPStruct.wsaBuffer.buf := @AIOCPStruct.Buffer;
  AIOCPStruct.wsaBuffer.len := MAX_BUFSIZE;
  Result := WSARecv(FSocket, @AIOCPStruct.wsaBuffer, 1, @iTransfer, @iFlags, @AIOCPStruct.Overlapped, nil) <> SOCKET_ERROR;
  if not Result then
  begin
    ErrCode := WSAGetLastError;
    Result := ErrCode = ERROR_IO_PENDING;
    if not Result then
      FServer.CheckError(ErrCode, 'WSARecv');
  end;
end;

function TSymmetricalSocket.WorkBlock(AIOCPStruct: PIOCPStruct;
  ACount: DWORD): Integer;
var
  ErrCode: Integer;
  iSend, iFlag: Cardinal;
begin
  Result := 0;
  try
    case AIOCPStruct.Event of
      seRead:  //接收数据
      begin
        FServer.DoRead(Self, @AIOCPStruct.Buffer[0], ACount);
        if PrepareRecv(AIOCPStruct) then
          Result := ACount;
      end;
      seWrite: //发送数据
      begin
        Dec(AIOCPStruct.wsaBuffer.len, ACount);
        if AIOCPStruct.wsaBuffer.len <= 0 then
        begin
          AIOCPStruct.Active := False;
        end
        else
        begin
          FillChar(AIOCPStruct.Overlapped, SizeOf(AIOCPStruct.Overlapped), 0);
          iFlag := 0;
          if SOCKET_ERROR = WSASend(FSocket, @AIOCPStruct.wsaBuffer, 1, @iSend,
            iFlag, @AIOCPStruct.Overlapped, nil) then
          begin
            ErrCode := WSAGetLastError;
            if ErrCode <> ERROR_IO_PENDING then
              FServer.CheckError(ErrCode, 'WSASend');
          end
          else Result := iSend;
        end;
      end;
    end;
  except
    Result := 0;
  end;
end;

function TSymmetricalSocket.Write(var ABuf; ACount: Integer): Integer;
var
  AIOCPStruct: PIOCPStruct;
  ErrCode: Integer;
  iFlag, iSend: Cardinal;
begin
  Result := ACount;
  if Result = 0 then Exit;
  AIOCPStruct := Allocate;
  iFlag := 0;
  AIOCPStruct.Event := seWrite;
  FillChar(AIOCPStruct.Buffer[0], SizeOf(AIOCPStruct.Buffer), 0);
  CopyMemory(@AIOCPStruct.Buffer[0], @ABuf, ACount);
  AIOCPStruct.wsaBuffer.buf := @AIOCPStruct.Buffer[0];
  AIOCPStruct.wsaBuffer.len := Result;

  if SOCKET_ERROR = WSASend(FSocket, @AIOCPStruct.wsaBuffer, 1, @iSend, iFlag,
    @AIOCPStruct.Overlapped, nil) then
  begin
    ErrCode := WSAGetLastError;
    if ErrCode <> ERROR_IO_PENDING then
    begin
      Result := SOCKET_ERROR;
      FServer.CheckError(ErrCode, 'WSASend');
    end;
  end;
end;

function TSymmetricalSocket.WriteString(const AValue: string): Integer;
begin
  Result := Write(Pointer(AValue)^, Length(AValue));
end;

function TSymmetricalSocket.GetRemoteIP: string;
var
  SockAddrIn: TSockAddrIn;
  iSize: Integer;
  HostEnt: PHostEnt;
begin
  if FRemoteAddress = '' then
  begin
    iSize := SizeOf(SockAddrIn);
    FServer.CheckError(getpeername(FSocket, SockAddrIn, iSize), 'getpeername');
    FRemoteAddress := inet_ntoa(SockAddrIn.sin_addr);
  end;
  Result := FRemoteAddress;
end;

function TSymmetricalSocket.GetRemotePort: Integer;
var
  SockAddrIn: TSockAddrIn;
  iSize: Integer;
  HostEnt: PHostEnt;
begin
  if FRemoteAddress = '' then
  begin
    iSize := SizeOf(SockAddrIn);
    FServer.CheckError(getpeername(FSocket, SockAddrIn, iSize), 'getpeername');
    FRemotePort := ntohs(SockAddrIn.sin_port);
  end;
  Result := FRemotePort;
end;

function TSymmetricalSocket.GetRemoteHost: string;
var
  SockAddrIn: TSockAddrIn;
  iSize: Integer;
  HostEnt: PHostEnt;
begin
  if FRemoteAddress = '' then
  begin
    iSize := SizeOf(SockAddrIn);
    FServer.CheckError(getpeername(FSocket, SockAddrIn, iSize), 'getpeername');
    HostEnt := gethostbyaddr(@SockAddrIn.sin_addr.S_addr, 4, PF_INET);
    if HostEnt <> nil then FRemoteHost := HostEnt.h_name;
  end;
  Result := FRemoteHost
end;

end.

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?