⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 psitcpconnection.pas

📁 一个delphi的p2p控件的源代码
💻 PAS
📖 第 1 页 / 共 2 页
字号:
    if i = 0 then begin
      // ReadLn needs to call this as data may exist in the buffer, but no EOL yet disconnected
      CheckForDisconnect(True, True);
      // Can only return 0 if error or timeout
      FReadLnTimedOut := ReadFromStack(True, ATimeout) = 0;
      if ReadLnTimedout then begin
        result := '';
        exit;
      end;
    end;
  until i > 0;
  Result := ExtractXBytesFromBuffer(i + Length(LTerminator) - 1);
  SetLength(Result, i - 1);
  if (Length(ATerminator) = 0) and (Copy(Result, Length(Result), 1) = CR) then begin
    SetLength(Result, Length(Result) - 1);
  end;
end;

// Reads lines from the connection until one is non-empty after trimming,
// and returns that trimmed line. Lines that trim to nothing are discarded.
function TPsiTCPConnection.ReadLnWait: string;
begin
  repeat
    // At least one line is always read; keep going while it trims to ''.
    Result := Trim(ReadLn);
  until Length(Result) > 0;
end;

// Copies data arriving on the connection into AStream.
//
// AStream              - destination stream; data is written at its current position.
// AByteCount           - number of bytes to read. When it is -1 and
//                        AReadUntilDisconnect is False, the count is first read
//                        from the connection itself with ReadInteger.
// AReadUntilDisconnect - when True, AByteCount is not used as a limit: everything
//                        received is written to AStream until Connected is False.
//
// Both modes report progress through BeginWork / EndWork with wmRead.
procedure TPsiTCPConnection.ReadStream(AStream: TStream; AByteCount: Integer = -1;
 const AReadUntilDisconnect: boolean = false);
var
  i, ReadSize, WorkCount: integer;
  StreamPos: LongInt;
  LBuffer: TPsiBuffer;
begin
	if (AByteCount = -1) and (AReadUntilDisconnect = False) then begin
    // No explicit size and not reading to disconnect: the size is sent on the
    // connection ahead of the data.
  	AByteCount := ReadInteger;
  end;
  // Presize stream if we know the size - this reduces memory/disk allocations to one time
  if AByteCount > -1 then begin
    // Save current stream position
    StreamPos := AStream.Position;
    AStream.Size := AStream.Position + AByteCount;
    // Must restore the original position, as in some cases changing the size
    // also changes the position
    AStream.Position := StreamPos;
  end;

  if AReadUntilDisconnect then begin
    // Temporary buffer that ReadFromStack fills on each pass.
    LBuffer := TPsiBuffer.Create; try
      LBuffer.Size := RecvBufferSize;
      BeginWork(wmRead); try
        // If data already exists in the buffer, write it out first.
        AStream.WriteBuffer(FBuffer.Memory^, CurrentReadBufferSize);
        FBuffer.Clear;
        while Connected do begin
          // Read into LBuffer instead of the connection's own buffer and copy the
          // returned number of bytes straight to the stream.
          i := ReadFromStack(false, PsiTimeoutInfinite, false, LBuffer);
          AStream.WriteBuffer(LBuffer.Memory^, i);
        end;
      finally EndWork(wmRead); end;
    finally LBuffer.Free; end;
  end else begin
    //TODO: this is the old way - can probably be merged with the new way (above)
    // WorkCount tracks how many bytes are still to be transferred.
    WorkCount := AByteCount;
    BeginWork(wmRead, WorkCount); try
      while Connected and (WorkCount > 0) do begin
        // Transfer at most RecvBufferSize bytes per pass.
        ReadSize := Min(WorkCount, RecvBufferSize);
        // Read from stack until we have enough data
        while CurrentReadBufferSize < ReadSize do begin
          ReadFromStack;
        end;
        // Copy the chunk from the front of the connection buffer, then drop it.
        AStream.WriteBuffer(FBuffer.Memory^, ReadSize);
        RemoveXBytesFromBuffer(ReadSize);
        Dec(WorkCount, ReadSize);
      end;
    finally EndWork(wmRead); end;
  end;
end;

// Drops AByteCount bytes from the connection's buffer (FBuffer) by delegating
// to FBuffer.RemoveXBytes.
procedure TPsiTCPConnection.RemoveXBytesFromBuffer(const AByteCount: Integer);
begin
  FBuffer.RemoveXBytes(AByteCount);
end;

// Resets per-connection state: empties the connection buffer and clears the
// "closed gracefully" flag.
procedure TPsiTCPConnection.ResetConnection;
begin
  FBuffer.Clear;
  FClosedGracefully := False;
end;

// Sends a command line and validates the numeric reply.
//
// AOut      - command text written with WriteLn; the value #0 means "send
//             nothing, only read the response".
// AResponse - list of acceptable response codes. An empty list accepts any reply.
//
// Returns the value of GetResponse. Raises EPsiResponseError (carrying CmdResult)
// when AResponse is non-empty and ResultNo matches none of its entries.
function TPsiTCPConnection.SendCmd(const AOut: string; const AResponse: Array of SmallInt): SmallInt;
var
  LIdx: Integer;
  LMatched: Boolean;
begin
  if AOut <> #0 then begin
    WriteLn(AOut);
  end;

  Result := GetResponse;

  // Scan the accepted codes, stopping at the first match.
  LMatched := False;
  LIdx := Low(AResponse);
  while (not LMatched) and (LIdx <= High(AResponse)) do begin
    LMatched := ResultNo = AResponse[LIdx];
    Inc(LIdx);
  end;

  // High() is -1 for an empty open array, in which case nothing is enforced.
  if (High(AResponse) > -1) and (not LMatched) then begin
    raise EPsiResponseError.Create(CmdResult);
  end;
end;

// Component notification hook: drops the reference to the intercept component
// when that component is being removed, so FIntercept never dangles.
procedure TPsiTCPConnection.Notification(AComponent: TComponent; Operation: TOperation);
begin
  inherited;
  if (Operation = opRemove) and (AComponent = FIntercept) then begin
    FIntercept := nil;
  end;
end;

// Property setter for Intercept. Stores the new intercept, enables interception
// exactly when the new value is non-nil, and registers this connection for free
// notification on the intercept.
procedure TPsiTCPConnection.SetIntercept(AValue: TPsiConnectionIntercept);
begin
  FIntercept := AValue;
  FInterceptEnabled := Assigned(AValue);
  if FIntercept <> nil then begin
    // Ask the intercept to notify us when it is freed.
    FIntercept.FreeNotification(Self);
  end;
end;

// Property setter for InterceptEnabled.
// Raises EPsiException(RSInterceptPropIsNil) on an attempt to enable interception
// while no Intercept is assigned, except while the component is loading
// (csLoading in ComponentState).
procedure TPsiTCPConnection.SetInterceptEnabled(AValue: Boolean);
begin
  if AValue then begin
    if (Intercept = nil) and (not (csLoading in ComponentState)) then begin
      raise EPsiException.Create(RSInterceptPropIsNil);
    end;
  end;
  FInterceptEnabled := AValue;
end;

// Property setter for RecvBufferSize: resizes the receive buffer (FRecvBuffer)
// to Value. No validation of Value is performed here.
procedure TPsiTCPConnection.SetRecvBufferSize(const Value: Integer);
begin
  FRecvBuffer.Size := Value;
end;

// Writes the characters of AOut to the connection via WriteBuffer.
// An empty string is a no-op.
procedure TPsiTCPConnection.Write(AOut: string);
begin
  if Length(AOut) <= 0 then begin
    Exit;
  end;
  WriteBuffer(AOut[1], Length(AOut));
end;

// Sends AByteCount bytes starting at ABuffer.
//
// ABuffer    - untyped source data.
// AByteCount - number of bytes to send; nothing happens when it is <= 0 or when
//              the address of ABuffer is nil.
// AWriteNow  - when True the data is sent immediately even if a write buffer is
//              open (used when flushing the write buffer itself).
//
// When a write buffer is open (FWriteBuffer <> nil) and AWriteNow is False, the
// data is appended to the write buffer instead, and the buffer is flushed by
// FWriteBufferThreshhold bytes once it reaches a positive threshold.
//
// Otherwise the data is pushed out in a loop until every byte has been accepted,
// through the intercept when it handles sending, or through Binding.Send.
// A Psi_WSAESHUTDOWN socket error disconnects the socket and is re-raised.
procedure TPsiTCPConnection.WriteBuffer(const ABuffer; AByteCount: Integer;
 const AWriteNow: boolean = false);
var
  nPos, nByteCount: Integer;

  // Sends the not-yet-sent remainder of ABuffer directly through the socket binding.
  procedure DefaultSend;
  begin
    nByteCount := Binding.Send(PChar(@ABuffer)[nPos - 1], AByteCount - nPos + 1, 0);
    // Write always does something - never returns 0
    // TODO - Have a AntiFreeze param which allows the send to be split up so that process
    // can be called more. Maybe a prop of the connection, MaxSendSize?
    TPsiAntiFreezeBase.DoProcess(False);
  end;

begin
  if (AByteCount > 0) and (@ABuffer <> nil) then begin
    // Check if we disconnected
    CheckForDisconnect(True, True);

    if (FWriteBuffer = nil) or AWriteNow then begin
      // nPos is the 1-based index of the next byte to send.
      nPos := 1;
      repeat
        if InterceptEnabled then begin
          if Intercept.SendHandling then begin
            nByteCount := Intercept.Send(PChar(@ABuffer)[nPos - 1], AByteCount - nPos + 1);
          end else begin
            DefaultSend;
          end;
        end else begin
          DefaultSend;
        end;
        // A send that transfers 0 bytes is treated as the peer having closed.
        FClosedGracefully := nByteCount = 0;
        //NOTE - this is currently kind of a hack - there is a newer/better plan that I have to find time
        //to implement
        //DoProcess;

        // Check if other side disconnected
        CheckForDisconnect;
        if GStack.CheckForSocketError(nByteCount, [Psi_WSAESHUTDOWN]) then begin
          DisconnectSocket;
          GStack.RaiseSocketError(Psi_WSAESHUTDOWN);
        end;
        DoWork(wmWrite, nByteCount);
        if InterceptEnabled then begin
          // Report only the bytes this pass actually sent. Passing the whole
          // remaining length would announce unsent bytes on a partial send and
          // report them again on the next pass.
          Intercept.DataSent(PChar(@ABuffer)[nPos - 1], nByteCount);
        end;
        nPos := nPos + nByteCount
      until nPos > AByteCount;
    // Write Buffer is enabled
    end else begin
      FWriteBuffer.WriteBuffer(ABuffer, AByteCount);
      if (FWriteBuffer.Size >= FWriteBufferThreshhold) and (FWriteBufferThreshhold > 0) then begin
        // TODO: Instead of flushing - Write until buffer is smaller than Threshold. That is do at
        // least one physical send.
        FlushWriteBuffer(FWriteBufferThreshhold);
      end;
    end;
  end;
end;

// Sends the contents of the file AFile over the connection and returns a byte
// count.
//
// When AEnableTransferFile is True, a GServeFileProc is installed and
// interception is disabled, the transfer is delegated to
// GServeFileProc(Binding.Handle, AFile) and its result is returned.
// Otherwise the file is opened read-only (share deny none), streamed with
// WriteStream, and the stream's size is returned.
function TPsiTCPConnection.WriteFile(AFile: String; const AEnableTransferFile: boolean = false)
 : cardinal;
var
  LSource: TFileStream;
begin
  if (not AEnableTransferFile) or InterceptEnabled or (not Assigned(GServeFileProc)) then begin
    // Regular path: stream the file through the connection.
    LSource := TFileStream.Create(AFile, fmOpenRead or fmShareDenyNone);
    try
      WriteStream(LSource);
      Result := LSource.Size;
    finally
      LSource.Free;
    end;
  end else begin
    // Fast path: hand the socket handle to the registered file-serving routine.
    Result := GServeFileProc(Binding.Handle, AFile);
  end;
end;

// Writes each entry of axHeader as a header line, turning the first '=' of the
// entry into ': ', and finishes with an empty line.
procedure TPsiTCPConnection.WriteHeader(axHeader: TStrings);
var
  LIdx: Integer;
  LLine: string;
begin
  for LIdx := 0 to Pred(axHeader.Count) do begin
    // Empty flag set (no rfReplaceAll): only the first '=' is replaced.
    LLine := StringReplace(axHeader[LIdx], '=', ': ', []);
    WriteLn(LLine);
  end;
  // Blank line terminates the header block.
  WriteLn('');
end;

// Writes AValue to the connection as SizeOf(Integer) raw bytes.
// When AConvert is True the value is first passed through GStack.WSHToNl
// (presumably host-to-network byte order - confirm against the stack unit).
procedure TPsiTCPConnection.WriteInteger(AValue: Integer; const AConvert: boolean = true);
var
  LWire: Integer;
begin
  if AConvert then begin
    LWire := Integer(GStack.WSHToNl(LongWord(AValue)));
  end else begin
    LWire := AValue;
  end;
  WriteBuffer(LWire, SizeOf(LWire));
end;

// Writes AOut followed by the EOL constant. Called without an argument it
// writes just EOL.
procedure TPsiTCPConnection.WriteLn(const AOut: string = '');
begin
  Write(AOut + EOL);
end;

// Sends the contents of AStream over the connection.
//
// AAll            - when True the stream is rewound to position 0 first;
//                   otherwise sending starts at the current position.
// AWriteByteCount - when True the number of bytes about to be sent is written
//                   first with WriteInteger.
//
// Data is sent in chunks of at most 32768 bytes, bracketed by
// BeginWork / EndWork with wmWrite.
procedure TPsiTCPConnection.WriteStream(AStream: TStream; const AAll: boolean = true;
 const AWriteByteCount: Boolean = false);
var
  LChunk: string;
  LCount: Integer;
begin
  if AAll then begin
    AStream.Position := 0;
  end;

  // Total number of bytes between the current position and the end of the stream.
  LCount := AStream.Size - AStream.Position;
  if AWriteByteCount then begin
    WriteInteger(LCount);
  end;

  BeginWork(wmWrite, LCount);
  try
    // From here on LCount holds the size of the next chunk.
    LCount := Min(AStream.Size - AStream.Position, 32768);
    while LCount <> 0 do begin
      SetLength(LChunk, LCount);
      AStream.ReadBuffer(LChunk[1], LCount);
      Write(LChunk);
      LCount := Min(AStream.Size - AStream.Position, 32768);
    end;
  finally
    EndWork(wmWrite);
  end;
end;

// Writes every string in AValue to the connection, in order, one WriteLn per
// entry.
procedure TPsiTCPConnection.WriteStrings(AValue: TStrings);
var
  LIdx: Integer;
begin
  for LIdx := 0 to Pred(AValue.Count) do begin
    WriteLn(AValue[LIdx]);
  end;
end;

// Convenience overload of SendCmd for a single expected response code.
// AResponse = -1 means "accept any response" and is forwarded as an empty list;
// any other value is forwarded as a one-element list.
function TPsiTCPConnection.SendCmd(const AOut: string; const AResponse: SmallInt): SmallInt;
begin
  if AResponse <> -1 then begin
    Result := SendCmd(AOut, [AResponse]);
  end else begin
    Result := SendCmd(AOut, []);
  end;
end;

// Closes the underlying socket, if one is allocated, and notifies listeners.
// The order of the status events, the close itself, and the flag update matters
// to observers, so the sequence is kept exactly as written.
procedure TPsiTCPConnection.DisconnectSocket;
begin
  if Binding.HandleAllocated then begin
    // Announce the disconnect before the socket is actually closed.
    DoStatus(hsDisconnecting,[Binding.PeerIP]);
    Binding.CloseSocket;
    // Mark the connection as closed now that the socket has been closed here.
    FClosedGracefully := True;
    DoStatus(hsDisconnected,[Binding.PeerIP]);
    DoOnDisconnected;
  end;
  // The intercept is told about the disconnect whether or not a socket handle
  // was allocated.
  if InterceptEnabled then begin
    Intercept.Disconnect;
  end;
end;

// Starts write buffering: subsequent WriteBuffer calls (without AWriteNow) append
// to an in-memory buffer instead of sending immediately.
//
// AThreshhold - stored in FWriteBufferThreshhold; WriteBuffer flushes once the
//               buffer size reaches a positive threshold. The default -1 never
//               triggers that automatic flush.
//
// If a write buffer is already open it is released first. Its unsent contents
// are discarded, as before, but the old buffer object is no longer leaked.
procedure TPsiTCPConnection.OpenWriteBuffer(const AThreshhold: Integer = -1);
begin
  // Overwriting a live reference would orphan the previous TPsiBuffer.
  FreeAndNil(FWriteBuffer);
  FWriteBuffer := TPsiBuffer.Create;
  FWriteBufferThreshhold := AThreshhold;
end;

// Ends write buffering: sends whatever is still in the write buffer, then frees
// the buffer and sets FWriteBuffer to nil.
// NOTE(review): FlushWriteBuffer dereferences FWriteBuffer, so this appears to
// require that a write buffer is currently open - confirm against callers.
procedure TPsiTCPConnection.CloseWriteBuffer;
begin
  FlushWriteBuffer;
  FreeAndNil(FWriteBuffer);
end;

// Sends buffered data from the write buffer immediately (AWriteNow = True).
//
// AByteCount - number of bytes to send from the front of the buffer. With -1,
//              or when the buffer holds fewer than AByteCount bytes, the whole
//              buffer is sent and then cleared. Otherwise exactly AByteCount
//              bytes are sent and removed from the buffer.
//
// Does nothing when the buffer is empty.
procedure TPsiTCPConnection.FlushWriteBuffer(const AByteCount: Integer = -1);
begin
  if not (FWriteBuffer.Size > 0) then begin
    Exit;
  end;

  if (AByteCount <> -1) and (FWriteBuffer.Size >= AByteCount) then begin
    // Partial flush: send the requested prefix and drop it from the buffer.
    WriteBuffer(PChar(FWriteBuffer.Memory)[0], AByteCount, True);
    FWriteBuffer.RemoveXBytes(AByteCount);
  end else begin
    // Full flush: send everything and empty the buffer.
    WriteBuffer(PChar(FWriteBuffer.Memory)[0], FWriteBuffer.Size, True);
    ClearWriteBuffer;
  end;
end;

// Discards everything currently held in the write buffer without sending it.
// The buffer object itself stays allocated.
procedure TPsiTCPConnection.ClearWriteBuffer;
begin
  FWriteBuffer.Clear;
end;

// Reads a line of interactive input one character at a time, echoing as it goes.
//
// AMask - when non-empty, this text is echoed for every typed character in
//         place of the character itself (e.g. for password entry). When empty,
//         the typed character is echoed.
//
// BACKSPACE removes the last collected character (and is echoed) if there is one.
// CR ends the input: the following character (expected to be LF) is read and
// discarded, a line break is written, and the collected text is returned.
function TPsiTCPConnection.InputLn(const AMask: string =''): string;
var
  s: string;
begin
  // A string function result is not guaranteed to start out empty in Delphi,
  // so it must be cleared before it is measured and appended to below.
  Result := '';
  while true do begin
    s := ReadString(1);
    if s = BACKSPACE then begin
      if Length(Result) > 0 then begin
        SetLength(Result, Length(Result) - 1);
        Write(BACKSPACE);
      end;
    end else if s = CR then begin
      ReadString(1); // LF
      WriteLn;
      exit;
    end else begin
      Result := Result + s;
      if Length(AMask) = 0 then begin
        Write(s);
      end else begin
        Write(AMask);
      end;
    end;
  end;
end;

// Reads exactly ABytes bytes from the connection and returns them as a string.
// For ABytes <= 0 nothing is read from the connection.
function TPsiTCPConnection.ReadString(const ABytes: integer): string;
begin
  SetLength(Result, ABytes);
  if ABytes <= 0 then begin
    Exit;
  end;
  // Fill the preallocated string directly from the connection.
  ReadBuffer(Result[1], Length(Result));
end;

// Abandons write buffering without sending the buffered data: the buffer is
// emptied first, so the flush inside CloseWriteBuffer finds nothing to send,
// and then the buffer is closed and freed.
procedure TPsiTCPConnection.CancelWriteBuffer;
begin
  ClearWriteBuffer;
  CloseWriteBuffer;
end;

// Reads SizeOf(SmallInt) raw bytes from the connection directly into the result.
// No byte-order conversion is applied here.
function TPsiTCPConnection.ReadSmallInt: SmallInt;
begin
	ReadBuffer(Result, SizeOf(Result));
end;

// Writes AValue to the connection as SizeOf(SmallInt) raw bytes.
// No byte-order conversion is applied here.
procedure TPsiTCPConnection.WriteSmallInt(AValue: SmallInt);
begin
	WriteBuffer(AValue, SizeOf(AValue));
end;

// Probes the connection by performing a read from the stack with a timeout
// value of 1 (presumably milliseconds - confirm against ReadFromStack).
// ARaiseExceptionIfDisconnected is forwarded unchanged to ReadFromStack.
procedure TPsiTCPConnection.CheckForGracefulDisconnect(
  const ARaiseExceptionIfDisconnected: boolean);
begin
  ReadFromStack(ARaiseExceptionIfDisconnected, 1);
end;

{ TPsiBuffer }

// Removes the first AByteCount bytes from the buffer, shifting the remaining
// bytes to the start and shrinking the buffer accordingly.
// Raises EPsiException(RSNotEnoughDataInBuffer) when AByteCount exceeds Size.
procedure TPsiBuffer.RemoveXBytes(const AByteCount: integer);
begin
  if AByteCount > Size then begin
    raise EPsiException.Create(RSNotEnoughDataInBuffer);
  end;

  if AByteCount <> Size then begin
    // Slide the surviving tail down to offset 0, then trim the buffer.
    Move(PChar(Memory)[AByteCount], PChar(Memory)[0], Size - AByteCount);
    SetSize(Size - AByteCount);
  end else begin
    // Everything is being removed: simply empty the buffer.
    Clear;
  end;
end;

end.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -