📄 uasyncio.pas
字号:
with FListDone do
try
if (Count <> 0) then
begin
Req := TBCAsyncRequest(Items[0]);
Delete(0);
end;
Result := Req;
// force event set correctly if list now empty
// or we're in the final stages of flushing
// Note that during flushing the way it's supposed to work is that
// everything is shoved on the Done list then the application is
// supposed to pull until it gets nothing more
//
// Thus we should not set m_evDone unconditionally until everything
// has moved to the done list which means we must wait until
// cItemsOut is 0 (which is guaranteed by m_bWaiting being TRUE).
if (Count = 0) and ((Not FFlushing) or FWaiting) then
FOnDone.Reset;
finally
FCSLists.UnLock;
end;
end;
// Appends ARequest to the work list, signals the work event and makes sure
// the worker thread is running. All of this happens under FCSLists.
// Returns VFW_E_WRONG_STATE while flushing, E_OUTOFMEMORY if anything in the
// queueing sequence raises, otherwise whatever StartThread returns.
function TBCAsyncIo.PutWorkItem(ARequest: TBCAsyncRequest): HResult;
begin
  FCSLists.Lock;
  try
    // no new work is accepted while a flush is in progress
    if FFlushing then
    begin
      Result := VFW_E_WRONG_STATE;
      Exit; // the finally section still releases the lock
    end;

    try
      FListWork.Add(ARequest);
      // the work list is non-empty now, so force the work event signalled
      FOnWork.SetEv;
      // start the worker thread if it is not already running
      Result := StartThread;
    except
      Result := E_OUTOFMEMORY;
    end;
  finally
    FCSLists.UnLock;
  end;
end;
// Appends ARequest to the done list and signals the done event.
// The caller must already hold FCSLists (asserted). This is allowed while
// flushing. Returns S_OK, or E_OUTOFMEMORY if adding/signalling raises.
function TBCAsyncIo.PutDoneItem(ARequest: TBCAsyncRequest): HResult;
begin
  Assert(FCSLists.CritCheckIn);
  Result := S_OK;
  try
    FListDone.Add(ARequest);
    // the done list is non-empty now, so force the done event signalled
    FOnDone.SetEv;
  except
    Result := E_OUTOFMEMORY;
  end;
end;
// Drains the work list: each request is taken under the lock, completed
// with the lock released, then moved to the done list under the lock again.
// Returns as soon as GetWorkItem yields nil.
procedure TBCAsyncIo.ProcessRequests;
var
  Item: TBCAsyncRequest;
begin
  while True do
  begin
    // take the next request and count it as outstanding
    FCSLists.Lock;
    try
      Item := GetWorkItem;
      if not Assigned(Item) then
        Exit; // nothing left to do; finally releases the lock
      // one more item that is on neither the work nor the done list
      Inc(FItemsOut);
    finally
      FCSLists.UnLock;
    end;

    // perform the request without holding the lock
    Item.Complete;

    // re-acquire the lock to publish the result on the done list
    FCSLists.Lock;
    try
      PutDoneItem(Item);
      Dec(FItemsOut);
      // wake a waiting BeginFlush once the last outstanding item is done
      if (FItemsOut = 0) and FWaiting then
        FOnAllDone.SetEv;
    finally
      FCSLists.UnLock;
    end;
  end;
end;
// Thread entry helper: runs the worker loop by calling ThreadProc and
// returns its result. The pv argument is not used in this body.
function TBCAsyncIo.InitialThreadProc(pv: Pointer): DWord;
begin
Result := ThreadProc;
end;
// Worker thread loop. Waits on the stop event and the work event; each time
// the work event is signalled the pending requests are processed. Any other
// wait result (stop event signalled, or a wait failure) ends the loop.
// Always returns 0.
function TBCAsyncIo.ThreadProc: DWord;
const
  EvCount = 2;
var
  Events: array[0..EvCount - 1] of THandle;
begin
  // index 0 = stop, index 1 = work; the case label below depends on this order
  Events[0] := FOnStop.Handle;
  Events[1] := FOnWork.Handle;
  repeat
    // pass EvCount rather than a literal so the handle count can never
    // drift from the size of the Events array
    case WaitForMultipleObjects(EvCount, @Events, False, Infinite) of
      WAIT_OBJECT_0 + 1:
        // requests need processing
        ProcessRequests;
    else
      begin
        // stop event or any error - leave the thread
        Result := 0;
        Exit;
      end;
    end;
  until False;
end;
// Activates asynchronous processing by calling StartThread and returning
// its HResult unchanged.
function TBCAsyncIo.AsyncActive: HResult;
begin
Result := StartThread;
end;
// Deactivates asynchronous processing by calling CloseThread and returning
// its HResult unchanged.
function TBCAsyncIo.AsyncInactive: HResult;
begin
Result := CloseThread;
end;
// Creates a read request for ALength bytes at APos into ABuffer and queues
// it on the work list.
//   AAligned - when True, APos, ALength and ABuffer must all satisfy
//              IsAligned, otherwise VFW_E_BADALIGN is returned.
//   AContext, AUser - passed through to the request object.
// Returns E_OUTOFMEMORY if the request object cannot be created, otherwise
// the result of initialising the request or, if that succeeded, of
// PutWorkItem (which may fail while flushing). The request object is freed
// on any failure.
function TBCAsyncIo.Request(APos: LONGLONG; ALength: Integer;
  AAligned: Boolean; ABuffer: PByte;
  AContext: Pointer; AUser: DWord): HResult;
var
  Pending: TBCAsyncRequest;
begin
  // aligned requests need position, length and buffer address all aligned
  if AAligned and
     not (IsAligned(APos) and IsAligned(ALength) and
          IsAligned(Integer(ABuffer))) then
  begin
    Result := VFW_E_BADALIGN;
    Exit;
  end;

  try
    Pending := TBCAsyncRequest.Create;
  except
    Result := E_OUTOFMEMORY;
    Exit;
  end;

  Result := Pending.Request(Self, FStream, APos, ALength,
    AAligned, ABuffer, AContext, AUser);

  // queueing might fail if a flush is in progress
  if Succeeded(Result) then
    Result := PutWorkItem(Pending);

  // ownership was not handed to the work list, so release it here
  if Failed(Result) then
    Pending.Free;
end;
// Waits up to ATimeout for a completed request and returns its details.
//   AContext - receives the request's context pointer; set to nil when no
//              request is returned. Must not be nil itself (E_POINTER).
//   AUser    - receives the request's user value, 0 when no request is returned.
//   AActual  - receives the number of bytes actually read, 0 when no request
//              is returned.
// Returns:
//   E_POINTER         - AContext is nil.
//   VFW_E_TIMEOUT     - the done event was not signalled within ATimeout.
//   VFW_E_WRONG_STATE - the done list is empty while flushing and no
//                       outstanding items are being waited for.
//   otherwise the request's own HResult, where S_FALSE (short read) is
//   mapped to S_OK if the read ended exactly at Size and to E_FAIL if not.
function TBCAsyncIo.WaitForNext(ATimeout: DWord; AContext: PPointer;
  out AUser: DWord; out AActual: Integer): HResult;
var
  Completed: TBCAsyncRequest;
  hr: HResult;
begin
  // give the out parameters defined values on every return path
  AUser := 0;
  AActual := 0;

  if (AContext = nil) then
  begin
    Result := E_POINTER;
    Exit;
  end;

  // some errors find a sample, others don't. Ensure that
  // AContext^ is nil if no sample is found
  AContext^ := nil;

  // wait until the event is set, but since we are not
  // holding the critsec when waiting, we may need to re-wait
  repeat
    if (Not FOnDone.Wait(ATimeout)) then
    begin
      // timeout occurred
      Result := VFW_E_TIMEOUT;
      Exit;
    end;

    // get the next completed request from the done list
    Completed := GetDoneItem;
    if Assigned(Completed) then
    begin
      // found a completed request - check whether it succeeded
      hr := Completed.GetHResult;
      if (hr = S_FALSE) then
      begin
        // the actual length was less than requested - this is fine
        // if the read ran exactly up to the end of the stream
        if ((Completed.GetActualLength +
             Completed.GetStart) = Size) then
          hr := S_OK
        else
          // it was an actual read error
          hr := E_FAIL;
      end;

      // hand the results back to the caller
      AActual := Completed.GetActualLength;
      AContext^ := Completed.GetContext;
      AUser := Completed.GetUser;
      Completed.Free;
      Result := hr;
      Exit;
    end;

    // Hold the critical section while checking the list state.
    // The lock is taken before the try so UnLock only runs when the
    // lock was actually acquired.
    FCSLists.Lock;
    try
      if (FFlushing and (Not FWaiting)) then
      begin
        // can't block as we are between BeginFlush and EndFlush,
        // but note that if FWaiting is set then there are some
        // items not yet complete that we should block for.
        Result := VFW_E_WRONG_STATE;
        Exit;
      end;
    finally
      FCSLists.UnLock;
    end;

    // otherwise the done item was grabbed by someone else between its
    // completion and us locking FCSLists - wait again
  until False;
end;
// Performs an aligned read synchronously on the calling thread.
//   APos, ALength, ABuffer - must all satisfy IsAligned, otherwise
//                            VFW_E_BADALIGN is returned.
//   AActual  - receives the number of bytes actually read; 0 if the read
//              was never attempted.
//   AContext - passed through to the request object.
// Returns E_OUTOFMEMORY if the request object cannot be created, the failure
// code from initialising the request, or the result of completing it.
function TBCAsyncIo.SyncReadAligned(APos: LONGLONG; ALength: Integer;
  ABuffer: PByte; out AActual: Integer; AContext: Pointer): HResult;
var
  Req: TBCAsyncRequest;
begin
  // give the out parameter a defined value on every return path
  AActual := 0;

  if (Not IsAligned(APos)) or (Not IsAligned(ALength)) or
     (Not IsAligned(Integer(ABuffer))) then
  begin
    Result := VFW_E_BADALIGN;
    Exit;
  end;

  try
    Req := TBCAsyncRequest.Create;
  except
    Result := E_OUTOFMEMORY;
    Exit;
  end;

  // the request object is owned here, so it is released on every path
  // (previously it leaked when initialising the request failed)
  try
    Result := Req.Request(Self, FStream, APos, ALength,
      True, ABuffer, AContext, 0);
    if Succeeded(Result) then
    begin
      Result := Req.Complete;
      // return actual data length
      AActual := Req.GetActualLength;
    end;
  finally
    Req.Free;
  end;
end;
// Performs a synchronous read of ALength bytes at APos into ABuffer on the
// calling thread. When position, length and buffer are all aligned the call
// is forwarded to SyncReadAligned; otherwise an unaligned request is created
// and completed directly.
// Returns E_OUTOFMEMORY if the request object cannot be created, the failure
// code from initialising the request, or the result of completing it.
function TBCAsyncIo.SyncRead(APos: LONGLONG; ALength: Integer;
  ABuffer: PByte): HResult;
var
  Unused: Integer;
  Req: TBCAsyncRequest;
begin
  if (IsAligned(APos) and IsAligned(ALength) and IsAligned(Integer(ABuffer))) then
  begin
    Result := SyncReadAligned(APos, ALength, ABuffer, Unused, nil);
    Exit;
  end;

  // not aligned with requirements - issue an unaligned request.
  //!!! might want to fix this to buffer the data ourselves?
  // Creation failure is reported as E_OUTOFMEMORY, matching Request and
  // SyncReadAligned.
  try
    Req := TBCAsyncRequest.Create;
  except
    Result := E_OUTOFMEMORY;
    Exit;
  end;

  // the request object is owned here, so it is released on every path
  // (previously it leaked when initialising the request failed)
  try
    Result := Req.Request(Self, FStream, APos, ALength, False, ABuffer, nil, 0);
    if Succeeded(Result) then
      Result := Req.Complete;
  finally
    Req.Free;
  end;
end;
// Reports the stream length: ATotal receives the value returned by
// FStream.Size, and AAvailable is passed to that call (presumably filled in
// by it - confirm against the stream class). Always returns S_OK.
function TBCAsyncIo.Length(out ATotal: LONGLONG;
  out AAvailable: LONGLONG): HResult;
begin
  Result := S_OK;
  ATotal := FStream.Size(AAvailable);
end;
// HResult-style accessor for the alignment: stores the value of the
// parameterless Alignment overload in Al and always returns S_OK.
function TBCAsyncIo.Alignment(out Al: Integer): HResult;
begin
// resolves to the parameterless Alignment overload
Al := Alignment;
Result := S_OK;
end;
// Cancel all items on the work list onto the done list
// and refuse further requests or further blocking WaitForNext calls
// until EndFlush.
//
// WaitForNext must return with nil only if there are no successful requests.
// So BeginFlush does the following:
// 1. Set FFlushing, which ensures no more requests succeed.
// 2. Move all items from the work list to the done list.
// 3. If there are any outstanding requests, release the critsec to allow
//    them to complete. FWaiting both ensures that we are signalled when
//    they are all done and tells WaitForNext that it should continue
//    to block.
// 4. Once all outstanding requests are complete, force FOnDone set, leave
//    FFlushing set and set FWaiting to False. This ensures that WaitForNext
//    will not block when the done list is empty.
// Starts a flush: marks the object as flushing, cancels every request still
// on the work list and moves it to the done list, then waits (without holding
// the lock) until all requests currently being processed have been put on
// the done list. Always returns S_OK.
function TBCAsyncIo.BeginFlush: HResult;
var
Req: TBCAsyncRequest;
begin
// hold the lock while emptying the work list
FCSLists.Lock;
try
// prevent further requests being queued.
// Also WaitForNext will refuse to block if this is set
// unless FWaiting is also set (which it will be when we release
// the critsec if there are any outstanding requests).
FFlushing := True;
// cancel everything still queued and move it to the done list
repeat
Req := GetWorkItem;
if Not Assigned(Req) then
Break;
Req.Cancel;
PutDoneItem(Req);
until False;
// now wait for any outstanding requests to complete
if (FItemsOut > 0) then
begin
// can be only one person waiting
Assert(Not FWaiting);
// this tells the completion routine that we need to be
// signalled via FOnAllDone when all outstanding items are
// done. It also tells WaitForNext to continue blocking.
FWaiting := True;
end
else
begin
// all done
// force FOnDone set so that even if the list is empty,
// WaitForNext will not block
// don't do this until we are sure that all
// requests are on the done list.
FOnDone.SetEv;
Result := S_OK;
// the finally section below still releases the lock
Exit;
end;
finally
FCSLists.UnLock;
end;
Assert(FWaiting);
// wait without holding critsec
repeat
FOnAllDone.Wait;
// hold critsec to check
FCSLists.Lock;
try
if (FItemsOut = 0) then
begin
// now we are sure that all outstanding requests are on
// the done list and no more will be accepted
FWaiting := False;
// force FOnDone set so that even if the list is empty,
// WaitForNext will not block
// don't do this until we are sure that all
// requests are on the done list.
FOnDone.SetEv;
Result := S_OK;
// the finally section below still releases the lock
Exit;
end;
finally
FCSLists.UnLock;
end;
// items are still outstanding - wait for the event again
until False;
end;
// Ends a flush: clears the flushing flag and re-synchronises the done event
// with the done list under the lock. Always returns S_OK.
function TBCAsyncIo.EndFlush: HResult;
begin
  FCSLists.Lock;
  try
    FFlushing := False;
    Assert(Not FWaiting);
    // BeginFlush may have forced the done event set; make sure it is
    // signalled if and only if the done list still holds items
    if FListDone.Count = 0 then
      FOnDone.Reset
    else
      FOnDone.SetEv;
    Result := S_OK;
  finally
    FCSLists.UnLock;
  end;
end;
// Returns the alignment reported by the underlying stream (FStream.Alignment).
function TBCAsyncIo.Alignment: Integer;
begin
Result := FStream.Alignment;
end;
// True when none of the bits in (Alignment - 1) are set in Al.
// NOTE(review): this is only a proper multiple-of-alignment test if
// Alignment is a power of two - confirm against the stream implementation.
function TBCAsyncIo.IsAligned(Al: Integer): Boolean;
var
  Mask: Integer;
begin
  Mask := Alignment - 1;
  Result := (Al and Mask) = 0;
end;
// 64-bit overload: only the low 32 bits of Al are examined, by handing them
// to the Integer overload.
function TBCAsyncIo.IsAligned(Al: LONGLONG): Boolean;
var
  LowBits: LONGLONG;
begin
  LowBits := Al and $FFFFFFFF;
  Result := IsAligned(Integer(LowBits));
end;
// Returns the handle of the done event (FOnDone).
// NOTE(review): despite the name this is FOnDone, not FOnStop - confirm
// that callers really expect the done event here.
function TBCAsyncIo.StopEvent: THandle;
begin
Result := FOnDone.Handle;
end;
end.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -