⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dntcpreactor.pas

📁 一个国外比较早的IOCP控件
💻 PAS
📖 第 1 页 / 共 2 页
字号:
        rtWrite:    if (FWriteQueue.Add(Request) = 0) and (FConnectingRequest = Nil) then
                      Request.Execute;
      end;

    finally
      FRunGuard.Release;
    end;
  end
  else
    raise EDnException.Create(ErrChannelClosing, 0);
end;

// Stores an opaque tracker pointer on the channel. The pointer is only
// remembered here; this method never dereferences it.
procedure TDnTcpChannel.Bind(Tracker: Pointer);
begin
  Self.FTracker := Tracker;
end;

// Forgets the tracker pointer previously stored by Bind.
procedure TDnTcpChannel.Unbind;
begin
  Self.FTracker := Nil;
end;

// Returns the tracker pointer currently stored on the channel
// (Nil when nothing is bound).
function TDnTcpChannel.Tracker: Pointer;
begin
  Result := Self.FTracker;
end;

// True while a non-Nil tracker pointer is stored on the channel.
function  TDnTcpChannel.IsBound: Boolean;
begin
  Result := Assigned(FTracker);
end;

// Returns the peer's TCP port converted from network to host byte order.
function TDnTcpChannel.RemotePort: Word;
var NetworkOrderPort: Word;
begin
  NetworkOrderPort := FRemoteAddr.sin_port;
  Result := ntohs(NetworkOrderPort);
end;

// Returns the peer's IPv4 address formatted by inet_ntoa, copied into a
// Pascal string.
function TDnTcpChannel.RemoteAddr: String;
begin
  Result := StrPas(inet_ntoa(FRemoteAddr.sin_addr));
end;

// Resolves the peer's host name through a reverse lookup (gethostbyaddr).
// Returns the resolved name, or the numeric address text (same value as
// RemoteAddr) when the lookup fails, so the caller never hits a nil
// dereference.
function TDnTcpChannel.RemoteHost: String;
var HostInfo: PHostEnt;
begin
  // The length argument must be the size of the address itself (in_addr),
  // not the size of the whole sockaddr_in record.
  HostInfo := gethostbyaddr(@FRemoteAddr.sin_addr, SizeOf(FRemoteAddr.sin_addr), AF_INET);
  if (HostInfo <> Nil) and (HostInfo^.h_name <> Nil) then
    Result := StrPas(HostInfo^.h_name)
  else
    Result := StrPas(inet_ntoa(FRemoteAddr.sin_addr));
end;

// Stops timeout tracking, marks the channel as closed by storing
// INVALID_SOCKET in FSocket, and then closes the previous handle.
procedure TDnTcpChannel.CloseSocketHandle;
var DetachedSocket: TSocket;
begin
  StopTimeOutTracking;
  // Swap the field first so IsClosed reports True before the handle is
  // actually released.
  DetachedSocket := FSocket;
  FSocket := INVALID_SOCKET;
  Winsock2.closesocket(DetachedSocket);
end;

// Treats Context as a pointer to a TDnReqContext record and returns its
// FRequest field cast to TDnTcpRequest. No validation is performed.
class function TDnTcpChannel.MatchingRequest(Context: Pointer): TDnTcpRequest;
begin
  Result := TDnTcpRequest(PDnReqContext(Context)^.FRequest);
end;

// True once the socket field holds INVALID_SOCKET
// (CloseSocketHandle stores that value).
function TDnTcpChannel.IsClosed: Boolean;
begin
  Result := not (FSocket <> INVALID_SOCKET);
end;

// Sets the TCP_NODELAY option of the channel's socket to Value.
// NOTE(review): Value is forwarded unchanged as TCP_NODELAY, and per the
// Winsock documentation a TRUE TCP_NODELAY *disables* the Nagle algorithm.
// The name suggests the opposite meaning - confirm against callers before
// changing anything. The setsockopt result is not checked.
procedure TDnTcpChannel.SetNagle(Value: Boolean);
var OptionValue: LongBool;
begin
  OptionValue := Value;
  Winsock2.setsockopt(FSocket, IPPROTO_TCP, TCP_NODELAY,
                      PChar(@OptionValue), SizeOf(OptionValue));
end;

// Called after Request has been processed: removes it from the head of its
// queue (or clears the close/connect slot) and starts the next queued
// request, all under FRunGuard.
// Nothing is done while the request still reports IsCPUNeeded.
procedure TDnTcpChannel.ExecuteNext(Request: IDnIORequest);
begin
  // Acquire outside the try block: if Acquire itself raises, the finally
  // section must not release a guard that was never taken.
  FRunGuard.Acquire;
  try
    if Request.IsCPUNeeded then
      Exit;
    case Request.RequestType of
      rtBrutalClose,
      rtClose:    begin
                    FClosingRequest := Nil;
                  end;
      rtRead:     begin
                    // Check Count first: indexing an empty list raises.
                    if (FReadQueue.Count <> 0) and (FReadQueue[0] = IUnknown(Request)) then
                      FReadQueue.Delete(0);
                    if FReadQueue.Count <> 0 then
                      (FReadQueue[0] as IDnIORequest).Execute;
                  end;
      rtWrite:    begin
                    if (FWriteQueue.Count <> 0) and (FWriteQueue[0] = IUnknown(Request)) then
                      FWriteQueue.Delete(0);
                    if FWriteQueue.Count <> 0 then
                      (FWriteQueue[0] as IDnIORequest).Execute;
                  end;
      rtConnect:  begin
                    // Connect finished: start whatever was queued behind it.
                    FConnectingRequest := Nil;
                    if FReadQueue.Count <> 0 then
                      (FReadQueue[0] as IDnIORequest).Execute;
                    if FWriteQueue.Count <> 0 then
                      (FWriteQueue[0] as IDnIORequest).Execute;
                    if (FReadQueue.Count = 0) and (FWriteQueue.Count = 0) and
                        (FClosingRequest <> Nil) then
                      FClosingRequest.Execute;
                  end;
    end;
  finally
    FRunGuard.Release;
  end;
end;

// Releases the guards and both request queues, drops the pending close
// request reference, then chains to the inherited destructor.
destructor TDnTcpChannel.Destroy;
begin
  FreeAndNil(FRunGuard);
  FreeAndNil(FTimeOutGuard);
  //FreeAndNil(FRequest);
  FreeAndNil(FReadQueue);
  FreeAndNil(FWriteQueue);
  // Assigning Nil is harmless when the reference is already Nil.
  FClosingRequest := Nil;
//  if FConnectRequest <> Nil then
//    FConnectRequest := Nil;
  inherited Destroy;
end;
//-----------------------------------------------------------------------------

// Creates an inactive reactor: no completion port, no executor, timer or
// logger yet; only the guard mutex and the channel list are allocated.
constructor TDnTcpReactor.Create{$IFDEF ROOTISCOMPONENT}(AOwner: TComponent){$ENDIF};
begin
  inherited Create{$IFDEF ROOTISCOMPONENT}(AOwner){$ENDIF};

  // Plain state
  FActive := False;
  FPort := 0;
  FLogLevel := llMandatory;

  // Collaborators are attached later
  FExecutor := Nil;
  FLogger := Nil;
  FTimer := Nil;
  FTimerExecutor := Nil;
  FTimeOutExpiredEvent := Nil;

  // Owned helpers
  FGuard := TDnMutex.Create;
  FChannels := TInterfaceList.Create;
end;

// Deactivates the reactor if it is still running, frees the guard mutex
// and chains to the inherited destructor.
// NOTE(review): FChannels (created in the constructor) is not freed here.
// That is fine if the field is declared as an interface, but a leak if it
// is declared as a class reference - confirm against the declaration.
destructor TDnTcpReactor.Destroy;
begin
  // SetActive(False) does nothing when the reactor is already inactive.
  SetActive(False);
  FreeAndNil(FGuard);
  inherited Destroy;
end;

// Returns the number of channels currently held in FChannels, or 0 when
// the list is not assigned. The read is done under FGuard.
function  TDnTcpReactor.GetChannelCount: Integer;
begin
  FGuard.Acquire;
  try
    if Assigned(FChannels) then
      Result := FChannels.Count
    else
      Result := 0;
  finally
    // Always release the guard, even if reading the list raises.
    FGuard.Release;
  end;
end;

{$IFDEF ROOTISCOMPONENT}
// Component notification hook: when a linked logger or executor component
// is being removed, drop our reference so it is not used afterwards.
procedure TDnTcpReactor.Notification(AComponent: TComponent; Operation: TOperation);
begin
  // The inherited implementation must run so that the standard TComponent
  // bookkeeping (forwarding to owned components, free-notification lists)
  // still happens.
  inherited Notification(AComponent, Operation);
  if Operation = opRemove then
  begin
    if AComponent = FLogger then
      FLogger := Nil
    else
    if AComponent = FExecutor then
      FExecutor := Nil;
  end;
end;
{$ENDIF}


// Switches the reactor on or off. A request for the state the reactor is
// already in is ignored; otherwise FActive takes the result of
// TurnOn / TurnOff.
procedure TDnTcpReactor.SetActive(Value: Boolean);
begin
  if Value = FActive then
    Exit;

  if Value then
    FActive := TurnOn
  else
    FActive := TurnOff;
end;

// Builds a new, not yet connected channel object for the given address and
// port, owned by this reactor.
function TDnTcpReactor.MakeChannel(const IPAddress: String; Port: Word): IDnChannel;
var NewChannel: TDnTcpChannel;
begin
  NewChannel := TDnTcpChannel.CreateEmpty(Self, IPAddress, Port);
  Result := NewChannel;
  // The channel is deliberately NOT added to FChannels here (the old
  // guarded FChannels.Add(Result) was disabled); PostChannel adds it.
end;

// Sets the timeout value on a channel.
// Raises EDnException(ErrInvalidConfig) when no timeout-expired handler has
// been assigned to the reactor.
procedure TDnTcpReactor.SetTimeout(Channel: IDnChannel; Value: Cardinal);
begin
  if not Assigned(FTimeOutExpiredEvent) then
    raise EDnException.Create(ErrInvalidConfig, 0);

  TDnTcpChannel.CheckImpl(Channel).SetTimeOut(Value);
  // Drop the local interface reference explicitly, as the original did.
  Channel := Nil;
end;


// Removes a closed channel from the reactor's channel list.
// Raises EDnException(ErrCannotRemoveOpenedChannel) when the channel is
// still open.
procedure TDnTcpReactor.RemoveChannel(Channel: IDnChannel);
begin
  if Channel.IsClosed then
  begin
    FGuard.Acquire;
    try
      FChannels.Remove(Channel);
    finally
      // Never leave the guard held if the removal raises.
      FGuard.Release;
    end;
    Channel := Nil;
  end
  else
    raise EDnException.Create(ErrCannotRemoveOpenedChannel, 0);
end;

// Timer callback for a channel timeout request.
// Key carries the value the channel's finished-I/O counter had when the
// request was issued (it is compared against FFinishedIOCount below).
procedure TDnTcpReactor.TimeOutFired(Context: TDnThreadContext; Channel: IDnChannel;
                              ExpiredTacts: Cardinal; Key: Pointer);
var Impl: TDnTcpChannel;
begin
  Impl := TDnTcpChannel.CheckImpl(Channel);

  // Counter unchanged since the request was issued: post a packet with
  // Transferred = $FFFFFFFF and the channel pointer in the overlapped slot
  // to the completion port.
  if Impl.FFinishedIOCount = Cardinal(Key) then
  begin
    PostQueuedCompletionStatus(FPort, $FFFFFFFF, 0, Pointer(Impl));
    Exit;
  end;

  if Impl.FTimeOut = 0 then
  begin
    // Timeout value is zero: reset the tracking state.
    Impl.FFirstTimeOutRequest := False;
    Impl.FFinishedIOCount := 0;
  end
  else
    // Counter moved and a timeout is still configured: issue a new request.
    Impl.IssueTimeOutRequest(Impl.TimeOut, Nil);
end;


// Activates the reactor: creates the I/O completion port, starts the
// timeout timer with its own simple executor, and launches the reactor
// thread. Returns True on success.
// Raises EDnException(ErrWin32Error) when the temporary socket or the
// completion port cannot be created.
function TDnTcpReactor.TurnOn: Boolean;
var TempSocket: TSocket;
    PortError: Cardinal;
begin
  // A temporary socket is used as the first handle associated with the new
  // completion port; it is closed as soon as the port exists.
  TempSocket := Winsock2.socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
  if TempSocket = Winsock2.INVALID_SOCKET then
    raise EDnException.Create(ErrWin32Error, WSAGetLastError(), 'Socket');

  FPort := CreateIOCompletionPort(TempSocket, 0, 0, 1);
  if FPort = 0 then
  begin
    // Capture the error code BEFORE closesocket, which may overwrite the
    // thread's last-error value.
    PortError := GetLastError();
    Winsock2.closesocket(TempSocket);
    raise EDnException.Create(ErrWin32Error, PortError, 'CreateIOCompletionPort');
  end;
  Winsock2.closesocket(TempSocket);


  FTimer := TDnTimer.Create({$IFDEF ROOTISCOMPONENT}Nil{$ENDIF});
  FTimerExecutor := TDnSimpleExecutor.Create({$IFDEF ROOTISCOMPONENT}Nil{$ENDIF});
  FTimer.Executor := FTimerExecutor;
  FTimer.OnTimerNotify := TimeOutFired;
  FTimer.Active := True;

  FThread := TDnTcpReactorThread.Create(Self);

  Result := True;
end;

// Closes the socket handle of every channel in FChannels, under FGuard.
// The channels stay in the list.
procedure TDnTcpReactor.CloseChannels;
var i: Integer;
    ChannelImpl: TDnTcpChannel;
begin
  FGuard.Acquire;
  try
    for i:=0 to FChannels.Count-1 do
    begin
      ChannelImpl := TDnTcpChannel.CheckImpl(FChannels[i]);
      ChannelImpl.CloseSocketHandle;
    end;
  finally
    // CheckImpl or the close may raise; the guard must still be released.
    FGuard.Release;
  end;
end;

// Deactivates the reactor: closes every channel socket, signals the reactor
// thread to stop, frees the thread and timer objects, deactivates the
// executor, clears the channel list and closes the completion port.
// Always returns False (the new value of the Active flag).
function TDnTcpReactor.TurnOff: Boolean;
var ChannelImpl: TDnTcpChannel;
    i: Integer;
begin
  FGuard.Acquire;
  try
    for i:=0 to FChannels.Count-1 do
    begin
      ChannelImpl := TDnTcpChannel.CheckImpl(FChannels[i]);
      ChannelImpl.CloseSocketHandle;
    end;
    // An all-zero packet is the termination signal for ThreadRoutine.
    PostQueuedCompletionStatus(FPort, 0, 0, Nil);
    FreeAndNil(FThread);
    FreeAndNil(FTimer);
    FreeAndNil(FTimerExecutor);
    //FreeAndNil(FTimerEngine);
    // The executor is optional (it starts as Nil and Notification may
    // reset it), so guard the dereference.
    if Assigned(FExecutor) then
      FExecutor.Active := False;
    //for i:=0 to FChannels.Count-1 do
    //  TDnTcpChannel.(FChannels[i])._Release;
    FChannels.Clear;
    // Release the completion port created by TurnOn; otherwise every
    // on/off cycle leaks one kernel handle.
    // NOTE(review): assumes freeing FThread above waits for the thread to
    // finish, so nothing is still blocked on the port - confirm in the
    // thread base class.
    if FPort <> 0 then
    begin
      CloseHandle(FPort);
      FPort := 0;
    end;
  finally
    FGuard.Release;
  end;
  Result := False;
end;

// Associates the channel's socket with the reactor's completion port, using
// the channel object's address as the completion key, and then records the
// channel in FChannels.
// Raises EDnException(ErrWin32Error) when the association fails.
// NOTE(review): the list is modified here without taking FGuard, unlike
// RemoveChannel / CloseChannels - confirm whether callers already hold it.
procedure TDnTcpReactor.PostChannel(Channel: IDnChannel);
var Impl: TDnTcpChannel;
    CompletionKey: Cardinal;
begin
  Impl := TDnTcpChannel.CheckImpl(Channel);
  CompletionKey := Cardinal(Pointer(Impl));
  if CreateIOCompletionPort(Impl.FSocket, FPort, CompletionKey, 1) = 0 then
    raise EDnException.Create(ErrWin32Error, GetLastError(), 'CreateIOCompletionPort');
  FChannels.Add(Channel);
end;

// Stores the current Winsock error code in the request and hands the
// request to the executor. The Channel argument is not used.
procedure TDnTcpReactor.PostChannelError(Channel: TDnTcpChannel; Request: TDnTcpRequest);
var LastSocketError: Integer;
begin
  LastSocketError := WSAGetLastError;
  Request.FErrorCode := LastSocketError;
  //PostQueuedCompletionStatus(FPort,
  FExecutor.PostEvent(Request);//, Channel);
end;

// Forwards a channel timeout to the user-supplied handler, unless the
// channel's timeout has been aborted or no handler is assigned.
procedure TDnTcpReactor.OnTimeOutExpired(Context: TDnThreadContext; Channel: IDnChannel);
var ChannelImpl: TDnTcpChannel;
begin
  ChannelImpl := TDnTcpChannel.CheckImpl(Channel);
  // Guard against a Nil handler: calling an unassigned event would raise an
  // access violation inside the executor.
  if not ChannelImpl.FTimeOutAbort and Assigned(FTimeOutExpiredEvent) then
    FTimeOutExpiredEvent(Context, Channel);
end;


//-------------------------------------------------------------------------
// Creates the reactor worker thread bound to the given reactor and starts
// it immediately. The object is not freed automatically on termination.
constructor TDnTcpReactorThread.Create(Reactor: TDnTcpReactor);
begin
  inherited Create;
  Self.FReactor := Reactor;
  Self.FreeOnTerminate := False;
  Run;
end;

// No thread-specific cleanup here; everything is delegated to the inherited
// destructor.
destructor TDnTcpReactorThread.Destroy;
begin
  inherited Destroy;
end;

// The reactor thread runs without a thread context: the field and the
// current context are both set to Nil.
procedure TDnTcpReactorThread.CreateContext;
begin
  Self.FContext := Nil;
  SetCurrentContext(Nil);
end;

// Nothing to destroy: CreateContext never allocates a context.
procedure TDnTcpReactorThread.DestroyContext;
begin
  // intentionally empty
end;

// Writes S to the reactor's logger at the reactor's log level.
// Does nothing without a logger; logger failures are deliberately ignored.
procedure TDnTcpReactorThread.LogMessage(S: String);
begin
  if FReactor.FLogger = Nil then
    Exit;

  try
    FReactor.FLogger.LogMsg(FReactor.FLogLevel, S);
  except
    // best effort only: a failing logger must not disturb the reactor
  end;
end;

// Finishes a request: lets the channel advance its queues, then posts the
// request (as an IDnIOResponse) to the reactor's executor when one is set.
procedure TDnTcpReactorThread.DoRequest(Request: TDnTcpRequest; Channel: TDnTcpChannel);
var PendingResponse: IDnIOResponse;
begin
  // Take the interface reference before ExecuteNext runs, exactly as the
  // original did, so a reference is held while the queues are advanced.
  PendingResponse := Request;
  Channel.ExecuteNext(Request);

  if FReactor.FExecutor <> Nil then
    FReactor.FExecutor.PostEvent(PendingResponse);

  PendingResponse := Nil;
end;

// Placeholder: currently performs no work for either argument.
procedure TDnTcpReactorThread.DoClose(Channel: TDnTcpChannel; Request: TDnTcpRequest);
begin
end;

// Handles one successfully dequeued completion packet.
//   Transferred - byte count reported by the port, or the marker $FFFFFFFF
//                 used for timeout packets (TDnTcpReactor.TimeOutFired posts
//                 such packets with the channel pointer as Overlapped).
//   Key         - completion key; not used by this routine.
//   Overlapped  - for timeout packets a TDnTcpChannel pointer, otherwise a
//                 pointer to a TDnReqContext record.
// Raises EDnException(ErrZeroTransferDetected) when an unfinished request
// completes with zero bytes transferred.
procedure TDnTcpReactorThread.ParseIONotification(Transferred: Cardinal; Key: Cardinal; Overlapped: POverlapped);
var Channel: TDnTcpChannel;
    Request: TDnTcpRequest;
    ReqContext: PDnReqContext;
begin
  // Global counter of processed notifications.
  InterlockedIncrement(ParseIONotificationCount);
  if Transferred = $FFFFFFFF then
  begin
    // Timeout packet: flag the channel and, unless the timeout was aborted,
    // queue a timeout wrapper on the executor.
    // NOTE(review): FExecutor is dereferenced without a Nil check here,
    // unlike DoRequest - confirm an executor is always assigned while active.
    Channel := TDnTcpChannel(Overlapped);
    Channel.FTimeOutExpired := True;
    if not Channel.FTimeOutAbort then
      FReactor.FExecutor.PostEvent(TDnTimeOutWrapper.Create(Channel, FReactor));
  end else
  begin
    ReqContext := PDnReqContext(Overlapped);
    if not ReqContext^.FReqRouting then
    begin
      // Non-routed request: pass it straight to the executor as a response.
      FReactor.FExecutor.PostEvent(IUnknown(ReqContext^.FRequest) as IDnIOResponse);
    end else
    begin
      // Routed request: update the channel's I/O counter and record the
      // transferred byte count on the request.
      Request := TDnTcpChannel.MatchingRequest(Overlapped);
      Channel := TDnTcpChannel.CheckImpl(Request.FChannel);
      UpdateCounters(Channel, Request);
      Request.SetTransferred(Transferred);
      // Once the channel's timeout has expired only a brutal close is still
      // processed; other completions are dropped here.
      if not Channel.TimeOutExpired or (Request.RequestType = rtBrutalClose) then
      begin
        if not Request.IsComplete and not Request.IsCPUNeeded then
        begin
          // Partial transfer: continue the same request, but a zero-byte
          // completion of an unfinished request is reported as an error.
          if Transferred <> 0 then
            Request.ReExecute
          else
            raise EDnException.Create(ErrZeroTransferDetected, 0);
        end else
          DoRequest(Request, Channel);
      end;
    end; //if not ReqContext.FReqRouting
  end;
end;

// Handles a completion packet that was dequeued with a failure status.
// Overlapped is interpreted as a pointer to a TDnReqContext record;
// Transferred and Key are not used.
procedure TDnTcpReactorThread.ParseIOError(Transferred: Cardinal; Key: Cardinal; Overlapped: POverlapped);
var Channel: TDnTcpChannel;
    Request: TDnTcpRequest;
    ReqContext: PDnReqContext;
    Source: IUnknown;
begin
  ReqContext := PDnReqContext(Overlapped);
  if ReqContext^.FReqRouting then
  begin
    // Routed request: stop the channel's timeout tracking, count the I/O,
    // let the request capture the error, then finish it like a normal one.
    // NOTE(review): if CatchError reads the thread's last-error value, the
    // two calls before it could overwrite that value - confirm.
    Request := TDnTcpChannel.MatchingRequest(Overlapped);
    Channel := TDnTcpChannel.CheckImpl(Request.FChannel);
    Channel.StopTimeOutTracking; //it is really fast operation
    UpdateCounters(Channel, Request);
    Request.CatchError;
    DoRequest(Request, Channel);
  end else
  begin
    // Non-routed request: store the Winsock error code on the request object
    // and post it to the executor as a response.
    Source := IUnknown(ReqContext^.FRequest);
    (Source as IDnIOErrorValue).StoreError(WSAGetLastError());
    FReactor.FExecutor.PostEvent(Source as IDnIOResponse);
  end;
end;

// Bumps the channel's I/O counter for a finished request. The per-request-
// type counters below are disabled, so Request is currently unused.
procedure TDnTcpReactorThread.UpdateCounters(Channel: TDnTcpChannel; Request: TDnTcpRequest);
begin
  Channel.IncrementIOCounter;
{  case Request.RequestType of
    rtRead:       Channel.IncrementReadCounter;
    rtWrite:      Channel.IncrementWriteCounter;
    rtConnect:    Channel.IncrementReadCounter;
    rtClose:      Channel.IncrementReadCounter;
  end; }
end;

// Main loop of the reactor thread: blocks on the completion port and
// dispatches each packet to ParseIONotification (success) or ParseIOError
// (failure with an overlapped pointer). An all-zero successful packet is
// the signal to leave the loop.
procedure TDnTcpReactorThread.ThreadRoutine;
var Transferred: Cardinal;
    Key: Cardinal;
    Overlapped: POverlapped;
    ResCode: LongBool;
begin
  SetCurrentContext(Nil);
  while True do
  begin
    ResCode := GetQueuedCompletionStatus(FReactor.FPort, Transferred, Key, Overlapped, INFINITE);
    try
      if ResCode then
      begin
        if (Transferred = 0) and (Key = 0) and (Overlapped = Nil) then
          Exit; //signal to terminate thread
        //notify channel
        ParseIONotification(Transferred, Key, Overlapped);
      end
      else
      //error trapped; with Overlapped = Nil nothing was dequeued, so there
      //is no request to report the failure to and the status is ignored
      if Overlapped <> Nil then
        ParseIOError(Transferred, Key, Overlapped); //IO error
    except
      // A failure while handling one packet (e.g. ErrZeroTransferDetected)
      // must not end the loop: that would silently stop completion
      // processing for every channel. Log it and keep serving.
      on E: Exception do
        LogMessage('Reactor thread: ' + E.Message);
    end;
  end;
end;
//-------------------------------------------------------------------------


// IDE registration entry point: when the reactor is built as a component
// (ROOTISCOMPONENT defined) it is registered on the 'DNet' palette page;
// otherwise this procedure does nothing.
procedure Register;
begin
  {$IFDEF ROOTISCOMPONENT}
  RegisterComponents('DNet', [TDnTcpReactor]);
  {$ENDIF}
end;

initialization
  // Reset the unit-level counter that ParseIONotification increments for
  // every processed completion packet.
  ParseIONotificationCount := 0;
end.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -