📄 idiohandlerchain.pas
字号:
begin
Result := 0;
EIdException.Toss('Fall through error in ' + ClassName); {do not localize}
end;
// Reads incoming data into AStream by queueing a read work unit and waiting
// for it to finish.
//   AStream              - destination wrapper; its VCLStream receives the data.
//   AByteCount           - number of bytes to read; only used when
//                          AReadUntilDisconnect is False.
//   AReadUntilDisconnect - when True, a read-until-disconnect work unit is
//                          queued with timeout -1 and with
//                          AAllowGracefulException = False.
procedure TIdIOHandlerChain.ReadStream(AStream: TIdStreamVCL; AByteCount: Integer;
  AReadUntilDisconnect: Boolean);
begin
  if not AReadUntilDisconnect then begin
    // Fixed-size read with the default QueueAndWait arguments.
    QueueAndWait(TIdWorkOpUnitReadSizedStream.Create(AStream.VCLStream, AByteCount));
    Exit;
  end;
  QueueAndWait(TIdWorkOpUnitReadUntilDisconnect.Create(AStream.VCLStream), -1,
    True, False);
end;
// Extracts AByteCount bytes from the input buffer into VBuffer, first queueing
// a sized read for whatever part of the request is not yet buffered.
//   VBuffer    - receives the bytes.
//   AByteCount - number of bytes wanted; checked with EIdException.IfFalse to
//                be >= 0. Nothing is done when it is 0.
//   AAppend    - passed through unchanged to FInputBuffer.ExtractToBytes.
procedure TIdIOHandlerChain.ReadBytes(var VBuffer: TIdBytes;
  AByteCount: Integer; AAppend: Boolean = True);
begin
  EIdException.IfFalse(AByteCount >= 0);
  if AByteCount <= 0 then begin
    Exit;
  end;
  if AByteCount > FInputBuffer.Size then begin
    // Only request the bytes that are still missing from the buffer.
    QueueAndWait(TIdWorkOpUnitReadSized.Create(AByteCount - FInputBuffer.Size));
  end;
  Assert(FInputBuffer.Size >= AByteCount);
  FInputBuffer.ExtractToBytes(VBuffer, AByteCount, AAppend);
end;
// Reads one line from the input buffer, queueing a ReadLn work unit when the
// terminator is not buffered yet.
//   ATerminator    - line terminator; '' is treated as LF so callers can pass
//                    later arguments without naming a terminator.
//   ATimeout       - passed to QueueAndWait for the read.
//   AMaxLineLength - -1 selects the MaxLineLength property; 0 disables the
//                    length check.
// Returns the line without its terminator (and without a trailing CR when the
// terminator is LF). When the line is too long and FMaxLineAction is maSplit,
// returns the first AMaxLineLength characters and sets FReadLnSplit. Returns
// '' and sets FReadLnTimedOut when the read raises EIdReadTimeout.
// Raises EIdException when the line is too long and FMaxLineAction is
// maException.
function TIdIOHandlerChain.ReadLn(ATerminator: string = LF;
  ATimeout: Integer = IdTimeoutDefault; AMaxLineLength: Integer = -1): string;
var
  LTermPos: Integer;
begin
  // A string function result is not guaranteed to start out empty, so set it
  // explicitly; otherwise the timeout path could hand back stale data.
  Result := '';
  if AMaxLineLength = -1 then begin
    AMaxLineLength := MaxLineLength;
  end;
  // User may pass '' if they need to pass arguments beyond the first.
  if ATerminator = '' then begin
    ATerminator := LF;
  end;
  FReadLnSplit := False;
  FReadLnTimedOut := False;
  try
    LTermPos := FInputBuffer.IndexOf(ATerminator) + 1;
    if (LTermPos = 0) and ((AMaxLineLength = 0)
      or (FInputBuffer.Size < AMaxLineLength)) then begin
      QueueAndWait(TIdWorkOpUnitReadLn.Create(ATerminator, AMaxLineLength)
        , ATimeout);
      LTermPos := FInputBuffer.IndexOf(ATerminator) + 1;
    end;
    // LTermPos cannot be 0, and the code below can't handle it properly.
    // NOTE(review): if the buffer already holds AMaxLineLength or more bytes
    // without a terminator the read above is skipped and this assertion
    // fires - confirm whether that case should follow FMaxLineAction instead.
    Assert(LTermPos > 0);
    if (AMaxLineLength <> 0) and (LTermPos > AMaxLineLength) then begin
      case FMaxLineAction of
        // TODO: find the right exception class here
        maException: EIdException.Toss('MaxLineLength exceded'); {do not localize}
        maSplit: begin
          // Record that only part of the line is being returned; the flag was
          // previously cleared above but never set.
          FReadLnSplit := True;
          Result := FInputBuffer.Extract(AMaxLineLength);
        end;
      end;
    end else begin
      Result := FInputBuffer.Extract(LTermPos - 1);
      if (ATerminator = LF) and (Copy(Result, Length(Result), 1) = CR) then begin
        Delete(Result, Length(Result), 1);
      end;
      FInputBuffer.Extract(Length(ATerminator)); // remove the terminator
    end;
  except
    on E: EIdReadTimeout do begin
      FReadLnTimedOut := True;
      Result := '';
    end;
  end;
end;
// Reads until the read-until-disconnect work unit completes and returns
// everything received as a string. The read is bracketed by
// BeginWork(wmRead) / EndWork(wmRead).
function TIdIOHandlerChain.AllData: string;
var
  LCapture: TStringStream;
begin
  BeginWork(wmRead);
  try
    Result := '';
    LCapture := TStringStream.Create('');
    try
      // Timeout -1, QueueAndWait frees the work unit,
      // AAllowGracefulException = False.
      QueueAndWait(TIdWorkOpUnitReadUntilDisconnect.Create(LCapture), -1,
        True, False);
      Result := LCapture.DataString;
    finally
      LCapture.Free;
    end;
  finally
    EndWork(wmRead);
  end;
end;
// Queues a write-file work unit for AFile and waits for it with the default
// timeout.
//   AFile               - name of the file to send.
//   AEnableTransferFile - BGO: ignored for now.
// Always returns 0. Reporting the work unit's BytesSent, and delegating to the
// inherited WriteFile when an Intercept is assigned, were both left disabled
// by the original author.
function TIdIOHandlerChain.WriteFile(
  const AFile: String;
  AEnableTransferFile: Boolean): Cardinal;
var
  LFileOp: TIdWorkOpUnitWriteFile;
begin
  Result := 0;
  LFileOp := TIdWorkOpUnitWriteFile.Create(AFile);
  try
    // AFreeWorkOpUnit = False: the unit is owned and released here.
    QueueAndWait(LFileOp, IdTimeoutDefault, False);
  finally
    LFileOp.Free;
  end;
end;
// Sends data from AStream in blocks of at most 128K, one stream-write work
// unit per block.
//   AStream         - source wrapper; its VCLStream is read.
//   ASize           - > 0: that many bytes from the current position;
//                     = 0: the whole stream, from the beginning;
//                     < 0: everything from the current position to the end.
//   AWriteByteCount - when True, the resolved byte count is written first via
//                     Write(ASize).
// The BeginWork(wmWrite, ASize) / EndWork(wmWrite) bracketing was disabled by
// the original author and stays disabled here.
procedure TIdIOHandlerChain.Write(
  AStream: TIdStreamVCL;
  ASize: Integer = 0;
  AWriteByteCount: Boolean = False
);
const
  CBlockSize = 128 * 1024; // 128K blocks
var
  LOffset: Integer;
  LChunk: Integer;
begin
  if ASize > 0 then begin
    // Explicit count: start at the current position.
    LOffset := AStream.VCLStream.Seek(0, soFromCurrent);
  end else if ASize = 0 then begin
    // Whole stream.
    LOffset := 0;
    ASize := AStream.VCLStream.Seek(0, soFromEnd);
    AStream.VCLStream.Seek(0, soFromBeginning);
  end else begin
    // Negative count: everything from the current position onwards.
    LOffset := AStream.VCLStream.Seek(0, soFromCurrent);
    ASize := AStream.VCLStream.Seek(0, soFromEnd) - LOffset;
    AStream.VCLStream.Seek(LOffset, soFromBeginning);
  end;
  if AWriteByteCount then begin
    Write(ASize);
  end;
  while ASize > 0 do begin
    if ASize > CBlockSize then begin
      LChunk := CBlockSize;
    end else begin
      LChunk := ASize;
    end;
    QueueAndWait(TIdWorkOpUnitWriteStream.Create(AStream.VCLStream, LOffset,
      LChunk, False));
    Dec(ASize, LChunk);
    Inc(LOffset, LChunk);
  end;
end;
// Queues a buffer-write work unit for the bytes in ABuffer and waits for it.
//   ABuffer - bytes to send. An empty buffer is a no-op.
procedure TIdIOHandlerChain.WriteDirect(
  ABuffer: TIdBytes
);
begin
  // Taking @ABuffer[0] of an empty dynamic array is a range error, and
  // TIdChainEngine.Execute treats a completion with 0 bytes transferred as a
  // closed socket, so never queue a zero-length write.
  if Length(ABuffer) = 0 then begin
    Exit;
  end;
  // NOTE(review): the meaning of the trailing False is defined by
  // TIdWorkOpUnitWriteBuffer.Create, which is not visible here.
  QueueAndWait(TIdWorkOpUnitWriteBuffer.Create(@ABuffer[0], Length(ABuffer), False));
end;
// Prepares AWorkOpUnit, hands it to the chain engine and, once control
// returns, surfaces any error, timeout or disconnect.
//   AWorkOpUnit             - the unit of work to run.
//   ATimeout                - IdTimeoutInfinite: no deadline (TimeOutAt = 0);
//                             IdTimeoutDefault: FReadTimeout is used, and a
//                             value <= 0 also means no deadline;
//                             any other value: milliseconds added to
//                             GetTickCount.
//   AFreeWorkOpUnit         - when True the unit is freed before returning,
//                             including when an exception is raised.
//   AAllowGracefulException - passed to CheckForDisconnect before and after
//                             the work.
// Raises EIdReadTimeout when the unit reports TimedOut; the unit's own
// RaiseException and CheckForDisconnect may raise as well.
procedure TIdIOHandlerChain.QueueAndWait(
  AWorkOpUnit: TIdWorkOpUnit;
  ATimeout: Integer = IdTimeoutDefault;
  AFreeWorkOpUnit: Boolean = True;
  AAllowGracefulException: Boolean = True
);
begin
  try
    CheckForDisconnect(AAllowGracefulException);
    // Work out the deadline. The timeout is typecast to Cardinal to prevent
    // the compiler from expanding the sum to Int64, which can incur a
    // performance penalty.
    if ATimeout = IdTimeoutInfinite then begin
      AWorkOpUnit.TimeOutAt := 0;
    end else if ATimeout <> IdTimeoutDefault then begin
      AWorkOpUnit.TimeOutAt := GetTickCount + Cardinal(ATimeout);
    end else if FReadTimeout > 0 then begin
      AWorkOpUnit.TimeOutAt := GetTickCount + Cardinal(FReadTimeout);
    end else begin
      AWorkOpUnit.TimeOutAt := 0;
    end;
    // Bind the unit to this handler's fiber and socket.
    AWorkOpUnit.Fiber := FFiber;
    AWorkOpUnit.IOHandler := Self;
    AWorkOpUnit.OnCompleted := WorkOpUnitCompleted;
    AWorkOpUnit.SocketHandle := Binding.Handle;
    // Add to queue and wait to be rescheduled when work is completed
    FChainEngine.AddWork(AWorkOpUnit);
    // Check to see if we need to reraise an exception
    AWorkOpUnit.RaiseException;
    // Check for timeout
    if AWorkOpUnit.TimedOut then begin
      EIdReadTimeout.Toss('Timed out'); {do not localize}
    end;
    // Check to see if it was closed during this operation
    CheckForDisconnect(AAllowGracefulException);
  finally
    if AFreeWorkOpUnit then begin
      AWorkOpUnit.Free;
    end;
  end;
end;
// Creates a chain IO handler bound to an engine, a fiber weaver and a fiber.
//   AOwner       - passed to the inherited constructor.
//   AChainEngine - required; also asked to apply its handler options to Self.
//   AFiberWeaver - required; stored in FFiberWeaver.
//   AFiber       - required; stored in FFiber.
// Each required argument is checked with EIdException.IfNotAssigned in the
// order listed. Also allocates and zeroes the overlapped record and allocates
// its Buffer.
constructor TIdIOHandlerChain.Create(
  AOwner: TComponent;
  AChainEngine: TIdChainEngine;
  AFiberWeaver: TIdFiberWeaver;
  AFiber: TIdFiber
);
begin
  inherited Create(AOwner);

  EIdException.IfNotAssigned(AChainEngine, 'No chain engine specified.'); {do not localize}
  FChainEngine := AChainEngine;
  FChainEngine.SetIOHandlerOptions(Self);

  EIdException.IfNotAssigned(AFiberWeaver, 'No fiber weaver specified.'); {do not localize}
  FFiberWeaver := AFiberWeaver;

  EIdException.IfNotAssigned(AFiber, 'No fiber specified.'); {do not localize}
  FFiber := AFiber;

  // Set up the overlapped structure: allocate, clear, then give it a buffer.
  New(FOverlapped);
  FillChar(FOverlapped^, SizeOf(TIdOverLapped), 0);
  New(FOverlapped^.Buffer);
end;
// Completion callback assigned to each work unit's OnCompleted by
// QueueAndWait: adds the finished unit's fiber to the fiber weaver.
procedure TIdIOHandlerChain.WorkOpUnitCompleted(
  AWorkOpUnit: TIdWorkOpUnit
);
begin
  FFiberWeaver.Add(
    AWorkOpUnit.Fiber
  );
end;
// Detaches from the chain engine and releases the overlapped record and its
// buffer.
// Delphi runs the destructor automatically when a constructor raises, so every
// field is checked before use: the constructor can raise before FChainEngine
// or FOverlapped has been assigned.
destructor TIdIOHandlerChain.Destroy;
begin
  // Tell the chain engine that we are closing and to remove any references to
  // us and cease any usage.
  // Do not do this in close, it can cause deadlocks because the engine can
  // call close while in its Execute.
  if Assigned(FChainEngine) then begin
    FChainEngine.RemoveSocket(Self);
  end;
  if FOverlapped <> nil then begin
    // Buffer is nil when the record was zeroed but New(Buffer) never ran.
    if FOverlapped^.Buffer <> nil then begin
      Dispose(FOverlapped^.Buffer);
    end;
    Dispose(FOverlapped);
    FOverlapped := nil;
  end;
  inherited;
end;
{ TIdChainEngine }
// Shuts the engine thread down before the engine is destroyed: flags the
// thread for termination, posts the terminate signal via Terminating, waits
// for the thread to finish and frees it. Does nothing but call inherited when
// no thread was created.
procedure TIdChainEngine.BeforeDestruction;
begin
  if Assigned(FThread) then begin
    FThread.Terminate;   // signal thread for termination
    Terminating;         // tell the engine we are attempting termination
    FThread.WaitFor;     // wait for the thread to terminate
    FreeAndNil(FThread); // free thread
  end;
  inherited;
end;
// Returns the FInputBuffer of AIOHandler. The handler is hard-cast to
// TIdIOHandlerChain without a runtime type check.
function TIdChainEngine.GetInputBuffer(const AIOHandler: TIdIOHandler): TIdBuffer;
var
  LChainHandler: TIdIOHandlerChain;
begin
  LChainHandler := TIdIOHandlerChain(AIOHandler);
  Result := LChainHandler.FInputBuffer;
end;
// Applies this engine's options to AIOHandler: sets its ConnectMode to cmIOCP.
// Called from TIdIOHandlerChain.Create.
procedure TIdChainEngine.SetIOHandlerOptions(
  AIOHandler: TIdIOHandlerChain
);
begin
  AIOHandler.ConnectMode := cmIOCP;
end;
// Associates the socket handle of AIOHandler with the engine's completion
// port. Raises the last OS error when the association fails.
procedure TIdChainEngine.SocketAccepted(AIOHandler: TIdIOHandlerChain);
var
  LPort: THandle;
begin
  LPort := CreateIoCompletionPort(AIOHandler.Binding.Handle, FCompletionPort, 0, 0);
  if LPort = 0 then begin
    RaiseLastOSError;
  end;
end;
// Posts a completion packet carrying GCompletionKeyTerminate (0 bytes, nil
// overlapped) to the completion port. Raises the last OS error when the post
// fails.
procedure TIdChainEngine.Terminating;
var
  LPosted: Boolean;
begin
  LPosted := PostQueuedCompletionStatus(FCompletionPort, 0,
    GCompletionKeyTerminate, nil);
  if not LPosted then begin
    RaiseLastOSError;
  end;
end;
// Dequeues one completion packet from the completion port, waiting
// indefinitely, and dispatches it to the work unit stored in its overlapped
// record. A packet carrying GCompletionKeyTerminate is swallowed without
// dispatch. When 0 bytes were transferred the work unit's IO handler is closed
// gracefully before the unit is processed.
// NOTE(review): a False result from GetQueuedCompletionStatus is ignored
// entirely, so a failed I/O that was still dequeued never reaches its work
// unit - confirm this is intended.
procedure TIdChainEngine.Execute;
var
  LBytesTransferred: DWord;
  LCompletionKey: DWord;
  LOverlapped: PIdOverlapped;
begin
  // Wait forever on the completion port. If we are terminating, a terminate
  // signal is sent into the queue.
  if not GetQueuedCompletionStatus(FCompletionPort, LBytesTransferred,
    LCompletionKey, POverLapped(LOverlapped), INFINITE) then begin
    Exit;
  end;
  if LCompletionKey = GCompletionKeyTerminate then begin
    Exit;
  end;
  if LBytesTransferred = 0 then begin
    // Socket has been closed
    LOverlapped.WorkOpUnit.IOHandler.CloseGracefully;
  end;
  LOverlapped.WorkOpUnit.Process(LOverlapped, LBytesTransferred);
end;
// Called from TIdIOHandlerChain.Destroy so the engine can drop any references
// to AIOHandler. This implementation does nothing; the fall-through exception
// below was left commented out by the original author.
procedure TIdChainEngine.RemoveSocket(AIOHandler: TIdIOHandlerChain);
begin
// raise EIdException.Create('Fall through error in ' + Self.ClassName+'.RemoveSocket');
end;
// Accepts a work unit from an IO handler and starts it.
// For a TIdWorkOpUnitWaitConnected unit the socket handle is first associated
// with the completion port (raising the last OS error on failure) and the unit
// is marked complete. Start is then called for every unit, including that one.
procedure TIdChainEngine.AddWork(AWorkOpUnit: TIdWorkOpUnit);
var
  LPort: THandle;
begin
  if AWorkOpUnit is TIdWorkOpUnitWaitConnected then begin
    // Associate the socket with the completion port.
    LPort := CreateIoCompletionPort(AWorkOpUnit.SocketHandle, FCompletionPort, 0, 0);
    if LPort = 0 then begin
      RaiseLastOSError;
    end;
    AWorkOpUnit.Complete;
  end;
  AWorkOpUnit.Start;
end;
// Closes the completion port handle and destroys the engine.
// The handle is only closed when one was actually created: InitComponent can
// raise before FCompletionPort is assigned, and the destructor then runs on a
// partially constructed object. The inherited destructor always runs, even
// when CloseHandle fails and the OS error is raised.
destructor TIdChainEngine.Destroy;
var
  LClosed: Boolean;
begin
  try
    if FCompletionPort <> 0 then begin
      LClosed := CloseHandle(FCompletionPort);
      // Clear the field either way so the handle is never closed twice.
      FCompletionPort := 0;
      if not LClosed then begin
        RaiseLastOSError;
      end;
    end;
  finally
    inherited;
  end;
end;
// Creates the I/O completion port and, outside design mode, the engine thread.
// The port is created before the thread because the thread is created
// non-suspended and its Run calls Execute, which waits on FCompletionPort;
// creating the thread first would let it wait on a handle that does not exist
// yet. Creating the port first also means a failure raises before any thread
// has been created.
// Raises the last OS error when the completion port cannot be created.
procedure TIdChainEngine.InitComponent;
begin
  {
  Notes kept from the original author on processor count and affinity:
  Use GetSystemInfo (TSystemInfo.dwNumberOfProcessors). It returns all the
  info on the local system's architecture, including a valid
  ActiveProcessorMask, a DWORD to be read as a bit array of the processors
  on the system.
  To set affinity use SetProcessAffinityMask or SetThreadAffinityMask,
  depending on whether you want to act on the whole process or just a single
  thread. SetThreadIdealProcessor is another way to do it: it gives the
  scheduler a hint about where to run a thread without forcing it, which is
  good for keeping two threads doing IO with each other on the same
  processor.
  }
  inherited;
  // MS says destruction is automatic, but Google seems to say that this
  // initial one is not auto managed as MS says, and that CloseHandle should
  // be called.
  FCompletionPort := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
  if FCompletionPort = 0 then begin
    RaiseLastOSError;
  end;
  if not (csDesigning in ComponentState) then begin
    // Can't use .Name, it's not initialized yet in Create
    FThread := TIdChainEngineThread.Create(Self, 'Chain Engine'); {do not localize}
  end;
end;
{ TIdChainEngineThread }
// Creates the engine's worker thread.
//   AOwner - the chain engine whose Execute this thread runs.
//   AName  - passed to the inherited constructor as its third argument.
// FChainEngine is assigned before the inherited constructor is called.
// NOTE(review): presumably so the field is already valid if the inherited
// constructor starts the thread - confirm against the base class.
constructor TIdChainEngineThread.Create(
  AOwner: TIdChainEngine;
  const AName: string
);
begin
  FChainEngine := AOwner;
  inherited Create(
    False,
    True,
    AName
  );
end;
(*procedure TIdChainEngineIOCP.TransmitFileIOCP(const AWorkOpUnit:TIdWorkOpUnitWriteFile;const AFilename:string);
var
LPOverlapped: PIdOverlapped;
LHFile:THandle;
begin
New(LPOverlapped);
ZeroMemory(LPOverlapped,sizeof(TIdOverLapped));
New(LPOverlapped^.Buffer);
LPOverlapped^.IOhandler:=TIdIOHandlerChain(AWorkOpUnit.IOhandler);
LPOverlapped^.WorkOpUnit:=AWorkOpUnit;
LHFile:=CreateFile(pchar(AFilename),GENERIC_READ,FILE_SHARE_READ,nil,OPEN_EXISTING,FILE_FLAG_SEQUENTIAL_SCAN,0);
if LHFile=INVALID_HANDLE_VALUE then begin
RaiseLastOSError;
end;
try
if ServiceQueryTransmitFile(AWorkOpUnit.IOHandler.Binding.Handle,LHFile,0,0,POverlapped(LPOverlapped),nil,0) then begin
AWorkOpUnit.Fiber.Relinquish;
end else begin
raise EIdException.Create('error in ServiceQueryTransmitFile');
end;
finally
CloseHandle(LHFile);
end;
end;
*)
(*procedure TIdChainEngineIOCP.TransmitFileAsStream(const AWorkOpUnit:TIdWorkOpUnitWriteFile;const AFilename:string);
procedure CopyWorkUnit(ASrc,ADst: TIdWorkOpUnit);
begin
ADst.IOHandler := ASrc.IOHandler;
ADst.Fiber := ASrc.Fiber;
ADst.OnCompleted := ASrc.OnCompleted;
ADst.SocketHandle:= ASrc.SocketHandle;
end;
var
LStream:TfileStream;
LWorkOpUnit : TIdWorkOpUnitWriteStream;
LBuf:pointer;
LBufLen:integer;
begin
Assert(False, 'to do');
LStream := TFileStream.Create(AFilename,fmOpenRead or fmShareDenyWrite);
try
LWorkOpUnit := TIdWorkOpUnitWriteStream.Create(LStream,0,LStream.size,false);
try
CopyWorkUnit(AWorkOpUnit,LWorkOpUnit);
LBufLen:=Min(LStream.size,128*1024);
getmem(LBuf,LBufLen);
LWorkOpUnit.Stream.Position:=LWorkOpUnit.StartPos;
LWorkOpUnit.Stream.Read(LBuf^,LBufLen);
IssueWriteBuffer(LWorkOpUnit,LBuf,LBufLen);
finally
AWorkOpUnit.BytesSent := LStream.Size;
LWorkOpUnit.free;
end;
finally
LStream.free;
end;
end;
*)
// Thread body: hands control to the owning engine's Execute, which dequeues
// and dispatches a single completion packet per call.
procedure TIdChainEngineThread.Run;
begin
  FChainEngine.Execute;
end;
end.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -