📄 uasyncio.pas
字号:
//------------------------------------------------------------------------------
// File: UAsyncIO.pas
// Original files: asyncio.h, asyncio.c
//
// Desc: Base library with I/O functionality.
//
// Portions created by Microsoft are
// Copyright (c) 2000-2002 Microsoft Corporation. All rights reserved.
//------------------------------------------------------------------------------
unit UAsyncIo;
interface
uses
Windows, Contnrs, BaseClass, DirectShow9, DSUtil;
type
//
// Definition of the CAsyncFile object that performs file access. It provides
// asynchronous, unbuffered, aligned reads from a file, using a worker thread
// on Win95 and potentially overlapped I/O if available.
// !!! Need to use real overlapped I/O if available;
// currently this only uses a worker thread, not overlapped I/O.
// Forward declarations: TBCAsyncRequest (declared further down) holds
// references to both classes before their full declarations appear.
TBCAsyncIo = class;
TBCAsyncStream = class;
// 64-bit signed integer alias used for stream positions and sizes,
// together with its pointer type.
LONGLONG = Int64;
PLONGLONG = ^LONGLONG;
//
// Model the stream we read from based on a file-like interface
//
// Abstract, file-like stream that the async I/O classes read from.
// Every method except the parameterless Size is abstract and must be
// supplied by a descendant.
TBCAsyncStream = class
public
// Positions the stream at APos. TBCAsyncRequest.Complete calls this
// immediately before Read and only reads when the result is S_OK.
function SetPointer(APos: LONGLONG): HResult; virtual; abstract;
// Reads ABytesToRead bytes into ABuffer; ABytesRead receives the number of
// bytes actually delivered. AAlign carries the request's "aligned" flag.
function Read(ABuffer: PByte; ABytesToRead: DWord;
AAlign: Boolean; out ABytesRead: DWord): HResult; virtual; abstract;
// Returns the stream size and reports a second value through AAvailable
// (presumably the number of bytes currently available - confirm in descendants).
function Size(out AAvailable: LONGLONG): LONGLONG; overload; virtual; abstract;
// Convenience overload: calls Size(AAvailable) and discards AAvailable.
function Size: LONGLONG; overload; virtual;
// Alignment required by the stream (presumably in bytes - TODO confirm).
function Alignment: DWord; virtual; abstract;
// Lock/Unlock bracket a SetPointer/Read pair so the two calls are not
// interleaved with another caller's access to the stream.
procedure Lock; virtual; abstract;
procedure Unlock; virtual; abstract;
//procedure SetStopHandle(hevStop: THandle); virtual
end;
// represents a single request and performs the i/o. Can be called on either
// worker thread or app thread, but must hold pcsFile across file accesses.
// (ie across SetFilePointer/ReadFile pairs)
// A single read request: stores the parameters of the read and performs the
// i/o against the stream when Complete is called.
TBCAsyncRequest = class
private
// owning i/o object and the stream to read from (both stored by Request)
FIO: TBCAsyncIo;
FStream: TBCAsyncStream;
// stream position at which the read starts
FPos: LONGLONG;
// passed straight through to TBCAsyncStream.Read as its AAlign argument
FAligned: Boolean;
// requested length; Complete overwrites it with the actual byte count
// when the stream delivers a different amount
FLength: Integer;
// destination buffer
FBuffer: PByte;
// filter's context; Complete treats a non-nil value as an IMediaSample
// when the stream read returns OLE_S_FIRST
FContext: Pointer;
// downstream filter's context
FUser: DWord;
// result of the request; VFW_E_TIMEOUT until Complete has run
Fhr: HResult;
public
// init the params for this request. Issue the i/o
// if overlapped i/o is possible.
// (The visible implementation only stores the parameters and returns S_OK.)
function Request(AIO: TBCAsyncIo; AStream: TBCAsyncStream;
APos: LONGLONG; ALength: Integer; AAligned: Boolean;
ABuffer: PByte;
// filter's context
AContext: Pointer;
// downstream filter's context
AUser: DWord): HResult;
// issue the i/o if not overlapped, and block until i/o complete.
// returns error code of file i/o
function Complete: HResult;
// cancels the i/o. blocks until i/o is no longer pending
// (the visible implementation is a no-op that returns S_OK)
function Cancel: HResult;
// accessor functions
function GetContext: Pointer;
function GetUser: DWord;
function GetHResult: HResult;
// we set FLength to the actual length
function GetActualLength: Integer;
function GetStart: LONGLONG;
end;
// List type used by TBCAsyncIo for its work and done queues of requests.
TBCRequestList = class(TObjectList);
// This class needs a worker thread, but the ones defined in classes\base
// are not suitable (they assume you have one message sent or posted per
// request, whereas here, for efficiency, we just want to set an event when
// there is work on the queue).
//
// We create TBCAsyncRequest objects (CAsyncRequest in the C++ original) and
// queue them on FListWork (m_listWork). The worker thread pulls them off,
// completes them and puts them on FListDone (m_listDone).
// The events FOnWork (m_evWork) and FOnDone (m_evDone) are set when the
// corresponding lists are not empty.
//
// Synchronous requests are done on the caller thread. These should be
// synchronised by the caller, but to make sure, we hold the stream lock
// (m_csFile in the C++ original) across the SetPointer/Read code.
//
// Flush by calling BeginFlush. This rejects all further requests (by
// setting FFlushing within FCSLists), cancels all requests and moves them
// to the done list, and sets FOnDone to ensure that no WaitForNext operations
// will block. Call EndFlush to cancel this state.
//
// We support unaligned calls to SyncRead. In the C++ original this is done by
// opening the file twice if we are using unbuffered i/o (m_dwAlign > 1).
// !!! Fix this to buffer on top of the existing file handle?
// Asynchronous reader built on a TBCAsyncStream. Requests are queued on a
// work list, completed on a worker thread and handed back through a done list.
TBCAsyncIo = class
private
// critical section created in the constructor (its use is outside the
// part of the unit visible here)
FReader: TBCCritSec;
// stream all reads go to; supplied by the caller of Create
FStream: TBCAsyncStream;
// locks access to the list and events
FCSLists: TBCCritSec;
// true if between BeginFlush/EndFlush
FFlushing: Boolean;
// queued requests waiting for the worker / completed requests
FListWork: TBCRequestList;
FListDone: TBCRequestList;
// set when list is not empty
FOnWork: TBCAMEvent;
FOnDone: TBCAMEvent;
// for correct flush behaviour: all protected by FCSLists
// nr of items not on FListDone or FListWork
FItemsOut: Integer;
// TRUE if someone waiting for FOnAllDone
FWaiting: Boolean;
// signal when FItemsOut goes to 0 if FWaiting
FOnAllDone: TBCAMEvent;
// set when thread should exit
FOnStop: TBCAMEvent;
// worker thread handle; 0 while no thread is running
FThread: THandle;
// initialised to nil by the constructor
FThreadProc: TThreadProc;
// total size of the underlying stream
function Size: LONGLONG;
// start the thread
function StartThread: HResult;
// stop the thread and close the handle
function CloseThread: HResult;
// manage the list of requests. hold FCSLists and ensure
// that the (manual reset) event for the list is set when things are on
// the list but reset when the list is empty.
// returns nil if list empty
function GetWorkItem: TBCAsyncRequest;
// get an item from the done list
function GetDoneItem: TBCAsyncRequest;
// put an item on the work list
function PutWorkItem(ARequest: TBCAsyncRequest): HResult;
// put an item on the done list
function PutDoneItem(ARequest: TBCAsyncRequest): HResult;
// called on thread to process any active requests
procedure ProcessRequests;
// initial thread proc handed to CreateThread; calls ThreadProc with the
// thread parameter as this.
// NOTE(review): StartThread passes the address of this instance method to
// CreateThread with Self as the thread parameter; as a method it also has a
// hidden Self argument in addition to pv - confirm the calling convention
// really matches what CreateThread expects.
function InitialThreadProc(pv: Pointer): DWord; stdcall;
function ThreadProc: DWord; virtual;
public
// AStream is stored, not copied; the visible destructor does not free it
constructor Create(AStream: TBCAsyncStream);
destructor Destroy; override;
// open the file (the base implementation does nothing and returns NOERROR)
function Open(AName: PChar): HResult; virtual;
// ready for async activity - call this before
// calling Request
function AsyncActive: HResult;
// call this when no more async activity will happen before
// the next AsyncActive call
function AsyncInactive: HResult;
// queue a requested read. must be aligned.
function Request(APos: LONGLONG; ALength: Integer;
AAligned: Boolean; ABuffer: PByte;
AContext: Pointer; AUser: DWord): HResult;
// wait for the next read to complete
function WaitForNext(ATimeout: DWord; AContext: PPointer;
out AUser: DWord; out AActual: Integer): HResult;
// perform a read of an already aligned buffer
function SyncReadAligned(APos: LONGLONG; ALength: Integer;
ABuffer: PByte; out AActual: Integer; AContext: Pointer): HResult;
// perform a synchronous read. will be buffered
// if not aligned.
function SyncRead(APos: LONGLONG; ALength: Integer;
ABuffer: PByte): HResult;
// return length
function Length(out ATotal: LONGLONG;
out AAvailable: LONGLONG): HResult;
// all Reader positions, read lengths and memory locations must
// be aligned to this.
function Alignment(out Al: Integer): HResult; overload;
// enter / leave the flushing state described above the class
function BeginFlush: HResult;
function EndFlush: HResult;
function Alignment: Integer; overload;
function IsAligned(Al: Integer): Boolean; overload;
function IsAligned(Al: LONGLONG): Boolean; overload;
// Accessor
function StopEvent: THandle;
end;
implementation
// --- TBCAsyncStream ---
// Returns the stream size by delegating to the two-value overload and
// throwing away the extra value it reports.
function TBCAsyncStream.Size: LONGLONG;
var
  Discarded: LONGLONG;
begin
  Result := Size(Discarded);
end;
// --- TBCAsyncRequest ---
// Records the parameters of a read request. No i/o is started here; the
// read itself is carried out later by Complete.
//   AIO, AStream  - owning i/o object and the stream to read from
//   APos, ALength - start position and number of bytes requested
//   AAligned      - forwarded to the stream's Read call
//   ABuffer       - destination buffer
//   AContext      - filter's context
//   AUser         - downstream filter's context
// Always returns S_OK.
function TBCAsyncRequest.Request(AIO: TBCAsyncIo; AStream: TBCAsyncStream;
  APos: LONGLONG; ALength: Integer; AAligned: Boolean;
  ABuffer: PByte; AContext: Pointer; AUser: DWord): HResult;
begin
  // the request has not been performed yet
  Fhr := VFW_E_TIMEOUT;

  // where to read from
  FIO := AIO;
  FStream := AStream;
  FPos := APos;
  FLength := ALength;
  FAligned := AAligned;

  // where to read to, and who to tell about it
  FBuffer := ABuffer;
  FContext := AContext;
  FUser := AUser;

  Result := S_OK;
end;
// Performs the read described by this request while holding the stream lock.
// Returns (and stores in Fhr):
//   S_OK     - the full requested length was read
//   S_FALSE  - a different number of bytes was read (FLength is updated to
//              the actual count), or SetPointer itself returned S_FALSE
//   failure  - whatever SetPointer or Read reported
function TBCAsyncRequest.Complete: HResult;
var
  BytesRead: DWord;
  MediaSample: IMediaSample;
begin
  FStream.Lock;
  try
    Fhr := FStream.SetPointer(FPos);
    if Fhr = S_OK then
    begin
      Fhr := FStream.Read(FBuffer, FLength, FAligned, BytesRead);

      // OLE_S_FIRST from the stream: flag the sample held in the context
      // (if there is one) as a discontinuity and carry on as success.
      if (Fhr = OLE_S_FIRST) and Assigned(FContext) then
      begin
        MediaSample := IMediaSample(FContext);
        MediaSample.SetDiscontinuity(True);
        Fhr := S_OK;
      end;

      // a failure code is passed back untouched
      if not Failed(Fhr) then
      begin
        if BytesRead = DWord(FLength) then
          Fhr := S_OK
        else
        begin
          // tell caller size changed - probably because of EOF
          FLength := Integer(BytesRead);
          Fhr := S_FALSE;
        end;
      end;
    end;
  finally
    FStream.Unlock;
    Result := Fhr;
  end;
end;
// Cancelling is a no-op in this implementation; it always reports S_OK.
function TBCAsyncRequest.Cancel: HResult;
begin
  Result := S_OK;
end;
// Returns the filter context pointer stored by Request.
function TBCAsyncRequest.GetContext: Pointer;
begin
  Result := FContext;
end;
// Returns the downstream filter's context value stored by Request.
function TBCAsyncRequest.GetUser: DWord;
begin
  Result := FUser;
end;
// Returns the current status of the request: VFW_E_TIMEOUT until Complete
// has run, afterwards the result Complete stored.
function TBCAsyncRequest.GetHResult: HResult;
begin
  Result := Fhr;
end;
// Returns FLength: the requested length, or the actual byte count once
// Complete has replaced it after reading a different amount.
function TBCAsyncRequest.GetActualLength: Integer;
begin
  Result := FLength;
end;
// Returns the stream position at which this request starts reading.
function TBCAsyncRequest.GetStart: LONGLONG;
begin
  Result := FPos;
end;
// --- TBCAsyncIo ---
// Creates the i/o object for AStream. Allocates the two critical sections,
// the two (non-owning) request lists and the four events; no worker thread
// is started here.
constructor TBCAsyncIo.Create(AStream: TBCAsyncStream);
begin
  FStream := AStream;

  // plain state
  FFlushing := False;
  FWaiting := False;
  FItemsOut := 0;
  FThread := 0;
  FThreadProc := nil;

  // locks
  FReader := TBCCritSec.Create;
  FCSLists := TBCCritSec.Create;

  // request queues; the lists never free the requests they hold
  FListWork := TBCRequestList.Create(False);
  FListDone := TBCRequestList.Create(False);

  // events, all constructed with True as the sole argument
  FOnWork := TBCAMEvent.Create(True);
  FOnDone := TBCAMEvent.Create(True);
  FOnAllDone := TBCAMEvent.Create(True);
  FOnStop := TBCAMEvent.Create(True);
end;
// Flushes outstanding requests, stops the worker thread and releases
// everything the constructor allocated.
//
// Fixes over the previous version, which only freed FListDone:
//  * requests still sitting on the done list are freed explicitly - the
//    list is created with OwnsObjects = False, so Clear alone leaked them;
//  * FListWork, both critical sections and all four events are freed;
//  * inherited Destroy is called.
// FStream is supplied by the caller of Create and is deliberately not freed.
destructor TBCAsyncIo.Destroy;
var
  I: Integer;
begin
  // move everything to the done list
  BeginFlush();
  // shutdown worker thread; after this nothing else touches the lists,
  // locks or events, so they can be released safely
  CloseThread();

  // empty the done list, releasing the requests it still holds
  if Assigned(FListDone) then
  begin
    for I := 0 to FListDone.Count - 1 do
      TBCAsyncRequest(FListDone.Items[I]).Free;
    FListDone.Clear;
  end;
  FListDone.Free;
  FListDone := nil;

  // BeginFlush is documented to move every queued request to the done list,
  // so only the list object itself is released here
  FListWork.Free;
  FListWork := nil;

  // events
  FOnWork.Free;
  FOnWork := nil;
  FOnDone.Free;
  FOnDone := nil;
  FOnAllDone.Free;
  FOnAllDone := nil;
  FOnStop.Free;
  FOnStop := nil;

  // locks
  FCSLists.Free;
  FCSLists := nil;
  FReader.Free;
  FReader := nil;

  inherited Destroy;
end;
// Base implementation: ignores AName and reports success (NOERROR).
// Declared virtual so that descendants can override it.
function TBCAsyncIo.Open(AName: PChar): HResult;
begin
  Result := NOERROR;
end;
// Returns the size reported by the underlying stream. A stream must have
// been supplied to the constructor (checked by assertion).
function TBCAsyncIo.Size: LONGLONG;
begin
  Assert(Assigned(FStream));
  Result := FStream.Size;
end;
// Starts the worker thread unless one is already running.
// Returns S_OK when a thread exists on exit, otherwise the Win32 error of
// the failed CreateThread call converted to an HResult.
function TBCAsyncIo.StartThread: HResult;
var
  ThreadId: DWord;
begin
  Result := S_OK;

  // only create a thread when there is none yet
  if FThread = 0 then
  begin
    // clear the stop event before starting
    FOnStop.Reset;
    FThread := CreateThread(nil, 0, @TBCAsyncIo.InitialThreadProc,
      Self, 0, ThreadId);
    if FThread = 0 then
      Result := HResultFromWin32(GetLastError);
  end;
end;
// Signals the stop event and, when a worker thread exists, waits without
// timeout for it to finish and closes its handle. Always returns S_OK.
function TBCAsyncIo.CloseThread: HResult;
begin
  Result := S_OK;

  // signal the thread-exit object (done even when no thread is running)
  FOnStop.SetEv;

  if FThread = 0 then
    Exit;

  WaitForSingleObject(FThread, INFINITE);
  CloseHandle(FThread);
  FThread := 0;
end;
// Removes and returns the first request on the work list, or nil when the
// list is empty. Runs under FCSLists and resets FOnWork whenever the list
// is empty afterwards.
function TBCAsyncIo.GetWorkItem: TBCAsyncRequest;
begin
  Result := nil;
  FCSLists.Lock;
  try
    if FListWork.Count > 0 then
    begin
      Result := TBCAsyncRequest(FListWork.Items[0]);
      FListWork.Delete(0);
    end;

    // force event set correctly
    if FListWork.Count = 0 then
      FOnWork.Reset;
  finally
    FCSLists.UnLock;
  end;
end;
function TBCAsyncIo.GetDoneItem: TBCAsyncRequest;
var
Req: TBCAsyncRequest;
begin
Result := nil;
Req := nil;
FCSLists.Lock;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -