⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 uasyncio.pas

📁 delphi源码
💻 PAS
📖 第 1 页 / 共 2 页
字号:
//------------------------------------------------------------------------------
// File: UAsyncIO.pas
// Original files: asyncio.h, asyncio.c
//
// Desc: Base library with I/O functionality.
//
// Portions created by Microsoft are
// Copyright (c) 2000-2002  Microsoft Corporation.  All rights reserved.
//------------------------------------------------------------------------------
unit UAsyncIo;

interface
uses
  Windows, Contnrs, BaseClass, DirectShow9, DSUtil;
type
  //
  // definition of CAsyncFile object that performs file access. It provides
  // asynchronous, unbuffered, aligned reads from a file, using a worker thread
  // on win95 and potentially overlapped i/o if available.

  // !!! Need to use real overlapped i/o if available
  // currently only uses worker thread, not overlapped i/o


  // Forward declarations so the classes declared below can reference each
  // other before their full definitions appear.
  TBCAsyncIo = class;
  TBCAsyncStream = class;

  // 64-bit signed integer alias (and its pointer type); used in this unit
  // for stream positions and sizes.
  LONGLONG = Int64;
  PLONGLONG = ^LONGLONG;
  //
  //  Model the stream we read from based on a file-like interface
  //
  // Abstract source of bytes consumed by TBCAsyncRequest / TBCAsyncIo.
  // Every method except the parameterless Size is abstract and must be
  // supplied by a descendant.
  TBCAsyncStream = class
  public
    // Moves the read position to APos. TBCAsyncRequest.Complete calls this
    // immediately before Read and only proceeds when it returns S_OK.
    function SetPointer(APos: LONGLONG): HResult; virtual; abstract;
    // Reads up to ABytesToRead bytes into ABuffer and reports the number
    // actually read in ABytesRead. AAlign receives the request's "aligned"
    // flag; how it is honoured is up to the descendant.
    function Read(ABuffer: PByte; ABytesToRead: DWord;
      AAlign: Boolean; out ABytesRead: DWord): HResult; virtual; abstract;

    // Returns the stream size and, through AAvailable, a second length value
    // (presumably the portion currently available - confirm in descendants).
    function Size(out AAvailable: LONGLONG): LONGLONG; overload; virtual; abstract;
    // Convenience overload: forwards to Size(out AAvailable) and discards
    // the AAvailable value.
    function Size: LONGLONG; overload; virtual;
    // Alignment reported by the stream; meaning defined by the descendant.
    function Alignment: DWord; virtual; abstract;
    // Lock/Unlock bracket a SetPointer + Read pair in
    // TBCAsyncRequest.Complete.
    procedure Lock; virtual; abstract;
    procedure Unlock; virtual; abstract;
    //procedure SetStopHandle(hevStop: THandle); virtual
  end;

  // represents a single request and performs the i/o. Can be called on either
  // worker thread or app thread, but must hold pcsFile across file accesses.
  // (ie across SetFilePointer/ReadFile pairs)
  // One read request: remembers what to read (Request) and performs the
  // read against the stream (Complete).
  TBCAsyncRequest = class
  private
    // owning I/O object, as passed to Request
    FIO: TBCAsyncIo;
    // stream the read is performed on
    FStream: TBCAsyncStream;
    // start position of the read
    FPos: LONGLONG;
    // forwarded unchanged to TBCAsyncStream.Read as its AAlign argument
    FAligned: Boolean;
    // requested length; overwritten by Complete with the number of bytes
    // actually read when the two differ
    FLength: Integer;
    // destination buffer
    FBuffer: PByte;
    // caller context; Complete treats a non-nil value as an IMediaSample
    // when the stream's Read returns OLE_S_FIRST
    FContext: Pointer;
    // caller-supplied value returned unchanged by GetUser
    FUser: DWord;
    // result of the request; VFW_E_TIMEOUT until Complete has run
    Fhr: HResult;

  public
    // Stores the parameters of this request and marks it as not yet done
    // (Fhr = VFW_E_TIMEOUT). No I/O is issued here. Always returns S_OK.
    function Request(AIO: TBCAsyncIo; AStream: TBCAsyncStream;
      APos: LONGLONG; ALength: Integer; AAligned: Boolean;
      ABuffer: PByte;
      // filter's context
      AContext: Pointer;
      // downstream filter's context
      AUser: DWord): HResult;

    // Performs the read synchronously on the calling thread while holding
    // the stream lock. Returns S_OK for a full read, S_FALSE when fewer
    // bytes than requested were read, or the failure code from the stream.
    function Complete: HResult;

    // Cancels the request. The current implementation does nothing and
    // returns S_OK.
    function Cancel: HResult;

    // accessor functions
    function GetContext: Pointer;

    function GetUser: DWord;

    // result stored by the last Request/Complete call
    function GetHResult: HResult;

    // FLength - Complete replaces it with the actual byte count on a
    // short read
    function GetActualLength: Integer;

    // start position passed to Request
    function GetStart: LONGLONG;
  end;

  // List type used by TBCAsyncIo for its work and done queues. The
  // TBCAsyncIo constructor switches OwnsObjects off on both instances, so
  // removing an item does not free it.
  TBCRequestList = class(TObjectList);

  // this class needs a worker thread, but the ones defined in classes\base
  // are not suitable (they assume you have one message sent or posted per
  // request, whereas here for efficiency we want just to set an event when
  // there is work on the queue).
  //
  // we create TBCAsyncRequest objects and queue them on FListWork. The worker
  // thread pulls them off, completes them and puts them on FListDone.
  // The events FOnWork and FOnDone are set when the corresponding lists are
  // not empty.
  //
  // Synchronous requests are done on the caller thread. These should be
  // synchronised by the caller, but to make sure we hold m_csFile across
  // the SetFilePointer/ReadFile code.
  //
  // Flush by calling BeginFlush. This rejects all further requests (by
  // setting FFlushing within FCSLists), cancels all requests and moves them
  // to the done list, and sets FOnDone to ensure that no WaitForNext operations
  // will block. Call EndFlush to cancel this state.
  //
  // we support unaligned calls to SyncRead. This is done by opening the file
  // twice if we are using unbuffered i/o (m_dwAlign > 1).
  // !!!fix this to buffer on top of existing file handle?
  // Queues read requests against a TBCAsyncStream and services them on a
  // worker thread started by StartThread. Requests wait on FListWork and
  // are moved to FListDone once processed.
  TBCAsyncIo = class
  private
    // critical section created in the constructor
    // NOTE(review): its users are outside the visible part of this unit
    FReader: TBCCritSec;
    // stream all reads go through; passed to the constructor and not
    // created by this class
    FStream: TBCAsyncStream;

    // locks access to the list and events
    FCSLists: TBCCritSec;
    // true if between BeginFlush/EndFlush
    FFlushing: Boolean;

    // pending requests / finished requests (both non-owning lists)
    FListWork: TBCRequestList;
    FListDone: TBCRequestList;

    // set when list is not empty
    FOnWork: TBCAMEvent;
    FOnDone: TBCAMEvent;

    // for correct flush behaviour: all protected by FCSLists
    // nr of items not on FListDone or FListWork
    FItemsOut: Integer;
    // TRUE if someone waiting for FOnAllDone
    FWaiting: Boolean;
    // signal when FItemsOut goes to 0 if FWaiting
    FOnAllDone: TBCAMEvent;

    // set when thread should exit
    FOnStop: TBCAMEvent;
    // Win32 handle of the worker thread; 0 while no thread is running
    FThread: THandle;
    // initialised to nil by the constructor
    // NOTE(review): no other use is visible in this part of the unit
    FThreadProc: TThreadProc;

    // total size of the underlying stream (FStream.Size)
    function Size: LONGLONG;

    // start the thread; returns S_OK at once if it is already running
    function StartThread: HResult;

    // stop the thread and close the handle
    function CloseThread: HResult;

    // manage the list of requests. hold FCSLists and ensure
    // that the (manual reset) event for the list is set when things are on
    // the list but reset when the list is empty.
    // returns nil if list empty
    function GetWorkItem: TBCAsyncRequest;

    // get an item from the done list
    function GetDoneItem: TBCAsyncRequest;

    // put an item on the work list
    function PutWorkItem(ARequest: TBCAsyncRequest): HResult;

    // put an item on the done list
    function PutDoneItem(ARequest: TBCAsyncRequest): HResult;

    // called on thread to process any active requests
    procedure ProcessRequests;

    // thread start routine handed to CreateThread by StartThread, which
    // passes Self as the thread parameter
    // NOTE(review): declared as an instance method, so that single thread
    // argument lines up with the hidden Self and pv receives no value -
    // confirm this is intended
    function InitialThreadProc(pv: Pointer): DWord; stdcall;

    // body of the worker thread; virtual so descendants can replace it
    function ThreadProc: DWord; virtual;

  public

    // AStream is stored as-is; all list, event and critical-section
    // objects are created here
    constructor Create(AStream: TBCAsyncStream);
    destructor Destroy; override;

    // open the file; the base implementation does nothing and returns
    // NOERROR
    function Open(AName: PChar): HResult; virtual;

    // ready for async activity - call this before
    // calling Request
    function AsyncActive: HResult;

    // call this when no more async activity will happen before
    // the next AsyncActive call
    function AsyncInactive: HResult;

    // queue a requested read. must be aligned.
    function Request(APos: LONGLONG; ALength: Integer;
      AAligned: Boolean; ABuffer: PByte;
      AContext: Pointer; AUser: DWord): HResult;

    // wait for the next read to complete
    function WaitForNext(ATimeout: DWord; AContext: PPointer;
      out AUser: DWord; out AActual: Integer): HResult;

    // perform a read of an already aligned buffer
    function SyncReadAligned(APos: LONGLONG; ALength: Integer;
      ABuffer: PByte; out AActual: Integer; AContext: Pointer): HResult;

    // perform a synchronous read. will be buffered
    // if not aligned.
    function SyncRead(APos: LONGLONG; ALength: Integer;
      ABuffer: PByte): HResult;

    // return length
    function Length(out ATotal: LONGLONG;
      out AAvailable: LONGLONG): HResult;

    // all Reader positions, read lengths and memory locations must
    // be aligned to this.
    function Alignment(out Al: Integer): HResult; overload;

    // enter / leave the flushing state (see FFlushing)
    function BeginFlush: HResult;
    function EndFlush: HResult;

    // alignment as a plain function result
    function Alignment: Integer; overload;

    // alignment tests for a 32-bit and a 64-bit value
    function IsAligned(Al: Integer): Boolean; overload;
    function IsAligned(Al: LONGLONG): Boolean; overload;

    //  Accessor
    function StopEvent: THandle;
  end;

implementation

// --- TBCAsyncStream ---

// Parameterless overload: asks the descendant for the size through
// Size(out AAvailable) and drops the "available" figure.
function TBCAsyncStream.Size: LONGLONG;
var
  Discarded: LONGLONG;
begin
  Result := Size(Discarded);
end;

// --- TBCAsyncRequest ---

// Records the parameters of a read. Nothing is read here; the work is
// done later by Complete. Always returns S_OK.
function TBCAsyncRequest.Request(AIO: TBCAsyncIo; AStream: TBCAsyncStream;
  APos: LONGLONG; ALength: Integer; AAligned: Boolean;
  ABuffer: PByte; AContext: Pointer; AUser: DWord): HResult;
begin
  Result := S_OK;

  // "not done yet" marker, replaced once Complete has run
  Fhr := VFW_E_TIMEOUT;

  // where the data comes from
  FIO := AIO;
  FStream := AStream;

  // what to read and where to put it
  FPos := APos;
  FLength := ALength;
  FAligned := AAligned;
  FBuffer := ABuffer;

  // opaque values handed back to the caller later
  FContext := AContext;
  FUser := AUser;
end;

// Performs the read described by Request on the calling thread.
// The stream is locked across the SetPointer/Read pair.
// Result (also stored in Fhr):
//   S_OK     - the full FLength bytes were read
//   S_FALSE  - fewer or more bytes than asked for; FLength now holds the
//              real count
//   failure  - whatever SetPointer or Read reported
function TBCAsyncRequest.Complete: HResult;
var
  BytesRead: DWord;
  MediaSample: IMediaSample;
begin
  FStream.Lock;
  try
    Fhr := FStream.SetPointer(FPos);
    if Fhr = S_OK then
    begin
      Fhr := FStream.Read(FBuffer, FLength, FAligned, BytesRead);

      // OLE_S_FIRST from the stream: flag the sample held in FContext
      // (when there is one) as a discontinuity.
      if (Fhr = OLE_S_FIRST) and Assigned(FContext) then
      begin
        MediaSample := IMediaSample(FContext);
        MediaSample.SetDiscontinuity(True);
        Fhr := S_OK;
      end;

      // Failure codes pass through untouched; any other code is
      // normalised to S_OK / S_FALSE depending on the byte count.
      if not Failed(Fhr) then
      begin
        if BytesRead = DWord(FLength) then
          Fhr := S_OK
        else
        begin
          // tell caller size changed - probably because of EOF
          FLength := Integer(BytesRead);
          Fhr := S_FALSE;
        end;
      end;
    end;
  finally
    FStream.Unlock;
    Result := Fhr;
  end;
end;

// Cancels the request. Nothing is done here; the call always reports
// success.
function TBCAsyncRequest.Cancel: HResult;
begin
  Result := S_OK;
end;

// Context pointer supplied to Request.
function TBCAsyncRequest.GetContext: Pointer;
begin
  Result := Self.FContext;
end;

// User value supplied to Request.
function TBCAsyncRequest.GetUser: DWord;
begin
  Result := Self.FUser;
end;

// Status of the request: VFW_E_TIMEOUT after Request, the outcome of the
// read after Complete.
function TBCAsyncRequest.GetHResult: HResult;
begin
  Result := Self.Fhr;
end;

// Current value of FLength: the requested length, or the byte count
// actually read if Complete saw a different count.
function TBCAsyncRequest.GetActualLength: Integer;
begin
  Result := Self.FLength;
end;

// Start position supplied to Request.
function TBCAsyncRequest.GetStart: LONGLONG;
begin
  Result := Self.FPos;
end;

// --- TBCAsyncIo ---

// Builds the helper around AStream. The stream reference is only stored;
// every list, event and critical section used by the object is created
// here. No worker thread is started yet.
constructor TBCAsyncIo.Create(AStream: TBCAsyncStream);
begin
  FStream := AStream;

  // plain state
  FFlushing := False;
  FWaiting := False;
  FItemsOut := 0;
  FThread := 0;
  FThreadProc := nil;

  // synchronisation objects
  FReader := TBCCritSec.Create;
  FCSLists := TBCCritSec.Create;

  // request queues; the lists never free the requests they hold
  FListWork := TBCRequestList.Create;
  FListWork.OwnsObjects := False;
  FListDone := TBCRequestList.Create;
  FListDone.OwnsObjects := False;

  // events, all created with True as the constructor argument
  FOnWork := TBCAMEvent.Create(True);
  FOnDone := TBCAMEvent.Create(True);
  FOnAllDone := TBCAMEvent.Create(True);
  FOnStop := TBCAMEvent.Create(True);
end;

// Shuts the object down: flushes outstanding requests, stops the worker
// thread and then releases every object the constructor created.
// FStream is deliberately left alone - the constructor only stores the
// reference it was handed and never creates the stream itself.
destructor TBCAsyncIo.Destroy;
var
  I: Integer;
begin
  // move everything to the done list
  BeginFlush();

  // shutdown worker thread
  CloseThread();

  // Both lists are created with OwnsObjects = False, so Clear/Free on a
  // list does not release the requests still queued on it. With the worker
  // thread stopped nothing else can pick them up, so free them here.
  if Assigned(FListDone) then
  begin
    for I := 0 to FListDone.Count - 1 do
      FListDone.Items[I].Free;
    FListDone.Clear;
    FListDone.Free;
    FListDone := nil;
  end;

  if Assigned(FListWork) then
  begin
    for I := 0 to FListWork.Count - 1 do
      FListWork.Items[I].Free;
    FListWork.Clear;
    FListWork.Free;
    FListWork := nil;
  end;

  // events created in the constructor
  FOnWork.Free;
  FOnWork := nil;
  FOnDone.Free;
  FOnDone := nil;
  FOnAllDone.Free;
  FOnAllDone := nil;
  FOnStop.Free;
  FOnStop := nil;

  // critical sections go last: BeginFlush/CloseThread above may use them
  FCSLists.Free;
  FCSLists := nil;
  FReader.Free;
  FReader := nil;

  inherited Destroy;
end;

// Base implementation of Open: AName is ignored and success is reported.
// Virtual, so descendants can override it.
function TBCAsyncIo.Open(AName: PChar): HResult;
begin
  Result := S_OK;
end;

// Size of the underlying stream. A stream must have been supplied to the
// constructor (checked with an assertion).
function TBCAsyncIo.Size: LONGLONG;
begin
  Assert(FStream <> nil);
  Result := FStream.Size;
end;

// Starts the worker thread if it is not already running.
// Returns S_OK when the thread exists or was created, otherwise the
// Win32 error from CreateThread converted to an HResult.
function TBCAsyncIo.StartThread: HResult;
var
  dwThreadID, dwErr: DWord;
begin
  if (FThread <> 0) then
  begin
    Result := S_OK;
    Exit;
  end;

  // clear the stop event before starting
  FOnStop.Reset;

  // The thread is created with the raw Win32 call rather than the RTL's
  // BeginThread, so the RTL is never told that a second thread exists.
  // Set the flag by hand so the memory manager serialises allocations made
  // concurrently by the worker and the calling threads.
  IsMultiThread := True;

  // NOTE(review): InitialThreadProc is an instance method, so the single
  // thread argument (Self) arrives as its hidden Self parameter and pv is
  // not supplied - confirm this is intended.
  FThread := CreateThread(nil, 0, @TBCAsyncIo.InitialThreadProc,
    Self, 0, dwThreadID);
  if (FThread = 0) then
  begin
    dwErr := GetLastError;
    Result := HResultFromWin32(dwErr);
    Exit;
  end;

  Result := S_OK;
end;

// Signals the stop event and, when a worker thread exists, waits for it
// to finish and closes its handle. Always returns S_OK.
function TBCAsyncIo.CloseThread: HResult;
begin
  Result := S_OK;

  // signalled even when no thread is running
  FOnStop.SetEv;

  if FThread = 0 then
    Exit;

  // block until the worker has left its thread procedure
  WaitForSingleObject(FThread, INFINITE);
  CloseHandle(FThread);
  FThread := 0;
end;

// Removes and returns the first request on the work list, or nil when the
// list is empty. Runs under FCSLists and resets FOnWork whenever the list
// is empty on exit.
function TBCAsyncIo.GetWorkItem: TBCAsyncRequest;
begin
  Result := nil;

  FCSLists.Lock;
  try
    if FListWork.Count <> 0 then
    begin
      // the list does not own its items, so Delete only unlinks the entry
      Result := TBCAsyncRequest(FListWork.Items[0]);
      FListWork.Delete(0);
    end;

    // force event set correctly
    if FListWork.Count = 0 then
      FOnWork.Reset;
  finally
    FCSLists.UnLock;
  end;
end;

function TBCAsyncIo.GetDoneItem: TBCAsyncRequest;
var
  Req: TBCAsyncRequest;
begin
  Result := nil;
  Req := nil;

  FCSLists.Lock;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -