📄 dntcpreactor.pas
字号:
// The contents of this file are used with permission, subject to
// the Mozilla Public License Version 1.1 (the "License"); you may
// not use this file except in compliance with the License. You may
// obtain a copy of the License at
// http://www.mozilla.org/MPL/MPL-1.1.html
//
// Software distributed under the License is distributed on an
// "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or
// implied. See the License for the specific language governing
// rights and limitations under the License.
{$I DnConfig.inc}
unit DnTcpReactor;
interface
uses
Windows, Winsock2, SysUtils, Classes, Contnrs,
DnRtl, DnConst, DnInterfaces, DnSimpleExecutor,
DnAbstractExecutor, DnAbstractLogger, DnAbstractTimer,
DnTimer, DnTimeOutSupport;
type
TDnTcpChannel = class;
TDnTcpReactor = class;
TDnReqContext = record
FOverlapped: TOverlapped;
FRequest: Pointer;
FReqRouting: Boolean;
end;
PDnReqContext = ^TDnReqContext;
TDnTcpRequest = class(TDnObject, IDnIORequest, IDnIOResponse)
protected
FChannel: IDnChannel;
FContext: TDnReqContext;
FKey: Pointer;
FStartBuffer: Pointer;
FTotalSize: Cardinal;
FErrorCode: Cardinal;
FRTContext: TDnThreadContext;
procedure PostError(ErrorCode: Integer; WasRead: Cardinal);
procedure CatchError;
//IDnIORequest
procedure Execute; virtual;
function IsComplete: Boolean; virtual;
procedure ReExecute; virtual; abstract;
function RequestType: TDnIORequestType; virtual; abstract;
//IDnIOResponse
function Channel: IDnIOTrackerHolder;
procedure CallHandler(Context: TDnThreadContext); virtual; abstract;
procedure SetTransferred(Transferred: Cardinal); virtual; abstract;
function IsCPUNeeded: Boolean; virtual; abstract;
procedure Cancel;
public
constructor Create(Channel: IDnChannel; Key: Pointer);
destructor Destroy; override;
end;
TDnTcpChannel = class (TDnObject, IDnIOTrackerHolder, IDnImplementation,
IUnknown, IDnChannel)
protected
FRemoteAddr: TSockAddrIn;
FSocket: TSocket;
FReactor: TDnTcpReactor;
FReadQueue: TInterfaceList;
FWriteQueue: TInterfaceList;
FClosingRequest: IDnIORequest;
FConnectingRequest: IDnIORequest;
FRunGuard: TDnMutex;
FTracker: Pointer;
FCache: String;
FCustomData: Pointer;
FFinishedIOCount: Cardinal;
FTimeOut: Cardinal;
FTimeOutObserver: IDnTimerObserver;
FTimeOutGuard: TDnMutex;
FClosing: Boolean;
FTimeOutExpired: Boolean;
FTimeOutAbort: Boolean;
FFirstTimeOutRequest: Boolean;
class function MatchingRequest(Context: Pointer): TDnTcpRequest;
procedure ExecuteNext(Request: IDnIORequest);
procedure SetTimeOut(Value: Cardinal);
procedure IssueTimeOutRequest(TimeOut: Cardinal; Request: IDnIORequest);
procedure InitChannel;
function GetRemoteAddrPtr: Pointer;
procedure IncrementIOCounter;
public
class function CheckImpl(obj: IUnknown): TDnTcpChannel;
constructor Create(Reactor: TDnTcpReactor; Sock: TSocket; RemoteAddr: TSockAddrIn);
constructor CreateEmpty(Reactor: TDnTcpReactor; const RemoteIP: String; Port: Word);
destructor Destroy; override;
procedure CloseSocketHandle;
procedure StopTimeOutTracking;
function CacheSize: Cardinal;
function CachePtr: PChar;
function Add2Cache(Block: PChar; BlockSize: Cardinal): Cardinal;
function ExtractFromCache(Block: PChar; BlockSize: Cardinal): Cardinal;
procedure InsertToCache(Block: PChar; BlockSize: Cardinal);
function CacheHasData: Boolean;
function IsClosed: Boolean;
//IDnChannel
function RemotePort: Word;
function RemoteAddr: String;
function RemoteHost: String;
procedure SetCustomData(P: Pointer);
function GetCustomData: Pointer;
function IsClosing: Boolean;
//IDnIOTrackerHolder
function IsBound: Boolean;
procedure Bind(Tracker: Pointer);
procedure Unbind(Tracker: Pointer);
function Tracker: Pointer;
procedure RunRequest(Request: IDnIORequest);
procedure SetNagle(Value: Boolean);
//IDnImplementation
function GetImplementation: Pointer;
property SocketHandle: TSocket read FSocket;
property Reactor: TDnTcpReactor read FReactor;
property TimeOut: Cardinal read FTimeOut write SetTimeOut;
property RemoteAddrPtr: Pointer read GetRemoteAddrPtr;
property TimeOutExpired: Boolean read FTimeOutExpired;
end;
TDnTcpReactorThread = class;
TDnTimeOutExpiredEvent = procedure (Context: TDnThreadContext; Channel: IDnChannel) of object;
{$IFDEF ROOTISCOMPONENT}
TDnTcpReactor = class (TComponent, IDnTimeOutHandler)
{$ELSE}
TDnTcpReactor = class (TDnObject, IDnTimeOutHandler)
{$ENDIF}
protected
FPort: THandle;
FActive: Boolean;
FThread: TDnTcpReactorThread;
FChannels: TInterfaceList;
FGuard: TDnMutex;
FLogger: TDnAbstractLogger;
FLogLevel: TDnLogLevel;
FExecutor: TDnAbstractExecutor;
FTimer: TDnAbstractTimer;
FTimerExecutor: TDnSimpleExecutor;
FTimeOutExpiredEvent: TDnTimeOutExpiredEvent;
procedure SetActive(Value: Boolean);
function TurnOn: Boolean; virtual;
function TurnOff: Boolean; virtual;
procedure TimeOutFired(Context: TDnThreadContext; Channel: IDnChannel;
ExpiredTacts: Cardinal; Key: Pointer);
procedure OnTimeOutExpired(Context: TDnThreadContext; Channel: IDnChannel);
function GetChannelCount: Integer;
{$IFDEF ROOTISCOMPONENT}
procedure Notification(AComponent: TComponent; Operation: TOperation); override;
{$ENDIF}
public
constructor Create {$IFDEF ROOTISCOMPONENT}(AOwner: TComponent);override{$ENDIF};
destructor Destroy; override;
procedure PostChannel(Channel: IDnChannel);
procedure PostChannelError(Channel: TDnTcpChannel; Request: TDnTcpRequest);
procedure RemoveChannel(Channel: IDnChannel);
function MakeChannel(const IPAddress: String; Port: Word): IDnChannel;
procedure SetTimeout(Channel: IDnChannel; Value: Cardinal);
procedure CloseChannels;
published
property Active: Boolean read FActive write SetActive;
property Executor: TDnAbstractExecutor read FExecutor write FExecutor;
property Logger: TDnAbstractLogger read FLogger write FLogger;
property LogLevel: TDnLogLevel read FLogLevel write FLogLevel;
property PortHandle: THandle read FPort;
property ChannelCount: Integer read GetChannelCount;
property OnTimeOut: TDnTimeOutExpiredEvent read FTimeOutExpiredEvent write FTimeOutExpiredEvent;
end;
TDnTcpReactorThread = class(TDnThread)
protected
FReactor: TDnTcpReactor;
procedure CreateContext; override;
procedure DestroyContext; override;
procedure ThreadRoutine; override;
procedure DoRequest(Request: TDnTcpRequest; Channel: TDnTcpChannel);
procedure DoClose(Channel: TDnTcpChannel; Request: TDnTcpRequest);
procedure UpdateCounters(Channel: TDnTcpChannel; Request: TDnTcpRequest);
procedure ParseIONotification(Transferred: Cardinal; Key: Cardinal; Overlapped: POverlapped);
procedure ParseIOError(Transferred: Cardinal; Key: Cardinal; Overlapped: POverlapped);
procedure LogMessage(S: String);
public
constructor Create(Reactor: TDnTcpReactor);
destructor Destroy; override;
end;
procedure Register;
implementation
var ParseIONotificationCount: Integer;
constructor TDnTcpRequest.Create(Channel: IDnChannel; Key: Pointer);
begin
inherited Create;
FChannel := Channel;
FTotalSize := 0;
FStartBuffer := Nil;
FRefCount := 0;
FKey := Key;
FErrorCode := 0;
FRTContext := Nil;
end;
destructor TDnTcpRequest.Destroy;
begin
inherited Destroy;
end;
procedure TDnTcpRequest.Cancel;
begin
Self.FContext.FRequest := Nil;
end;
procedure TDnTcpRequest.PostError(ErrorCode: Integer; WasRead: Cardinal);
var ChannelImpl: TDnTcpChannel;
begin
FErrorCode := ErrorCode;
ChannelImpl := TDnTcpChannel.CheckImpl(FChannel);
if LongBool(PostQueuedCompletionStatus( ChannelImpl.Reactor.PortHandle, WasRead,
Cardinal(Pointer(FChannel)), @FContext )) = False
then
raise EDnException.Create(ErrWin32Error, GetLastError(), 'PostQueuedCompletionStatus');
end;
procedure TDnTcpRequest.Execute;
var ChannelImpl: TDnTcpChannel;
begin
FRTContext := GetCurrentContext;
if FRTContext <> Nil then
FRTContext.Grab;
FErrorCode := 0;
FillChar(FContext.FOverlapped, SizeOf(FContext.FOverlapped), 0);
FContext.FRequest := Self;
FContext.FReqRouting := True;
ChannelImpl := TDnTcpChannel.CheckImpl(FChannel);
if not ChannelImpl.FFirstTimeOutRequest and (ChannelImpl.FTimeOut <> 0) then
begin
ChannelImpl.FFirstTimeOutRequest := True;
ChannelImpl.IssueTimeOutRequest(ChannelImpl.FTimeOut, Self);
end;
end;
function TDnTcpRequest.IsComplete: Boolean;
begin
if FRTContext <> Nil then
FRTContext.Release;
Result := False;
end;
procedure TDnTcpRequest.CatchError;
begin
FErrorCode := GetLastError;
end;
function TDnTcpRequest.Channel: IDnIOTrackerHolder;
begin
if FChannel<>Nil then
begin
Result := IDnIOTrackerHolder(FChannel);//as IDnIOTrackerHolder;
end;
end;
//------------------------------------------------------------------------------
class function TDnTcpChannel.CheckImpl(obj: IUnknown): TDnTcpChannel;
var impl: IDnImplementation;
begin
Result := Nil;
if IDnChannel(obj) <> Nil then
begin
impl := obj as IDnImplementation;
if impl <> Nil then
Result := TDnTcpChannel(impl.GetImplementation())
end;
end;
procedure TDnTcpChannel.InitChannel;
begin
FTracker := Nil;
FRefCount := 0;
//FRequest := TInterfaceList.Create;
FReadQueue := TInterfaceList.Create;
FWriteQueue := TInterfaceList.Create;
FClosingRequest := Nil;
FConnectingRequest := Nil;
FRunGuard := TDnMutex.Create;
FCustomData := Nil;
SetLength(FCache, 0);
FTimeOut := 0;
FTimeOutGuard := TDnMutex.Create;
FClosing := False;
FFinishedIOCount := 0;
FTimeOutExpired := False;
FTimeOutAbort := False;
FFirstTimeOutRequest := False;
end;
constructor TDnTcpChannel.Create(Reactor: TDnTcpReactor; Sock: TSocket; RemoteAddr: TSockAddrIn);
begin
inherited Create;
FReactor := Reactor;
FSocket := Sock;
FRemoteAddr := RemoteAddr;
InitChannel;
end;
//{$O-}
constructor TDnTcpChannel.CreateEmpty(Reactor: TDnTcpReactor; const RemoteIP: String; Port: Word);
begin
inherited Create;
FReactor := Reactor;
FSocket := WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, Nil, 0, WSA_FLAG_OVERLAPPED);
if FSocket = INVALID_SOCKET then
raise EDnException.Create(ErrWin32Error, WSAGetLastError(), 'WSASocket');
FillChar(FRemoteAddr, 0, SizeOf(FRemoteAddr));
FRemoteAddr.sin_addr.S_addr := inet_addr(PChar(RemoteIP));
FRemoteAddr.sin_port := htons(Port);
FRemoteAddr.sin_family := AF_INET;
InitChannel;
end;
function TDnTcpChannel.GetImplementation: Pointer;
begin
Result := Pointer(TDnTcpChannel(Self));
end;
//{$O+}
function TDnTcpChannel.GetRemoteAddrPtr: Pointer;
begin
Result := @FRemoteAddr;
end;
procedure TDnTcpChannel.IncrementIOCounter;
begin
FTimeOutGuard.Acquire;
Inc(FFinishedIOCount);
FTimeOutGuard.Release;
end;
procedure TDnTcpChannel.IssueTimeOutRequest(TimeOut: Cardinal; Request: IDnIORequest);
begin
FTimeOutGuard.Acquire;
FTimeOutObserver := Nil;
FTimeOutObserver := FReactor.FTimer.RequestTimerNotify(Self, TimeOut, Pointer(FFinishedIOCount));
FTimeOutGuard.Release;
end;
procedure TDnTcpChannel.SetCustomData(P: Pointer);
begin
FCustomData := P;
end;
function TDnTcpChannel.GetCustomData: Pointer;
begin
Result := FCustomData;
end;
procedure TDnTcpChannel.SetTimeOut(Value: Cardinal);
begin
if FTimeOut = 0 then
FTimeOut := Value
else
raise EDnException.Create(ErrCannotSetTimeOutTwice, 0);
end;
function TDnTcpChannel.CacheSize: Cardinal;
begin
Result := Length(FCache);
end;
function TDnTcpChannel.CachePtr: PChar;
begin
Result := PChar(FCache);
end;
function TDnTcpChannel.Add2Cache(Block: PChar; BlockSize: Cardinal): Cardinal;
var CacheLen: Cardinal;
begin
if BlockSize <> 0 then
begin
CacheLen := Length(FCache);
SetLength(FCache, CacheLen + BlockSize);
Move(Block^, FCache[CacheLen+1], BlockSize);
Result := Length(FCache);
end else
Result := 0;
end;
procedure TDnTcpChannel.InsertToCache(Block: PChar; BlockSize: Cardinal);
var CacheLen : Cardinal;
begin
if (Block<>Nil) and (BlockSize <> 0) then
begin
CacheLen := Length(FCache);
SetLength(FCache, CacheLen + BlockSize);
if CacheLen <> 0 then
Move(FCache[1], FCache[1+BlockSize], CacheLen);
Move(Block^, FCache[1], BlockSize);
end;
end;
function TDnTcpChannel.ExtractFromCache(Block: PChar; BlockSize: Cardinal): Cardinal;
var CacheLen: Cardinal;
begin
CacheLen := Length(FCache);
if CacheLen < BlockSize then
begin
Move(FCache[1], Block^, CacheLen);
SetLength(FCache, 0);
Result := CacheLen;
end else
begin
Move(FCache[1], Block^, BlockSize);
Delete(FCache, 1, BlockSize);
Result := BlockSize;
end;
end;
function TDnTcpChannel.CacheHasData: Boolean;
begin
Result := Length(FCache) <> 0;
end;
//Stop/cancel the timeout tracking activity. It should be called only 1 time.
//There isn't a back way - you can't start timeout tracking again.
procedure TDnTcpChannel.StopTimeOutTracking;
begin
if FTimeOutObserver <> Nil then
begin
FTimeOutAbort := True;
FTimeOutObserver.Cancel;
FTimeOutObserver := Nil;
end;
end;
function TDnTcpChannel.IsClosing: Boolean;
begin
Result := FClosingRequest <> Nil;
end;
procedure TDnTcpChannel.RunRequest(Request: IDnIORequest);
begin
if not FClosing then
begin
FRunGuard.Acquire;
try
case Request.RequestType of
rtClose: begin
FClosingRequest := Request;
if (FReadQueue.Count = 0) and (FWriteQueue.Count = 0) and
(FConnectingRequest = Nil) then
Request.Execute;
end;
rtBrutalClose:
begin
if FClosingRequest = Nil then
begin
FClosingRequest := Request;
Request.Execute;
end;
end;
rtConnect: begin
FConnectingRequest := Request;
Request.Execute;
end;
rtRead: if (FReadQueue.Add(Request) = 0) and (FConnectingRequest = Nil) then
Request.Execute;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -