⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dntcpreactor.pas

📁 一个国外比较早的IOCP控件
💻 PAS
📖 第 1 页 / 共 2 页
字号:
// The contents of this file are used with permission, subject to
// the Mozilla Public License Version 1.1 (the "License"); you may
// not use this file except in compliance with the License. You may
// obtain a copy of the License at
// http://www.mozilla.org/MPL/MPL-1.1.html
//
// Software distributed under the License is distributed on an
// "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or
// implied. See the License for the specific language governing
// rights and limitations under the License.
{$I DnConfig.inc}
unit DnTcpReactor;

interface
uses
  Windows, Winsock2, SysUtils, Classes, Contnrs,
  DnRtl, DnConst, DnInterfaces, DnSimpleExecutor,
  DnAbstractExecutor, DnAbstractLogger, DnAbstractTimer,
  DnTimer, DnTimeOutSupport;

type
  TDnTcpChannel = class;
  TDnTcpReactor = class;

  TDnReqContext = record
    FOverlapped:  TOverlapped;
    FRequest:     Pointer;
    FReqRouting:  Boolean;
  end;

  PDnReqContext = ^TDnReqContext;

  TDnTcpRequest = class(TDnObject, IDnIORequest, IDnIOResponse)
  protected
    FChannel:     IDnChannel;
    FContext:     TDnReqContext;
    FKey:         Pointer;
    FStartBuffer: Pointer;
    FTotalSize:   Cardinal;
    FErrorCode:   Cardinal;
    FRTContext:   TDnThreadContext;

    procedure PostError(ErrorCode: Integer; WasRead: Cardinal);
    procedure CatchError;

    //IDnIORequest
    procedure Execute; virtual;
    function  IsComplete: Boolean; virtual;
    procedure ReExecute; virtual; abstract;
    function  RequestType: TDnIORequestType; virtual; abstract;
    //IDnIOResponse
    function  Channel: IDnIOTrackerHolder;
    procedure CallHandler(Context: TDnThreadContext); virtual; abstract;

    procedure SetTransferred(Transferred: Cardinal); virtual; abstract;
    function  IsCPUNeeded: Boolean; virtual; abstract;
    procedure Cancel;
  public
    constructor Create(Channel: IDnChannel; Key: Pointer);
    destructor Destroy; override;
  end;


  TDnTcpChannel = class (TDnObject, IDnIOTrackerHolder, IDnImplementation,
                          IUnknown, IDnChannel)
  protected
    FRemoteAddr:          TSockAddrIn;
    FSocket:              TSocket;
    FReactor:             TDnTcpReactor;

    FReadQueue:           TInterfaceList;
    FWriteQueue:          TInterfaceList;
    FClosingRequest:      IDnIORequest;
    FConnectingRequest:   IDnIORequest;
    FRunGuard:            TDnMutex;

    FTracker:             Pointer;
    FCache:               String;
    FCustomData:          Pointer;
    FFinishedIOCount:     Cardinal;
    FTimeOut:             Cardinal;
    FTimeOutObserver:     IDnTimerObserver;
    FTimeOutGuard:        TDnMutex;
    FClosing:             Boolean;
    FTimeOutExpired:      Boolean;
    FTimeOutAbort:        Boolean;
    FFirstTimeOutRequest: Boolean;
    
    class function    MatchingRequest(Context: Pointer): TDnTcpRequest;
    procedure   ExecuteNext(Request: IDnIORequest);
    procedure   SetTimeOut(Value: Cardinal);
    procedure   IssueTimeOutRequest(TimeOut: Cardinal; Request: IDnIORequest);
    procedure   InitChannel;
    function    GetRemoteAddrPtr: Pointer;
    procedure   IncrementIOCounter;
  public
    class function CheckImpl(obj: IUnknown): TDnTcpChannel;
    constructor Create(Reactor: TDnTcpReactor; Sock: TSocket; RemoteAddr: TSockAddrIn);
    constructor CreateEmpty(Reactor: TDnTcpReactor; const RemoteIP: String; Port: Word);
    destructor  Destroy; override;

    procedure   CloseSocketHandle;
    procedure   StopTimeOutTracking;

    function    CacheSize: Cardinal;
    function    CachePtr: PChar;
    function    Add2Cache(Block: PChar; BlockSize: Cardinal): Cardinal;
    function    ExtractFromCache(Block: PChar; BlockSize: Cardinal): Cardinal;
    procedure   InsertToCache(Block: PChar; BlockSize: Cardinal);
    function    CacheHasData: Boolean;
    function    IsClosed: Boolean;

    //IDnChannel
    function  RemotePort: Word;
    function  RemoteAddr: String;
    function  RemoteHost: String;
    procedure SetCustomData(P: Pointer);
    function  GetCustomData: Pointer;
    function  IsClosing: Boolean;
    //IDnIOTrackerHolder
    function  IsBound: Boolean;
    procedure Bind(Tracker: Pointer);
    procedure Unbind(Tracker: Pointer);
    function  Tracker: Pointer;

    procedure RunRequest(Request: IDnIORequest);
    procedure SetNagle(Value: Boolean);

    //IDnImplementation
    function GetImplementation: Pointer;

    property SocketHandle: TSocket read FSocket;
    property Reactor: TDnTcpReactor read FReactor;
    property TimeOut: Cardinal read FTimeOut write SetTimeOut;
    property RemoteAddrPtr: Pointer read GetRemoteAddrPtr;
    property TimeOutExpired: Boolean read FTimeOutExpired;
  end;

  TDnTcpReactorThread = class;
  TDnTimeOutExpiredEvent = procedure (Context: TDnThreadContext; Channel: IDnChannel) of object;
  
  {$IFDEF ROOTISCOMPONENT}
  TDnTcpReactor = class (TComponent, IDnTimeOutHandler)
  {$ELSE}
  TDnTcpReactor = class (TDnObject, IDnTimeOutHandler)
  {$ENDIF}
  protected
    FPort:                  THandle;

    FActive:                Boolean;
    FThread:                TDnTcpReactorThread;
    FChannels:              TInterfaceList;
    FGuard:                 TDnMutex;

    FLogger:                TDnAbstractLogger;
    FLogLevel:              TDnLogLevel;
    FExecutor:              TDnAbstractExecutor;

    FTimer:                 TDnAbstractTimer;
    FTimerExecutor:         TDnSimpleExecutor;
    FTimeOutExpiredEvent:   TDnTimeOutExpiredEvent;

    procedure SetActive(Value: Boolean);
    function  TurnOn: Boolean; virtual;
    function  TurnOff: Boolean; virtual;
    procedure TimeOutFired(Context: TDnThreadContext; Channel: IDnChannel;
                              ExpiredTacts: Cardinal; Key: Pointer);
    procedure OnTimeOutExpired(Context: TDnThreadContext; Channel: IDnChannel);
    function  GetChannelCount: Integer;
    {$IFDEF ROOTISCOMPONENT}
    procedure Notification(AComponent: TComponent; Operation: TOperation); override;
    {$ENDIF}
  public
    constructor Create {$IFDEF ROOTISCOMPONENT}(AOwner: TComponent);override{$ENDIF}; 
    destructor  Destroy; override;
    procedure   PostChannel(Channel: IDnChannel);
    procedure   PostChannelError(Channel: TDnTcpChannel; Request: TDnTcpRequest);
    procedure   RemoveChannel(Channel: IDnChannel);
    function    MakeChannel(const IPAddress: String; Port: Word): IDnChannel;
    procedure   SetTimeout(Channel: IDnChannel; Value: Cardinal);
    procedure   CloseChannels;
  published
    property Active: Boolean read FActive write SetActive;
    property Executor: TDnAbstractExecutor read FExecutor write FExecutor;
    property Logger: TDnAbstractLogger read FLogger write FLogger;
    property LogLevel: TDnLogLevel read FLogLevel write FLogLevel;
    property PortHandle: THandle read FPort;
    property ChannelCount: Integer read GetChannelCount;
    property OnTimeOut: TDnTimeOutExpiredEvent read FTimeOutExpiredEvent write FTimeOutExpiredEvent;
  end;

  TDnTcpReactorThread = class(TDnThread)
  protected
    FReactor: TDnTcpReactor;
    procedure CreateContext; override;
    procedure DestroyContext; override;
    procedure ThreadRoutine; override;
    procedure DoRequest(Request: TDnTcpRequest; Channel: TDnTcpChannel);
    procedure DoClose(Channel: TDnTcpChannel; Request: TDnTcpRequest);
    procedure UpdateCounters(Channel: TDnTcpChannel; Request: TDnTcpRequest);
    procedure ParseIONotification(Transferred: Cardinal; Key: Cardinal; Overlapped: POverlapped);
    procedure ParseIOError(Transferred: Cardinal; Key: Cardinal; Overlapped: POverlapped);
    procedure LogMessage(S: String);
  public
    constructor Create(Reactor: TDnTcpReactor);
    destructor Destroy; override;
  end;


  procedure Register;

implementation
var ParseIONotificationCount: Integer;

constructor TDnTcpRequest.Create(Channel: IDnChannel; Key: Pointer);
begin
  inherited Create;
  FChannel := Channel;
  FTotalSize := 0;
  FStartBuffer := Nil;
  FRefCount := 0;
  FKey := Key;
  FErrorCode := 0;
  FRTContext := Nil;
end;

destructor TDnTcpRequest.Destroy;
begin
  inherited Destroy;
end;

procedure TDnTcpRequest.Cancel;
begin
  Self.FContext.FRequest := Nil;
end;

procedure TDnTcpRequest.PostError(ErrorCode: Integer; WasRead: Cardinal);
var ChannelImpl: TDnTcpChannel;
begin
  FErrorCode := ErrorCode;
  ChannelImpl := TDnTcpChannel.CheckImpl(FChannel);
  if LongBool(PostQueuedCompletionStatus( ChannelImpl.Reactor.PortHandle, WasRead,
                                Cardinal(Pointer(FChannel)), @FContext )) = False
  then
    raise EDnException.Create(ErrWin32Error, GetLastError(), 'PostQueuedCompletionStatus');
end;

procedure TDnTcpRequest.Execute;
var ChannelImpl: TDnTcpChannel;
begin
  FRTContext := GetCurrentContext;
  if FRTContext <> Nil then
    FRTContext.Grab;

  FErrorCode := 0;
  FillChar(FContext.FOverlapped, SizeOf(FContext.FOverlapped), 0);
  FContext.FRequest := Self;
  FContext.FReqRouting := True;

  ChannelImpl := TDnTcpChannel.CheckImpl(FChannel);
  if not ChannelImpl.FFirstTimeOutRequest and (ChannelImpl.FTimeOut <> 0) then
  begin
    ChannelImpl.FFirstTimeOutRequest := True;
    ChannelImpl.IssueTimeOutRequest(ChannelImpl.FTimeOut, Self);
  end;
end;

function  TDnTcpRequest.IsComplete: Boolean;
begin
  if FRTContext <> Nil then
    FRTContext.Release;
  Result := False;
end;


procedure TDnTcpRequest.CatchError;
begin
  FErrorCode := GetLastError;
end;

function  TDnTcpRequest.Channel: IDnIOTrackerHolder;
begin
  if FChannel<>Nil then
  begin
    Result := IDnIOTrackerHolder(FChannel);//as IDnIOTrackerHolder;
  end;
end;
//------------------------------------------------------------------------------

class function TDnTcpChannel.CheckImpl(obj: IUnknown): TDnTcpChannel;
var impl: IDnImplementation;
begin
  Result := Nil;
  if IDnChannel(obj) <> Nil then
  begin
    impl := obj as IDnImplementation;
    if impl <> Nil then
      Result := TDnTcpChannel(impl.GetImplementation())
  end;
end;


procedure TDnTcpChannel.InitChannel;
begin
  FTracker := Nil;
  FRefCount := 0;
  //FRequest := TInterfaceList.Create;
  FReadQueue := TInterfaceList.Create;
  FWriteQueue := TInterfaceList.Create;
  FClosingRequest := Nil;
  FConnectingRequest := Nil;
  FRunGuard := TDnMutex.Create;
  FCustomData := Nil;
  SetLength(FCache, 0);
  FTimeOut := 0;
  FTimeOutGuard := TDnMutex.Create;
  FClosing := False;
  FFinishedIOCount := 0;
  FTimeOutExpired := False;
  FTimeOutAbort := False;
  FFirstTimeOutRequest := False;
end;

constructor TDnTcpChannel.Create(Reactor: TDnTcpReactor; Sock: TSocket; RemoteAddr: TSockAddrIn);
begin
  inherited Create;
  FReactor := Reactor;
  FSocket := Sock;
  FRemoteAddr := RemoteAddr;
  InitChannel;
end;

//{$O-}
constructor TDnTcpChannel.CreateEmpty(Reactor: TDnTcpReactor; const RemoteIP: String; Port: Word);
begin
  inherited Create;
  FReactor := Reactor;
  FSocket := WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, Nil, 0, WSA_FLAG_OVERLAPPED);
  if FSocket = INVALID_SOCKET then
    raise EDnException.Create(ErrWin32Error, WSAGetLastError(), 'WSASocket');
  FillChar(FRemoteAddr, 0, SizeOf(FRemoteAddr));
  FRemoteAddr.sin_addr.S_addr := inet_addr(PChar(RemoteIP));
  FRemoteAddr.sin_port := htons(Port);
  FRemoteAddr.sin_family := AF_INET;
  InitChannel;
end;


function TDnTcpChannel.GetImplementation: Pointer;
begin
  Result := Pointer(TDnTcpChannel(Self));
end;
//{$O+}

function TDnTcpChannel.GetRemoteAddrPtr: Pointer;
begin
  Result := @FRemoteAddr;
end;

procedure TDnTcpChannel.IncrementIOCounter;
begin
  FTimeOutGuard.Acquire;
  Inc(FFinishedIOCount);
  FTimeOutGuard.Release;
end;

procedure TDnTcpChannel.IssueTimeOutRequest(TimeOut: Cardinal; Request: IDnIORequest);
begin
  FTimeOutGuard.Acquire;
  FTimeOutObserver := Nil;
  FTimeOutObserver := FReactor.FTimer.RequestTimerNotify(Self, TimeOut, Pointer(FFinishedIOCount));
  FTimeOutGuard.Release;
end;


procedure TDnTcpChannel.SetCustomData(P: Pointer);
begin
  FCustomData := P;
end;

function TDnTcpChannel.GetCustomData: Pointer;
begin
  Result := FCustomData;
end;

procedure TDnTcpChannel.SetTimeOut(Value: Cardinal);
begin
  if FTimeOut = 0 then
    FTimeOut := Value
  else
    raise EDnException.Create(ErrCannotSetTimeOutTwice, 0);
end;

function TDnTcpChannel.CacheSize: Cardinal;
begin
  Result := Length(FCache);
end;

function TDnTcpChannel.CachePtr: PChar;
begin
  Result := PChar(FCache);
end;

function TDnTcpChannel.Add2Cache(Block: PChar; BlockSize: Cardinal): Cardinal;
var CacheLen: Cardinal;
begin
  if BlockSize <> 0 then
  begin
    CacheLen := Length(FCache);
    SetLength(FCache, CacheLen + BlockSize);
    Move(Block^, FCache[CacheLen+1], BlockSize);
    Result := Length(FCache);
  end else
    Result := 0;
end;

procedure TDnTcpChannel.InsertToCache(Block: PChar; BlockSize: Cardinal);
var CacheLen : Cardinal;
begin
  if (Block<>Nil) and (BlockSize <> 0) then
  begin
    CacheLen := Length(FCache);
    SetLength(FCache, CacheLen + BlockSize);
    if CacheLen <> 0 then
      Move(FCache[1], FCache[1+BlockSize], CacheLen);
    Move(Block^, FCache[1], BlockSize);
  end;
end;


function TDnTcpChannel.ExtractFromCache(Block: PChar; BlockSize: Cardinal): Cardinal;
var CacheLen: Cardinal;
begin
  CacheLen := Length(FCache);
  if CacheLen < BlockSize then
  begin
    Move(FCache[1], Block^, CacheLen);
    SetLength(FCache, 0);
    Result := CacheLen;
  end else
  begin
    Move(FCache[1], Block^, BlockSize);
    Delete(FCache, 1, BlockSize);
    Result := BlockSize;
  end;
end;

function TDnTcpChannel.CacheHasData: Boolean;
begin
  Result := Length(FCache) <> 0;
end;

//Stop/cancel the timeout tracking activity. It should be called only 1 time.
//There isn't a back way - you can't start timeout tracking again. 
procedure TDnTcpChannel.StopTimeOutTracking;
begin
  if FTimeOutObserver <> Nil then
  begin
    FTimeOutAbort := True;
    FTimeOutObserver.Cancel;
    FTimeOutObserver := Nil;
  end;
end;

function TDnTcpChannel.IsClosing: Boolean;
begin
  Result := FClosingRequest <> Nil;
end;

procedure TDnTcpChannel.RunRequest(Request: IDnIORequest);
begin
  if not FClosing then
  begin
    FRunGuard.Acquire;
    try
      case Request.RequestType of
        rtClose:    begin
                      FClosingRequest := Request;
                      if (FReadQueue.Count = 0) and (FWriteQueue.Count = 0) and
                          (FConnectingRequest = Nil) then
                        Request.Execute;
                    end;
        rtBrutalClose:
                    begin
                      if FClosingRequest = Nil then
                      begin
                        FClosingRequest := Request;
                        Request.Execute;
                      end;
                    end;

        rtConnect:  begin
                      FConnectingRequest := Request;
                      Request.Execute;
                    end;

        rtRead:     if (FReadQueue.Add(Request) = 0) and (FConnectingRequest = Nil) then
                      Request.Execute;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -