ringbufferstream.pas

来自「Drag files and Drop to delphi forms 0402」· PAS 代码 · 共 396 行

PAS
396
字号
unit RingbufferStream;

interface

uses
  SyncObjs,
  Windows,
  Classes;

type
  TStreamProgressEvent = procedure(Sender: TObject; Count, MaxCount: integer) of object;

  TBucket = record
    Size: integer;
    InUse: boolean;
    Data: byte;
  end;

  PBucket = ^TBucket;

  TRingBuffer = class
  private
    FBucketCount: integer;
    FBucketSize: integer;
    FBuffer: pointer;
    FBuckets: array of PBucket;
    FAbort: THandle;
    FEmptyBucketSemaphore: THandle;
    FFullBucketSemaphore: THandle;
    FWriteHead: integer;
    FReadTail: integer;
    FAborted: boolean;
    FRefCount: integer;

    FDebugFullBucketCount: integer;
    FDebugEmptyBucketCount: integer;
  public
    constructor Create(ABucketCount: integer; ABucketSize: integer);
    destructor Destroy; override;

    procedure AddRef;
    procedure Release;

    // Write bucket functions
    function PopEmptyBucket: PBucket;
    procedure PushFullBucket(Buffer: PBucket);

    // Read bucket functions
    function PopFullBucket: PBucket;
    procedure PushEmptyBucket(Buffer: PBucket);

    procedure Abort;
    property Aborted: boolean read FAborted;
    property BucketCount: integer read FBucketCount;
    property BucketSize: integer read FBucketSize;
  end;

  TFifoStream = class(TStream)
  private
    FSize: int64;
    FBuffer: TRingBuffer;
    FReadOnly: boolean;
    FWriteOnly: boolean;
    FAborted: boolean;
    FPosition: int64;
    FOverflowBucket: TMemoryStream;
    FFakeWriteDuringAbort: boolean;
    FReset: boolean;
  protected
    function GetAborted: boolean;
    function GetSize: Int64; override;
    procedure SetSize(NewSize: Longint); override;
  public
    constructor Create(ABuffer: TRingBuffer; ASize: int64 = 0);
    constructor CreateForRead(ABuffer: TRingBuffer; ASize: int64);
    constructor CreateForWrite(ABuffer: TRingBuffer);
    destructor Destroy; override;

    function Read(var Buffer; Count: Longint): Longint; override;
    function Write(const Buffer; Count: Longint): Longint; override;
    function Seek(Offset: Longint; Origin: Word): Longint; override;

    procedure Abort;
    property Aborted: boolean read GetAborted;
    property FakeWriteDuringAbort: boolean read FFakeWriteDuringAbort write FFakeWriteDuringAbort;
  end;

implementation

uses
  SysUtils;

procedure TRingBuffer.Abort;
begin
  FAborted := True;
  SetEvent(FAbort);
end;

procedure TRingBuffer.AddRef;
begin
  InterlockedIncrement(FRefCount);
end;

constructor TRingBuffer.Create(ABucketCount, ABucketSize: integer);
var
  Size: integer;
  i: integer;
  p: PChar;
begin
  inherited Create;

  FBucketCount := ABucketCount;
  FBucketSize := ABucketSize;

  Size := FBucketCount*(SizeOf(TBucket)+FBucketSize);
  GetMem(FBuffer, Size);
  FillChar(FBuffer^, Size, 0);

  SetLength(FBuckets, FBucketCount);
  p := FBuffer;
  for i := 0 to FBucketCount-1 do
  begin
    FBuckets[i] := PBucket(p);
    inc(p, SizeOf(TBucket)+FBucketSize);
  end;

  FDebugEmptyBucketCount := FBucketCount;
  
  FFullBucketSemaphore := CreateSemaphore(nil, 0, FBucketCount, nil);
  FEmptyBucketSemaphore := CreateSemaphore(nil, FBucketCount, FBucketCount, nil);
  FAbort := CreateEvent(nil, True, False, nil);
end;

destructor TRingBuffer.Destroy;
begin
  CloseHandle(FFullBucketSemaphore);
  CloseHandle(FEmptyBucketSemaphore);
  CloseHandle(FAbort);
  FreeMem(FBuffer);

  inherited Destroy;
end;

function TRingBuffer.PopFullBucket: PBucket;
var
  Handles: array[0..1] of THandle;
  Res: DWord;
  n: integer;
begin
  Result := FBuckets[FReadTail];

  // Wait for filled bucket to become available (or abort)
  Handles[0] := FAbort;
  Handles[1] := FFullBucketSemaphore;
  Res := WaitForMultipleObjects(Length(Handles), @Handles[0], False, INFINITE);
  if (Res = WAIT_OBJECT_0) or ((Res >= WAIT_ABANDONED_0) and (Res <= WAIT_ABANDONED_0+Length(Handles)-1)) then
  begin
    Result := nil;
    exit;
  end;

  n := InterlockedDecrement(FDebugFullBucketCount);
  OutputDebugString(PChar(Format('< Full buckets: %d', [n])));

  ASSERT(Res = WAIT_OBJECT_0+1);
  ASSERT(Result.InUse, 'Full bucket marked empty');
end;

function TRingBuffer.PopEmptyBucket: PBucket;
var
  Handles: array[0..1] of THandle;
  Res: DWord;
  n: integer;
begin
  Result := FBuckets[FWriteHead];

  // Wait for empty bucket to become available (or abort)
  Handles[0] := FAbort;
  Handles[1] := FEmptyBucketSemaphore;
  Res := WaitForMultipleObjects(Length(Handles), @Handles[0], False, INFINITE);
  if (Res = WAIT_OBJECT_0) or ((Res >= WAIT_ABANDONED_0) and (Res <= WAIT_ABANDONED_0+Length(Handles)-1)) then
    begin
      Result := nil;
      exit;
    end;

  n := InterlockedDecrement(FDebugEmptyBucketCount);
  OutputDebugString(PChar(Format('< Empty buckets: %d', [n])));

  ASSERT(Res = WAIT_OBJECT_0+1);
  ASSERT(not Result.InUse, 'Empty bucket marked full');
end;

procedure TRingBuffer.PushEmptyBucket(Buffer: PBucket);
var
  n: integer;
begin
  ASSERT(FBuckets[FReadTail] = Buffer, 'Push of empty bucket not at tail');
  Buffer^.InUse := False;
  Buffer^.Size := 0;
  FReadTail := (FReadTail+1) mod FBucketCount;
  n := InterlockedIncrement(FDebugEmptyBucketCount);
  OutputDebugString(PChar(Format('> Empty buckets: %d', [n])));
  // Make empty bucket available
  ReleaseSemaphore(FEmptyBucketSemaphore, 1, nil)
end;

procedure TRingBuffer.PushFullBucket(Buffer: PBucket);
var
  n: integer;
begin
  ASSERT(FBuckets[FWriteHead] = Buffer, 'Push of full bucket not at head');
  Buffer.InUse := True;
  FWriteHead := (FWriteHead+1) mod FBucketCount;
  n := InterlockedIncrement(FDebugFullBucketCount);
  OutputDebugString(PChar(Format('> Full buckets: %d', [n])));
  // Make filled bucket available
  ReleaseSemaphore(FFullBucketSemaphore, 1, nil);
end;

procedure TRingBuffer.Release;
var
  n: integer;
begin
  n := InterlockedDecrement(FRefCount);

  if (n = 1) then
    Abort // Abort if one of the owners leaves the party
  else
  if (n = 0) then
    Free;
end;


procedure TFifoStream.Abort;
begin
  FAborted := True;
  FBuffer.Abort;
end;

constructor TFifoStream.Create(ABuffer: TRingBuffer; ASize: int64);
begin
  inherited Create;
  FSize := ASize;
  FBuffer := ABuffer;
  FBuffer.AddRef;
  FOverflowBucket := TMemoryStream.Create;
end;

constructor TFifoStream.CreateForRead(ABuffer: TRingBuffer; ASize: int64);
begin
  Create(ABuffer, ASize);
  FReadOnly := True;
end;

constructor TFifoStream.CreateForWrite(ABuffer: TRingBuffer);
begin
  Create(ABuffer);
  FWriteOnly := True;
end;

destructor TFifoStream.Destroy;
begin
  FOverflowBucket.Free;
  FBuffer.Release;
  inherited Destroy;
end;

function TFifoStream.GetAborted: boolean;
begin
  Result := FAborted or FBuffer.Aborted;
end;

function TFifoStream.GetSize: Int64;
begin
  Result := FSize;
end;

function TFifoStream.Read(var Buffer; Count: Integer): Longint;
var
  Bucket: PBucket;
begin
  if (FWriteOnly) or (FReset) then
    raise Exception.Create('Invalid stream operation');

  if (Aborted) or (FPosition >= FSize) then
  begin
    Result := 0;
    exit;
  end;

  if (FOverflowBucket.Position >= FOverflowBucket.Size) then
  begin
    // Get a filled bucket
    Bucket := FBuffer.PopFullBucket;
    if (Bucket = nil) or (Aborted) then
    begin
      Result := 0;
      exit;
    end;

    FOverflowBucket.Position := 0;
    FOverflowBucket.Size := Bucket^.Size;

    // Transfer data from the bucket to the overflow buffer.
    FOverflowBucket.Write(Bucket^.Data, Bucket^.Size);
    FOverflowBucket.Position := 0;

    // Release the bucket
    FBuffer.PushEmptyBucket(Bucket);
  end;

  // Transfer data from the overflow buffer to the drop target
  Result := FOverflowBucket.Read(Buffer, Count);
  inc(FPosition, Result);
end;

function TFifoStream.Seek(Offset: Integer; Origin: Word): Longint;
var
  NewPos: int64;
begin
  NewPos := 0;
  case Origin of
    soFromBeginning:
      NewPos := Offset;
    soFromCurrent:
      NewPos := FPosition+Offset;
    soFromEnd:
      NewPos := FSize-Offset;
  end;
  if (NewPos <> FPosition) then
  begin
    if (NewPos = 0) then
    begin
      FReset := True;
      Abort;
    end else
      raise Exception.Create('Invalid stream operation');
  end;
  Result := NewPos;
end;

procedure TFifoStream.SetSize(NewSize: Integer);
begin
  FSize := NewSize;
end;

function TFifoStream.Write(const Buffer; Count: Integer): Longint;
var
  Bucket: PBucket;
  Size: integer;
  p: PChar;
begin
  if (FReadOnly) or (FReset) then
    raise Exception.Create('Invalid stream operation');

  Result := 0;
  if (Aborted) then
  begin
    if (FakeWriteDuringAbort) then
      Result := Count;
    exit;
  end;

  p := @Buffer;
  while (Count > 0) do
  begin
    Bucket := FBuffer.PopEmptyBucket;
    if (Bucket = nil) or (Aborted) then
    begin
      if (FakeWriteDuringAbort) then
        inc(Result, Count)
      else
        Result := 0;
      exit;
    end;

    Size := Count;
    if (Size > FBuffer.BucketSize) then
      Size := FBuffer.BucketSize;

    Move(p^, Bucket^.Data, Size);
    Bucket^.Size := Size;
    FBuffer.PushFullBucket(Bucket);

    dec(Count, Size);
    inc(Result, Size);
    inc(p, Size);

    if (FWriteOnly) then
      inc(FPosition, Result);
  end;
end;

end.

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?