⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 idiohandlerchain.pas

📁 photo.163.com 相册下载器 多线程下载
💻 PAS
📖 第 1 页 / 共 2 页
字号:
begin
  Result := 0;
  EIdException.Toss('Fall through error in ' + ClassName); {do not localize}
end;

// Reads data from the connection into the VCL stream wrapped by AStream.
//   AStream              - destination wrapper; its VCLStream receives the data.
//   AByteCount           - number of bytes to read when AReadUntilDisconnect
//                          is False.
//   AReadUntilDisconnect - when True, AByteCount is ignored and a
//                          read-until-disconnect work unit is queued instead.
procedure TIdIOHandlerChain.ReadStream(AStream: TIdStreamVCL; AByteCount: Integer;
      AReadUntilDisconnect: Boolean);
begin
  if not AReadUntilDisconnect then begin
    // Fixed-size read: all QueueAndWait defaults apply.
    QueueAndWait(TIdWorkOpUnitReadSizedStream.Create(AStream.VCLStream, AByteCount));
    Exit;
  end;
  // Open-ended read: timeout argument -1, the work unit is freed by
  // QueueAndWait (True) and AAllowGracefulException is passed as False.
  QueueAndWait(TIdWorkOpUnitReadUntilDisconnect.Create(AStream.VCLStream), -1,
    True, False);
end;

// Extracts exactly AByteCount bytes from the input buffer into VBuffer,
// first queuing a sized read for whatever part is not buffered yet.
//   VBuffer    - receives the bytes.
//   AByteCount - number of bytes wanted; checked with EIdException.IfFalse
//                to be >= 0. Zero is a no-op.
//   AAppend    - forwarded unchanged to FInputBuffer.ExtractToBytes.
procedure TIdIOHandlerChain.ReadBytes(var VBuffer: TIdBytes;
 AByteCount: Integer; AAppend: Boolean = True);
begin
  EIdException.IfFalse(AByteCount >= 0);
  // Nothing to do unless a positive count was requested.
  if AByteCount <= 0 then begin
    Exit;
  end;
  // Only ask the engine for the shortfall, not the full count.
  if FInputBuffer.Size < AByteCount then begin
    QueueAndWait(TIdWorkOpUnitReadSized.Create(AByteCount - FInputBuffer.Size));
  end;
  // The work unit is expected to have filled the buffer sufficiently.
  Assert(FInputBuffer.Size >= AByteCount);
  FInputBuffer.ExtractToBytes(VBuffer, AByteCount, AAppend);
end;

// Reads one line from the connection.
//   ATerminator    - line terminator; '' is treated as LF so callers can pass
//                    later arguments without choosing a terminator.
//   ATimeout       - forwarded to QueueAndWait for the read work unit.
//   AMaxLineLength - -1 selects the MaxLineLength property; 0 disables the
//                    length check.
// Returns the line without its terminator (and without a trailing CR when the
// terminator is LF). When the line is longer than AMaxLineLength the behaviour
// depends on FMaxLineAction: maException raises, maSplit returns the first
// AMaxLineLength characters and sets FReadLnSplit.
// On EIdReadTimeout the exception is swallowed, FReadLnTimedOut is set and
// the result is ''.
function TIdIOHandlerChain.ReadLn(ATerminator: string = LF;
  ATimeout: Integer = IdTimeoutDefault; AMaxLineLength: Integer = -1): string;
var
  LTermPos: Integer;
begin
  // Give the result a defined value up front: on the timeout path below no
  // other assignment is reached, and a string function result is not
  // guaranteed to start out empty.
  Result := '';
  if AMaxLineLength = -1 then begin
    AMaxLineLength := MaxLineLength;
  end;
  // User may pass '' if they need to pass arguments beyond the first.
  if ATerminator = '' then begin
    ATerminator := LF;
  end;
  FReadLnSplit := False;
  FReadLnTimedOut := False;
  try
    // IndexOf result + 1 is used as a 1-based position; 0 means "not found".
    LTermPos := FInputBuffer.IndexOf(ATerminator) + 1;
    if (LTermPos = 0) and ((AMaxLineLength = 0)
     or (FInputBuffer.Size < AMaxLineLength)) then begin
      QueueAndWait(TIdWorkOpUnitReadLn.Create(ATerminator, AMaxLineLength)
       , ATimeout);
      LTermPos := FInputBuffer.IndexOf(ATerminator) + 1;
    end;
    // LTermPos cannot be 0, and the code below can't handle it properly
    // NOTE(review): if the buffer already holds AMaxLineLength or more bytes
    // without a terminator, the read above is skipped and LTermPos stays 0 -
    // confirm whether that case can occur in practice.
    Assert(LTermPos > 0);
    if (AMaxLineLength <> 0) and (LTermPos > AMaxLineLength) then begin
      case FMaxLineAction of
        // TODO: find the right exception class here
        maException: EIdException.Toss('MaxLineLength exceded'); {do not localize}
        maSplit: begin
          // Record that only part of the line is being returned; without
          // this the flag reset above could never become True.
          FReadLnSplit := True;
          Result := FInputBuffer.Extract(AMaxLineLength);
        end;
      end;
    end else begin
      Result := FInputBuffer.Extract(LTermPos - 1);
      // Treat CRLF as a line end when the caller asked for LF.
      if (ATerminator = LF) and (Copy(Result, Length(Result), 1) = CR) then begin
        Delete(Result, Length(Result), 1);
      end;
      FInputBuffer.Extract(Length(ATerminator));// remove the terminator
    end;
  except on E: EIdReadTimeout do
    FReadLnTimedOut := True;
  end;
end;

// Reads everything the peer sends until the connection is closed and returns
// it as a string. The read is bracketed by BeginWork/EndWork(wmRead).
function TIdIOHandlerChain.AllData: string;
var
  LCapture: TStringStream;
begin
  BeginWork(wmRead);
  try
    Result := '';
    LCapture := TStringStream.Create('');
    try
      // Timeout argument -1; QueueAndWait frees the work unit (True) and
      // AAllowGracefulException is passed as False.
      QueueAndWait(TIdWorkOpUnitReadUntilDisconnect.Create(LCapture), -1,
        True, False);
      Result := LCapture.DataString;
    finally
      LCapture.Free;
    end;
  finally
    EndWork(wmRead);
  end;
end;

// Sends the file named by AFile through a TIdWorkOpUnitWriteFile work unit.
//   AFile               - path of the file to send.
//   AEnableTransferFile - currently ignored (BGO).
// Always returns 0. An earlier revision, left disabled in the source, returned
// the work unit's BytesSent and delegated to the inherited WriteFile when an
// Intercept was assigned.
function TIdIOHandlerChain.WriteFile(
      const AFile: String;
      AEnableTransferFile: Boolean): Cardinal;
var
  LFileUnit: TIdWorkOpUnitWriteFile;
begin
  Result := 0;
  LFileUnit := TIdWorkOpUnitWriteFile.Create(AFile);
  try
    // AFreeWorkOpUnit is False: the unit is owned and released here.
    QueueAndWait(LFileUnit, IdTimeoutDefault, False);
  finally
    LFileUnit.Free;
  end;
end;

// Writes the contents of AStream's VCL stream to the connection in blocks.
//   AStream         - source wrapper; data is taken from its VCLStream.
//   ASize           - > 0: send ASize bytes starting at the current position;
//                     = 0: send the whole stream from the beginning;
//                     < 0: send everything from the current position to the end.
//   AWriteByteCount - when True the resolved byte count is written first via
//                     Write(ASize).
// BeginWork/EndWork(wmWrite) bracketing is disabled in the original source
// and is therefore not performed here either.
procedure TIdIOHandlerChain.Write(
      AStream: TIdStreamVCL;
      ASize: Integer = 0;
      AWriteByteCount: Boolean = False
      );
const
  CBlockSize = 128 * 1024; // 128K blocks
var
  LOffset: Integer;
  LChunk: Integer;
begin
  // Resolve the starting offset and the number of bytes to send.
  if ASize > 0 then begin
    LOffset := AStream.VCLStream.Seek(0, soFromCurrent);
  end else if ASize = 0 then begin
    LOffset := 0;
    ASize := AStream.VCLStream.Seek(0, soFromEnd);
    AStream.VCLStream.Seek(0, soFromBeginning);
  end else begin
    LOffset := AStream.VCLStream.Seek(0, soFromCurrent);
    ASize := AStream.VCLStream.Seek(0, soFromEnd) - LOffset;
    // Seeking to the end moved the position; put it back.
    AStream.VCLStream.Seek(LOffset, soFromBeginning);
  end;

  if AWriteByteCount then begin
    Write(ASize);
  end;

  // One work unit per block; each call returns once that block is done.
  while ASize > 0 do begin
    LChunk := Min(CBlockSize, ASize);
    QueueAndWait(TIdWorkOpUnitWriteStream.Create(AStream.VCLStream, LOffset,
      LChunk, False));
    Dec(ASize, LChunk);
    Inc(LOffset, LChunk);
  end;
end;

// Sends the raw bytes in ABuffer through a TIdWorkOpUnitWriteBuffer work unit
// and waits for it to finish.
//   ABuffer - bytes to send; an empty buffer is a no-op.
procedure TIdIOHandlerChain.WriteDirect(
  ABuffer: TIdBytes
  );
begin
  // An empty dynamic array has no element 0: @ABuffer[0] would be a range
  // error (or a nil pointer with range checking off), and there is nothing to
  // send anyway. NOTE(review): TIdChainEngine.Execute treats a completion with
  // zero bytes transferred as a closed socket, so a zero-length write is
  // presumably never wanted - confirm.
  if Length(ABuffer) = 0 then begin
    Exit;
  end;
  QueueAndWait(TIdWorkOpUnitWriteBuffer.Create(@ABuffer[0], Length(ABuffer), False));
end;

// Hands a work unit to the chain engine and returns once FChainEngine.AddWork
// has returned, then surfaces any error, timeout or disconnect.
//   AWorkOpUnit             - the operation to run.
//   ATimeout                - IdTimeoutInfinite: no deadline;
//                             IdTimeoutDefault: use FReadTimeout (no deadline
//                             when FReadTimeout <= 0);
//                             anything else: milliseconds added to GetTickCount.
//   AFreeWorkOpUnit         - when True the work unit is freed on exit, even
//                             if an exception is raised.
//   AAllowGracefulException - forwarded to CheckForDisconnect.
// Raises whatever the work unit re-raises, EIdReadTimeout when the unit
// reports TimedOut, and whatever CheckForDisconnect raises.
procedure TIdIOHandlerChain.QueueAndWait(
  AWorkOpUnit: TIdWorkOpUnit;
  ATimeout: Integer = IdTimeoutDefault;
  AFreeWorkOpUnit: Boolean = True;
  AAllowGracefulException: Boolean = True
  );
begin
  try
    CheckForDisconnect(AAllowGracefulException);

    // Deadline selection. A TimeOutAt of 0 is used for "no deadline".
    // The Cardinal casts keep the addition from being widened to Int64,
    // which can incur a performance penalty.
    if ATimeout = IdTimeoutInfinite then begin
      AWorkOpUnit.TimeOutAt := 0;
    end else if ATimeout <> IdTimeoutDefault then begin
      AWorkOpUnit.TimeOutAt := GetTickCount + Cardinal(ATimeout);
    end else if FReadTimeout > 0 then begin
      AWorkOpUnit.TimeOutAt := GetTickCount + Cardinal(FReadTimeout);
    end else begin
      AWorkOpUnit.TimeOutAt := 0;
    end;

    // Tell the work unit who it belongs to and how to report completion.
    AWorkOpUnit.Fiber := FFiber;
    AWorkOpUnit.IOHandler := Self;
    AWorkOpUnit.OnCompleted := WorkOpUnitCompleted;
    AWorkOpUnit.SocketHandle := Binding.Handle;

    // Add to queue and wait to be rescheduled when work is completed
    FChainEngine.AddWork(AWorkOpUnit);

    // Re-raise an exception captured by the work unit, if any.
    AWorkOpUnit.RaiseException;

    if AWorkOpUnit.TimedOut then begin
      EIdReadTimeout.Toss('Timed out'); {do not localize}
    end;

    // The connection may have been closed while the operation was running.
    CheckForDisconnect(AAllowGracefulException);
  finally
    if AFreeWorkOpUnit then begin
      AWorkOpUnit.Free;
    end;
  end;
end;

// Creates a chain IO handler bound to an engine, a fiber weaver and a fiber.
//   AOwner       - component owner, passed to the inherited constructor.
//   AChainEngine - engine that executes work units; must be assigned.
//   AFiberWeaver - weaver that completed fibers are added to; must be assigned.
//   AFiber       - fiber stored on every queued work unit; must be assigned.
// Each required argument is validated with EIdException.IfNotAssigned right
// before it is stored, in the order engine, weaver, fiber.
constructor TIdIOHandlerChain.Create(
  AOwner: TComponent;
  AChainEngine: TIdChainEngine;
  AFiberWeaver: TIdFiberWeaver;
  AFiber: TIdFiber
  );
begin
  inherited Create(AOwner);

  // Engine: validate, store, then let it configure this handler.
  EIdException.IfNotAssigned(AChainEngine, 'No chain engine specified.'); {do not localize}
  FChainEngine := AChainEngine;
  FChainEngine.SetIOHandlerOptions(Self);

  // Fiber weaver.
  EIdException.IfNotAssigned(AFiberWeaver, 'No fiber weaver specified.'); {do not localize}
  FFiberWeaver := AFiberWeaver;

  // Fiber.
  EIdException.IfNotAssigned(AFiber, 'No fiber specified.'); {do not localize}
  FFiber := AFiber;

  // Allocate the overlapped record, zero it, then allocate its buffer.
  // Both allocations are released in Destroy.
  New(FOverlapped);
  ZeroMemory(FOverlapped, SizeOf(TIdOverLapped));
  New(FOverlapped^.Buffer);
end;

// Completion callback: QueueAndWait installs this method as the OnCompleted
// handler of every work unit it queues.
//   AWorkOpUnit - the work unit that finished.
procedure TIdIOHandlerChain.WorkOpUnitCompleted(AWorkOpUnit: TIdWorkOpUnit);
begin
  // Hand the work unit's fiber to the weaver - presumably so the fiber that
  // is waiting in QueueAndWait gets rescheduled; confirm against the weaver.
  FFiberWeaver.Add(AWorkOpUnit.Fiber);
end;

// Detaches this handler from the chain engine and releases the overlapped
// record and its buffer allocated in Create.
// The destructor also runs when Create raises part-way through (for example
// when a required argument is missing), so every field is checked before use.
destructor TIdIOHandlerChain.Destroy;
begin
  // Tell the chain engine that we are closing and to remove any references to
  // us and cease any usage.
  // Do not do this in close, it can cause deadlocks because the engine can
  // call close while in its Execute.
  if Assigned(FChainEngine) then begin
    FChainEngine.RemoveSocket(Self);
  end;
  if Assigned(FOverlapped) then begin
    // The buffer is allocated after the record, so it may still be nil.
    if Assigned(FOverlapped.Buffer) then begin
      Dispose(FOverlapped.Buffer);
    end;
    Dispose(FOverlapped);
    FOverlapped := nil;
  end;
  inherited;
end;

{ TIdChainEngine }

// Stops and frees the engine thread, if one was created, before the engine
// itself is destroyed.
procedure TIdChainEngine.BeforeDestruction;
begin
  if Assigned(FThread) then begin
    // 1. Flag the thread as terminated.
    FThread.Terminate;
    // 2. Post the terminate key to the completion port (see Terminating);
    //    Execute waits on the port with an INFINITE timeout.
    Terminating;
    // 3. Block until the thread has finished, then release it.
    FThread.WaitFor;
    FreeAndNil(FThread);
  end;
  inherited;
end;

// Returns the FInputBuffer field of the given IO handler.
//   AIOHandler - hard-cast to TIdIOHandlerChain without a type check, so the
//                caller must pass a TIdIOHandlerChain instance.
function TIdChainEngine.GetInputBuffer(const AIOHandler:TIdIOHandler):TidBuffer;
var
  LChainHandler: TIdIOHandlerChain;
begin
  LChainHandler := TIdIOHandlerChain(AIOHandler);
  Result := LChainHandler.FInputBuffer;
end;

// Applies engine-specific settings to an IO handler. Called from
// TIdIOHandlerChain.Create right after the engine reference is stored.
//   AIOHandler - the handler to configure.
procedure TIdChainEngine.SetIOHandlerOptions(AIOHandler: TIdIOHandlerChain);
begin
  // The only option set here is the connect mode.
  AIOHandler.ConnectMode := cmIOCP;
end;

// Associates the socket of an accepted connection with the engine's
// completion port.
//   AIOHandler - handler whose Binding.Handle is attached to the port.
// Raises the last OS error when the association fails.
procedure TIdChainEngine.SocketAccepted(AIOHandler: TIdIOHandlerChain);
var
  LPort: THandle;
begin
  LPort := CreateIoCompletionPort(AIOHandler.Binding.Handle, FCompletionPort,
    0, 0);
  // A zero result means the call failed.
  if LPort = 0 then begin
    RaiseLastOSError;
  end;
end;

// Posts a completion packet carrying GCompletionKeyTerminate (zero bytes, nil
// overlapped) to the completion port. Execute ignores packets with this key,
// so the post only serves to wake its INFINITE wait.
// Raises the last OS error when the packet cannot be posted.
procedure TIdChainEngine.Terminating;
var
  LPosted: LongBool;
begin
  LPosted := PostQueuedCompletionStatus(FCompletionPort, 0,
    GCompletionKeyTerminate, nil);
  if not LPosted then begin
    RaiseLastOSError;
  end;
end;

// One iteration of the engine loop: waits for a single completion packet on
// the completion port and dispatches it to the work unit that issued the I/O.
// Called from TIdChainEngineThread.Run.
procedure TIdChainEngine.Execute;
var
  LBytesTransferred: DWord;
  LCompletionKey: DWord;
  LOverlapped: PIdOverlapped;
begin
  // Start from nil so a failed wait (nothing dequeued) can be told apart from
  // a dequeued packet that belongs to a failed I/O operation.
  LOverlapped := nil;
  // Wait forever on the completion port.  If we are terminating, a terminate
  // signal is sent into the queue.
  if GetQueuedCompletionStatus(FCompletionPort, LBytesTransferred
   , LCompletionKey, POverLapped(LOverlapped), INFINITE) then begin
    if LCompletionKey <> GCompletionKeyTerminate then begin
      // Socket has been closed
      if LBytesTransferred = 0 then begin
        LOverlapped.WorkOpUnit.IOHandler.CloseGracefully;
      end;
      LOverlapped.WorkOpUnit.Process(LOverlapped, LBytesTransferred);
    end;
  end else if LOverlapped <> nil then begin
    // The call failed but still dequeued a packet: the I/O operation itself
    // failed (for example the connection was reset). Previously the packet
    // was dropped, so the work unit never completed and its fiber was never
    // rescheduled. Handle it the same way as a zero-byte completion so the
    // waiter wakes up and sees the disconnect.
    // NOTE(review): assumes Process copes with a zero-byte completion for
    // every work unit type, as it already must for the branch above - confirm.
    LOverlapped.WorkOpUnit.IOHandler.CloseGracefully;
    LOverlapped.WorkOpUnit.Process(LOverlapped, 0);
  end;
  // Failure with a nil overlapped: nothing was dequeued, so nothing to do.
end;

// Called from TIdIOHandlerChain.Destroy so the engine can drop references to
// a handler that is going away. This implementation does nothing.
//   AIOHandler - the handler being destroyed (unused here).
procedure TIdChainEngine.RemoveSocket(AIOHandler: TIdIOHandlerChain);
begin
  // Disabled "fall through" guard, kept for reference.
  // NOTE(review): it suggests descendants were once expected to override
  // this method - confirm against the class declaration.
//  raise EIdException.Create('Fall through error in ' + Self.ClassName+'.RemoveSocket');
end;

// Accepts a work unit from an IO handler and starts it.
//   AWorkOpUnit - the unit to run. A TIdWorkOpUnitWaitConnected unit first
//                 gets its socket associated with the completion port and is
//                 marked complete.
// Raises the last OS error when the socket cannot be associated with the port.
procedure TIdChainEngine.AddWork(AWorkOpUnit: TIdWorkOpUnit);
var
  LPort: THandle;
begin
  if AWorkOpUnit is TIdWorkOpUnitWaitConnected then begin
    // Associate the socket with the completion port.
    LPort := CreateIOCompletionPort(AWorkOpUnit.SocketHandle, FCompletionPort,
      0, 0);
    if LPort = 0 then begin
      RaiseLastOSError;
    end;
    AWorkOpUnit.Complete;
  end;
  // NOTE(review): Start is also called for a wait-connected unit that was
  // just completed above - confirm this is intended.
  AWorkOpUnit.Start;
end;

// Closes the completion port created in InitComponent and runs the inherited
// destructor.
// Raises the last OS error when CloseHandle fails; the inherited destructor
// still runs in that case.
destructor TIdChainEngine.Destroy;
begin
  try
    // The port handle is 0 when InitComponent never created it (for example
    // because it raised). Closing handle 0 would only fail and raise here.
    if FCompletionPort <> 0 then begin
      if not CloseHandle(FCompletionPort) then begin
        RaiseLastOSError;
      end;
      FCompletionPort := 0;
    end;
  finally
    // Run the inherited clean-up even when closing the handle raised.
    inherited;
  end;
end;

// Component initialisation: creates the I/O completion port and, outside the
// designer, the engine thread that services it.
// Raises the last OS error when the completion port cannot be created.
procedure TIdChainEngine.InitComponent;
begin
{
Notes kept from a discussion about sizing the engine to the machine:

var SysInfo: TSystemInfo;
  GetSystemInfo(SysInfo);
  SysInfo.dwNumberOfProcessors

Use GetSystemInfo instead. It will return all the info on the local
system's architecture and will also return a valid ActiveProcessorMask
which is a DWORD to be read as a bit array of the processors on the
system...

CZH> And next
CZH> question - any one know off hand how to set affinity? :)

Use the SetProcessAffinityMask or SetThreadAffinityMask API depending
on whether you want to act on the whole process or just a single
thread (SetThreadIdealProcessor is another way to do it: it just gives
the scheduler a hint about where to run a thread without forcing it:
good for keeping two threads doing IO one with each other on the same
processor).
}
  inherited;
  // Create the completion port BEFORE the thread. The thread constructor is
  // called with its first argument False - presumably "not suspended" - so
  // the thread may start running Execute, which waits on FCompletionPort,
  // as soon as it is created.
  //MS says destruction is automatic, but Google seems to say that this initial
  //one is not auto managed as MS says, and that CloseHandle should be called.
  FCompletionPort := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
  if FCompletionPort = 0 then begin
    RaiseLastOSError;
  end;
  if not (csDesigning in ComponentState) then begin
    // Cant use .Name, its not initialized yet in Create
    FThread := TIdChainEngineThread.Create(Self, 'Chain Engine'); {do not localize}
  end;
end;

{ TIdChainEngineThread }

// Creates the thread that drives a chain engine.
//   AOwner - the engine whose Execute method Run calls.
//   AName  - passed to the inherited constructor as the thread name.
constructor TIdChainEngineThread.Create(
  AOwner: TIdChainEngine;
  const AName: string
  );
begin
  // Store the engine before calling the inherited constructor.
  // NOTE(review): the first inherited argument (False) presumably means
  // "do not create suspended", in which case Run may read FChainEngine
  // immediately - keep this statement order; confirm against the base class.
  FChainEngine := AOwner;
  inherited Create(False, True, AName);
end;

(*procedure TIdChainEngineIOCP.TransmitFileIOCP(const AWorkOpUnit:TIdWorkOpUnitWriteFile;const AFilename:string);
var
  LPOverlapped: PIdOverlapped;
  LHFile:THandle;
begin
  New(LPOverlapped);
  ZeroMemory(LPOverlapped,sizeof(TIdOverLapped));
  New(LPOverlapped^.Buffer);
  LPOverlapped^.IOhandler:=TIdIOHandlerChain(AWorkOpUnit.IOhandler);
  LPOverlapped^.WorkOpUnit:=AWorkOpUnit;
  LHFile:=CreateFile(pchar(AFilename),GENERIC_READ,FILE_SHARE_READ,nil,OPEN_EXISTING,FILE_FLAG_SEQUENTIAL_SCAN,0);
  if LHFile=INVALID_HANDLE_VALUE then begin
    RaiseLastOSError;
  end;
  try
    if ServiceQueryTransmitFile(AWorkOpUnit.IOHandler.Binding.Handle,LHFile,0,0,POverlapped(LPOverlapped),nil,0) then begin
      AWorkOpUnit.Fiber.Relinquish;
    end else begin
      raise EIdException.Create('error in ServiceQueryTransmitFile');
    end;
  finally
    CloseHandle(LHFile);
  end;
end;
*)
(*procedure TIdChainEngineIOCP.TransmitFileAsStream(const AWorkOpUnit:TIdWorkOpUnitWriteFile;const AFilename:string);

  procedure CopyWorkUnit(ASrc,ADst: TIdWorkOpUnit);
  begin
    ADst.IOHandler   := ASrc.IOHandler;
    ADst.Fiber       := ASrc.Fiber;
    ADst.OnCompleted := ASrc.OnCompleted;
    ADst.SocketHandle:= ASrc.SocketHandle;
  end;

var
  LStream:TfileStream;
  LWorkOpUnit : TIdWorkOpUnitWriteStream;

  LBuf:pointer;
  LBufLen:integer;
begin
Assert(False, 'to do');
  LStream := TFileStream.Create(AFilename,fmOpenRead or fmShareDenyWrite);
  try
    LWorkOpUnit := TIdWorkOpUnitWriteStream.Create(LStream,0,LStream.size,false);
    try
      CopyWorkUnit(AWorkOpUnit,LWorkOpUnit);
      LBufLen:=Min(LStream.size,128*1024);
      getmem(LBuf,LBufLen);
      LWorkOpUnit.Stream.Position:=LWorkOpUnit.StartPos;
      LWorkOpUnit.Stream.Read(LBuf^,LBufLen);
      IssueWriteBuffer(LWorkOpUnit,LBuf,LBufLen);
    finally
      AWorkOpUnit.BytesSent := LStream.Size;
      LWorkOpUnit.free;
    end;
  finally
    LStream.free;
  end;
end;
*)

// Thread body: delegates to the owning engine's Execute, which handles at
// most one completion packet per call.
// NOTE(review): presumably the base thread class calls Run repeatedly until
// the thread is terminated - confirm against the base class.
procedure TIdChainEngineThread.Run;
begin
  FChainEngine.Execute;
end;

end.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -