⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 uasyncio.pas

📁 delphi源码
💻 PAS
📖 第 1 页 / 共 2 页
字号:
  with FListDone do
  try
    if (Count <> 0) then
    begin
      Req := TBCAsyncRequest(Items[0]);
      Delete(0);
    end;

    Result := Req;
    // force event set correctly if list now empty
    // or we're in the final stages of flushing
    // Note that during flushing the way it's supposed to work is that
    // everything is shoved on the Done list then the application is
    // supposed to pull until it gets nothing more
    //
    // Thus we should not set m_evDone unconditionally until everything
    // has moved to the done list which means we must wait until
    // cItemsOut is 0 (which is guaranteed by m_bWaiting being TRUE).
    if (Count = 0) and ((Not FFlushing) or FWaiting) then
      FOnDone.Reset;
  finally
    FCSLists.UnLock;
  end;
end;

// Queues ARequest on the work list, signals the work event and makes sure
// the worker thread is running.
//
// Returns:
//   VFW_E_WRONG_STATE - a flush is in progress (FFlushing), nothing queued
//   E_OUTOFMEMORY     - an exception was raised while queueing/starting
//   otherwise the result of StartThread
//
// On any failure the request is NOT left on the work list. The caller
// (see Request) frees the request when this function fails, so leaving it
// queued would leave a dangling pointer in FListWork.
function TBCAsyncIo.PutWorkItem(ARequest: TBCAsyncRequest): HResult;
begin
  FCSLists.Lock;
  try
    if (FFlushing) then
      Result := VFW_E_WRONG_STATE
    else
    begin
      try
        FListWork.Add(ARequest);
      except
        // nothing was queued, so there is nothing to roll back
        Result := E_OUTOFMEMORY;
        Exit;
      end;

      try
        // event should now be in a set state - force this
        FOnWork.SetEv;

        // start the thread now if not already started
        Result := StartThread;
      except
        Result := E_OUTOFMEMORY;
      end;

      // We still hold FCSLists, so no worker can have dequeued the item
      // yet; take it back off the list because the caller will free it.
      if Failed(Result) then
        FListWork.Remove(ARequest);
    end;
  finally
    FCSLists.UnLock;
  end;
end;

// Appends ARequest to the done list and signals the done event.
// The visible callers (ProcessRequests, BeginFlush) invoke this while
// holding FCSLists; the assertion below presumably verifies that the
// calling thread owns the lock - confirm against CritCheckIn.
// Returns S_OK on success, or E_OUTOFMEMORY if any exception is raised
// while adding to the list or setting the event.
function TBCAsyncIo.PutDoneItem(ARequest: TBCAsyncRequest): HResult;
begin
  // put an item on the done list - ok to do this when
  // flushing
  Assert(FCSLists.CritCheckIn);

  try
    FListDone.Add(ARequest);
    // event should now be in a set state - force this
    FOnDone.SetEv;
    Result := S_OK;
  except
    // every exception type is reported as out-of-memory
    Result := E_OUTOFMEMORY;
  end;
end;

// Drains the work list: each request is dequeued under FCSLists, completed
// with the lock released, then moved to the done list under the lock again.
// Returns as soon as GetWorkItem yields nil.
procedure TBCAsyncIo.ProcessRequests;
var
  Item: TBCAsyncRequest;
begin
  while True do
  begin
    // Dequeue and bump the in-flight counter atomically with respect to
    // the lists, so the item is always accounted for somewhere.
    FCSLists.Lock;
    try
      Item := GetWorkItem;
      if not Assigned(Item) then
        Exit; // work list is empty - the finally still releases the lock
      Inc(FItemsOut);
    finally
      FCSLists.UnLock;
    end;

    // Perform the request without holding the list lock.
    Item.Complete;

    // Re-enter the lock to publish the finished item.
    FCSLists.Lock;
    try
      PutDoneItem(Item);
      Dec(FItemsOut);
      // A flusher that set FWaiting is signalled once nothing is in flight.
      if (FItemsOut = 0) and FWaiting then
        FOnAllDone.SetEv;
    finally
      FCSLists.UnLock;
    end;
  end;
end;

// Thread entry point: forwards to ThreadProc and returns its result.
// The pv argument is not used by this body.
function TBCAsyncIo.InitialThreadProc(pv: Pointer): DWord;
begin
  Result := ThreadProc;
end;

// Worker loop. Blocks on the stop event and the work event; every time the
// work event is the one that wakes the wait, pending requests are
// processed. Any other wait result (stop event or an error) ends the loop.
// Always returns 0.
function TBCAsyncIo.ThreadProc: DWord;
const
  HandleCount = 2;
  WorkSignalled = WAIT_OBJECT_0 + 1; // index 1 in the handle array below
var
  WaitHandles: array[0..HandleCount - 1] of THandle;
begin
  // Stop is listed first, work second.
  WaitHandles[0] := FOnStop.Handle;
  WaitHandles[1] := FOnWork.Handle;

  while WaitForMultipleObjects(HandleCount, @WaitHandles, False,
    Infinite) = WorkSignalled do
    ProcessRequests;

  // stop event or wait failure - leave the thread
  Result := 0;
end;

// Activates asynchronous processing by delegating to StartThread and
// returning its result unchanged.
function TBCAsyncIo.AsyncActive: HResult;
begin
  Result := StartThread;
end;

// Deactivates asynchronous processing by delegating to CloseThread and
// returning its result unchanged.
function TBCAsyncIo.AsyncInactive: HResult;
begin
  Result := CloseThread;
end;

// Creates an asynchronous read request and places it on the work list.
//
// When AAligned is True, APos, ALength and the buffer address must all
// satisfy IsAligned, otherwise VFW_E_BADALIGN is returned.
// Returns E_OUTOFMEMORY if the request object cannot be created, the
// failure code of the request set-up or of PutWorkItem otherwise, and the
// PutWorkItem result on success. The request object is freed here on any
// failure after it has been created.
function TBCAsyncIo.Request(APos: LONGLONG; ALength: Integer;
  AAligned: Boolean; ABuffer: PByte;
  AContext: Pointer; AUser: DWord): HResult;
var
  NewReq: TBCAsyncRequest;
begin
  // Alignment is only enforced when the caller asked for an aligned read.
  if AAligned then
    if not (IsAligned(APos) and IsAligned(ALength) and
      IsAligned(Integer(ABuffer))) then
    begin
      Result := VFW_E_BADALIGN;
      Exit;
    end;

  try
    NewReq := TBCAsyncRequest.Create;
  except
    Result := E_OUTOFMEMORY;
    Exit;
  end;

  Result := NewReq.Request(Self, FStream, APos, ALength,
    AAligned, ABuffer, AContext, AUser);

  // queueing may be refused, e.g. while a flush is in progress
  if Succeeded(Result) then
    Result := PutWorkItem(NewReq);

  // ownership was not handed over - dispose of the request ourselves
  if Failed(Result) then
    NewReq.Free;
end;

// Waits for the next completed request and hands its results to the caller.
//
// Parameters:
//   ATimeout - passed to FOnDone.Wait on every wait attempt
//   AContext - receives the request's context pointer; must not be nil.
//              Set to nil up front, so it stays nil when no request is
//              returned.
//   AUser    - receives the request's user value (only when a request is
//              returned)
//   AActual  - receives the request's actual length (only when a request is
//              returned)
//
// Returns:
//   E_POINTER         - AContext is nil
//   VFW_E_TIMEOUT     - the done event was not signalled within ATimeout
//   VFW_E_WRONG_STATE - done list is empty while flushing and nobody is
//                       waiting for outstanding items
//   otherwise the request's HResult, where S_FALSE (short read) is mapped
//   to S_OK if the read ended exactly at Size and to E_FAIL if it did not.
function TBCAsyncIo.WaitForNext(ATimeout: DWord; AContext: PPointer;
  out AUser: DWord; out AActual: Integer): HResult;
var
  Request: TBCAsyncRequest;
  hr: HResult;
begin
  if (AContext = nil) then
  begin
    Result := E_POINTER;
    Exit;
  end;

  // some errors find a sample, others don't. Ensure that
  // AContext^ is nil if no sample found
  AContext^ := nil;

  // wait until the event is set, but since we are not
  // holding the critsec when waiting, we may need to re-wait
  repeat
    if (Not FOnDone.Wait(ATimeout)) then
    begin
      // timeout occurred
      Result := VFW_E_TIMEOUT;
      Exit;
    end;

    // get next event from list
    Request := GetDoneItem;
    if Assigned(Request) then
    begin
      // found a completed request

      // check if ok
      hr := Request.GetHResult;
      if (hr = S_FALSE) then
      begin
        // this means the actual length was less than
        // requested - may be ok if he aligned the end of file
        if ((Request.GetActualLength +
          Request.GetStart) = Size) then
          hr := S_OK
        else
          // it was an actual read error
          hr := E_FAIL;
      end;

      // return actual bytes read
      AActual := Request.GetActualLength;

      // return his context
      AContext^ := Request.GetContext;
      AUser := Request.GetUser;

      Request.Free;
      Result := hr;
      Exit;
    end
    else
    begin
      // Hold the critical section while checking the list state.
      // The lock is taken BEFORE the try so that the finally can never
      // release a lock that was not acquired.
      FCSLists.Lock;
      try
        if (FFlushing and (Not FWaiting)) then
        begin
          // can't block as we are between BeginFlush and EndFlush

          // but note that if FWaiting is set, then there are some
          // items not yet complete that we should block for.

          Result := VFW_E_WRONG_STATE;
          Exit;
        end;
      finally
        FCSLists.UnLock;
      end;
    end;
    // done item was grabbed between completion and
    // us locking FCSLists - go round and wait again.
  until False;
end;

// Performs a synchronous, aligned read on the calling thread.
//
// APos, ALength and the buffer address must all satisfy IsAligned,
// otherwise VFW_E_BADALIGN is returned.
// Returns E_OUTOFMEMORY if the request object cannot be created, the
// failure code of the request set-up if that fails, otherwise the result
// of completing the request. AActual receives the request's actual length
// only when the request was completed.
//
// The temporary request object is always released, including on the
// set-up failure path.
function TBCAsyncIo.SyncReadAligned(APos: LONGLONG; ALength: Integer;
  ABuffer: PByte; out AActual: Integer; AContext: Pointer): HResult;
var
  Request: TBCAsyncRequest;
begin
  if (Not IsAligned(APos)) or (Not IsAligned(ALength)) or
    (Not IsAligned(Integer(ABuffer))) then
  begin
    Result := VFW_E_BADALIGN;
    Exit;
  end;

  try
    Request := TBCAsyncRequest.Create;
  except
    Result := E_OUTOFMEMORY;
    Exit;
  end;

  try
    Result := Request.Request(Self, FStream, APos, ALength,
      True, ABuffer, AContext, 0);
    if Failed(Result) then
      Exit; // the finally below still frees the request

    Result := Request.Complete;

    // return actual data length
    AActual := Request.GetActualLength;
  finally
    Request.Free;
  end;
end;

// Performs a synchronous read on the calling thread.
//
// If APos, ALength and the buffer address all satisfy IsAligned the call is
// forwarded to SyncReadAligned; otherwise an unaligned request is built and
// completed directly.
// Returns E_OUTOFMEMORY if the request object cannot be created (matching
// Request and SyncReadAligned), the failure code of the request set-up if
// that fails, otherwise the result of completing the request.
//
// The temporary request object is always released, including on the
// set-up failure path.
function TBCAsyncIo.SyncRead(APos: LONGLONG; ALength: Integer;
  ABuffer: PByte): HResult;
var
  Unused: Integer;
  Req: TBCAsyncRequest;
begin
  // perform a synchronous read request on this thread.
  // may not be aligned - so we will have to buffer.
  if (IsAligned(APos) and IsAligned(ALength) and IsAligned(Integer(ABuffer))) then
  begin
    Result := SyncReadAligned(APos, ALength, ABuffer, Unused, nil);
    Exit;
  end;

  // not aligned with requirements - use buffered file handle.
  //!!! might want to fix this to buffer the data ourselves?
  try
    Req := TBCAsyncRequest.Create;
  except
    Result := E_OUTOFMEMORY;
    Exit;
  end;

  try
    Result := Req.Request(Self, FStream, APos, ALength, False, ABuffer, nil, 0);
    if Failed(Result) then
      Exit; // the finally below still frees the request

    Result := Req.Complete;
  finally
    Req.Free;
  end;
end;

// Reports the stream length.
// ATotal receives the value returned by FStream.Size, and AAvailable is
// filled in by that same call. Always returns S_OK.
function TBCAsyncIo.Length(out ATotal: LONGLONG;
  out AAvailable: LONGLONG): HResult;
begin
  ATotal := FStream.Size(AAvailable);
  Result := S_OK;
end;

// HResult-returning form of Alignment: stores the value of the
// parameterless Alignment overload in Al. Always returns S_OK.
function TBCAsyncIo.Alignment(out Al: Integer): HResult;
begin
  Al := Alignment;
  Result := S_OK;
end;

// Cancels all items on the work list onto the done list and refuses
// further requests (and further blocking WaitForNext calls) until EndFlush.
//
// WaitForNext must return with nil only if there are no successful requests.
// So BeginFlush does the following:
// 1. Sets FFlushing, which makes PutWorkItem reject new requests.
// 2. Moves all items from the work list to the done list, cancelling each.
// 3. If there are any outstanding requests (FItemsOut > 0), releases the
//    critsec to allow them to complete. FWaiting, as well as ensuring that
//    we are signalled via FOnAllDone when they are all done, also tells
//    WaitForNext that it should continue to block.
// 4. Once all outstanding requests are complete, forces FOnDone set with
//    FFlushing still set and FWaiting false. This ensures that WaitForNext
//    will not block when the done list is empty.
//
// Always returns S_OK.
function TBCAsyncIo.BeginFlush: HResult;
var
  Req: TBCAsyncRequest;
begin
  // hold the lock while emptying the work list
  FCSLists.Lock;
  try
    // prevent further requests being queued.
    // Also WaitForNext will refuse to block if this is set
    // unless FWaiting is also set (which it will be when we release
    // the critsec if there are any outstanding).
    FFlushing := True;

    // drain the work list: cancel each request and park it on the done list
    repeat
      Req := GetWorkItem;
      if Not Assigned(Req) then
        Break;

      Req.Cancel;
      PutDoneItem(Req);
    until False;

    // now wait for any outstanding requests to complete
    if (FItemsOut > 0) then
      begin
          // can be only one person waiting
          Assert(Not FWaiting);

          // this tells the completion routine (ProcessRequests) that we
          // need to be signalled via FOnAllDone when all outstanding items
          // are done. It also tells WaitForNext to continue blocking.
          FWaiting := True;
      end
    else
      begin
        // all done

        // force FOnDone set so that even if list is empty,
        // WaitForNext will not block
        // don't do this until we are sure that all
        // requests are on the done list.
        FOnDone.SetEv;
        Result := S_OK;
        Exit;
      end;
  finally
    FCSLists.UnLock;
  end;

  // only reached when there were outstanding items above
  Assert(FWaiting);

  // wait without holding critsec
  repeat
    FOnAllDone.Wait;

    // hold critsec to check; loop again if items are still outstanding
    FCSLists.Lock;
    try
      if (FItemsOut = 0) then
      begin
        // now we are sure that all outstanding requests are on
        // the done list and no more will be accepted
        FWaiting := False;

        // force FOnDone set so that even if list is empty,
        // WaitForNext will not block
        // don't do this until we are sure that all
        // requests are on the done list.
        FOnDone.SetEv;

        Result := S_OK;
        Exit;
      end;
    finally
      FCSLists.UnLock;
    end;
  until False;
end;

// Leaves the flushing state. Under the list lock, clears FFlushing and
// brings the done event back in line with the done list: reset when the
// list is empty, set when it still holds items. Always returns S_OK.
function TBCAsyncIo.EndFlush: HResult;
begin
  FCSLists.Lock;
  try
    FFlushing := False;

    // nobody may still be waiting for outstanding items at this point
    Assert(Not FWaiting);

    // BeginFlush may have forced the event set regardless of list content
    if (FListDone.Count = 0) then
      FOnDone.Reset
    else
      FOnDone.SetEv;

    Result := S_OK;
  finally
    FCSLists.UnLock;
  end;
end;

// Returns the alignment reported by the underlying stream (FStream).
function TBCAsyncIo.Alignment: Integer;
begin
  Result := FStream.Alignment;
end;

// Returns True when Al has none of the bits of (Alignment - 1) set.
// NOTE(review): this equals "Al is a multiple of Alignment" only if
// Alignment is a power of two - confirm against the stream implementation.
function TBCAsyncIo.IsAligned(Al: Integer): Boolean;
begin
  Result := ((Al and (Alignment - 1)) = 0);
end;

// 64-bit overload: masks Al down to its low 32 bits and delegates to the
// Integer overload, so only those low bits take part in the check.
function TBCAsyncIo.IsAligned(Al: LONGLONG): Boolean;
begin
  Result := IsAligned(Integer(Al and $FFFFFFFF));
end;

// Returns the handle of the FOnDone event.
// NOTE(review): despite the name, this exposes the done event rather than
// FOnStop - confirm that this is what callers expect.
function TBCAsyncIo.StopEvent: THandle;
begin
  Result := FOnDone.Handle;
end;

end.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -