📄 dntcpreactor.pas
字号:
rtWrite: if (FWriteQueue.Add(Request) = 0) and (FConnectingRequest = Nil) then
Request.Execute;
end;
finally
FRunGuard.Release;
end;
end
else
raise EDnException.Create(ErrChannelClosing, 0);
end;
// Stores an opaque tracker pointer on the channel. The pointer is only
// remembered here; this method never dereferences it.
procedure TDnTcpChannel.Bind(Tracker: Pointer);
begin
  FTracker := Tracker;
end;
// Forgets the tracker pointer previously stored by Bind. The tracker itself
// is not freed or otherwise touched.
procedure TDnTcpChannel.Unbind;
begin
  FTracker := Nil;
end;
// Returns the tracker pointer currently stored on the channel
// (Nil when nothing is bound).
function TDnTcpChannel.Tracker: Pointer;
begin
  Result := FTracker;
end;
// Reports whether a tracker pointer is currently stored on the channel.
function TDnTcpChannel.IsBound: Boolean;
begin
  Result := Assigned(FTracker);
end;
// Returns the peer's TCP port in host byte order. FRemoteAddr keeps the port
// in network byte order, hence the ntohs conversion.
function TDnTcpChannel.RemotePort: Word;
var
  NetOrderPort: Word;
begin
  NetOrderPort := FRemoteAddr.sin_port;
  Result := ntohs(NetOrderPort);
end;
// Returns the peer's IPv4 address as dotted-decimal text, produced by
// inet_ntoa from the address stored in FRemoteAddr.
function TDnTcpChannel.RemoteAddr: String;
begin
  with FRemoteAddr do
    Result := StrPas(inet_ntoa(sin_addr));
end;
// Resolves the peer's IPv4 address to a host name through a reverse lookup.
//
// Returns the resolved name, or an empty string when the lookup fails
// (gethostbyaddr returns Nil in that case; the previous code dereferenced the
// result unconditionally and crashed with an access violation).
//
// The length passed to gethostbyaddr is the size of the in_addr value, as the
// Winsock API requires, not the size of the whole sockaddr structure.
// Note: gethostbyaddr is a blocking call.
function TDnTcpChannel.RemoteHost: String;
var
  HostInfo: PHostEnt;
begin
  Result := '';
  HostInfo := gethostbyaddr(@FRemoteAddr.sin_addr, SizeOf(FRemoteAddr.sin_addr), AF_INET);
  if (HostInfo <> Nil) and (HostInfo^.h_name <> Nil) then
    Result := StrPas(HostInfo^.h_name);
end;
// Closes the channel's socket handle.
// Timeout tracking is stopped first; FSocket is then marked INVALID_SOCKET
// before the saved handle is handed to closesocket, so IsClosed already
// reports True while the handle is being closed.
procedure TDnTcpChannel.CloseSocketHandle;
var
  OldSocket: TSocket;
begin
  StopTimeOutTracking;

  OldSocket := FSocket;
  FSocket := INVALID_SOCKET;

  Winsock2.closesocket(OldSocket);
end;
// Maps a request-context pointer back to the request object stored in its
// FRequest field. Context is reinterpreted as PDnReqContext without any
// validation, so it must really point at such a record.
class function TDnTcpChannel.MatchingRequest(Context: Pointer): TDnTcpRequest;
begin
  Result := TDnTcpRequest(PDnReqContext(Context)^.FRequest);
end;
// True once the socket handle has been replaced by INVALID_SOCKET
// (see CloseSocketHandle).
function TDnTcpChannel.IsClosed: Boolean;
begin
  Result := (FSocket = INVALID_SOCKET);
end;
// Applies Value as the TCP_NODELAY option of the channel's socket.
// The setsockopt result is not inspected.
// NOTE(review): in Winsock, TCP_NODELAY = TRUE turns the Nagle algorithm
// OFF, so SetNagle(True) appears to disable Nagle - confirm that this is the
// polarity callers expect.
procedure TDnTcpChannel.SetNagle(Value: Boolean);
var
  OptValue: LongBool;
begin
  OptValue := Value;
  Winsock2.setsockopt(FSocket, IPPROTO_TCP, TCP_NODELAY,
    PChar(@OptValue), SizeOf(OptValue));
end;
// Called when Request has finished its I/O: removes it from the channel's
// bookkeeping and starts the next queued request, all under FRunGuard.
//
// Nothing happens when Request.IsCPUNeeded is True.
//   rtClose / rtBrutalClose - the pending closing request is dropped.
//   rtRead / rtWrite        - Request is removed if it is the head of its
//                             queue, then the new head (if any) is executed.
//   rtConnect               - the connecting request is cleared, the heads of
//                             both queues are executed, and when both queues
//                             are empty a pending closing request is executed.
//
// Changes compared with the previous version:
//   * FRunGuard.Acquire is taken before the try block, so Release only runs
//     when the guard is actually held.
//   * The queue head is compared only when the queue is non-empty; indexing
//     element 0 of an empty list raises a list-index error.
procedure TDnTcpChannel.ExecuteNext(Request: IDnIORequest);
begin
  FRunGuard.Acquire;
  try
    if Request.IsCPUNeeded then
      Exit;

    case Request.RequestType of
      rtBrutalClose,
      rtClose:
        FClosingRequest := Nil;

      rtRead:
        begin
          if (FReadQueue.Count <> 0) and (FReadQueue[0] = IUnknown(Request)) then
            FReadQueue.Delete(0);
          if FReadQueue.Count <> 0 then
            (FReadQueue[0] as IDnIORequest).Execute;
        end;

      rtWrite:
        begin
          if (FWriteQueue.Count <> 0) and (FWriteQueue[0] = IUnknown(Request)) then
            FWriteQueue.Delete(0);
          if FWriteQueue.Count <> 0 then
            (FWriteQueue[0] as IDnIORequest).Execute;
        end;

      rtConnect:
        begin
          FConnectingRequest := Nil;
          if FReadQueue.Count <> 0 then
            (FReadQueue[0] as IDnIORequest).Execute;
          if FWriteQueue.Count <> 0 then
            (FWriteQueue[0] as IDnIORequest).Execute;
          if (FReadQueue.Count = 0) and (FWriteQueue.Count = 0) and
             (FClosingRequest <> Nil) then
            FClosingRequest.Execute;
        end;
    end;
  finally
    FRunGuard.Release;
  end;
end;
// Releases everything the channel owns: both guards, both request queues and
// the reference to a pending closing request.
// (Cleanup of FRequest / FConnectRequest was already disabled in the
// original code and stays disabled.)
destructor TDnTcpChannel.Destroy;
begin
  // synchronisation objects
  FreeAndNil(FRunGuard);
  FreeAndNil(FTimeOutGuard);

  // pending request queues
  FreeAndNil(FReadQueue);
  FreeAndNil(FWriteQueue);

  // clearing an already-Nil reference is a no-op, so no test is needed
  FClosingRequest := Nil;

  inherited Destroy;
end;
//-----------------------------------------------------------------------------
// Builds an inactive reactor: creates the guard mutex and the channel list
// and resets every other field. No completion port, timer or thread exists
// until the reactor is switched on (see TurnOn).
constructor TDnTcpReactor.Create{$IFDEF ROOTISCOMPONENT}(AOwner: TComponent){$ENDIF};
begin
  inherited Create{$IFDEF ROOTISCOMPONENT}(AOwner){$ENDIF};

  // objects created here
  FGuard := TDnMutex.Create;
  FChannels := TInterfaceList.Create;

  // runtime state
  FActive := False;
  FPort := 0;

  // collaborators, assigned or created later
  FExecutor := Nil;
  FTimer := Nil;
  FTimerExecutor := Nil;
  FTimeOutExpiredEvent := Nil;

  // logging
  FLogger := Nil;
  FLogLevel := llMandatory;
end;
// Shuts the reactor down if it is still active, then frees the guard mutex.
// NOTE(review): FChannels is created in the constructor but is not freed
// here. If the field is declared as a class reference (TInterfaceList)
// rather than an interface, this leaks the list - confirm against the
// field declaration.
destructor TDnTcpReactor.Destroy;
begin
  if FActive then
    SetActive(False);

  FreeAndNil(FGuard);

  inherited Destroy;
end;
// Returns the number of channels currently registered with the reactor,
// or 0 when the channel list does not exist.
// The list is read under FGuard; try/finally guarantees the guard is
// released even if reading the list raises.
function TDnTcpReactor.GetChannelCount: Integer;
begin
  FGuard.Acquire;
  try
    if Assigned(FChannels) then
      Result := FChannels.Count
    else
      Result := 0;
  finally
    FGuard.Release;
  end;
end;
{$IFDEF ROOTISCOMPONENT}
// Component notification hook (only compiled when the reactor is a
// TComponent). Drops the reference to the logger or the executor when that
// component is being removed, so the reactor never keeps a dangling pointer.
//
// The inherited handler is called first: TComponent.Notification does its
// own bookkeeping (free-notification list, forwarding to owned components)
// and skipping it breaks that mechanism.
procedure TDnTcpReactor.Notification(AComponent: TComponent; Operation: TOperation);
begin
  inherited Notification(AComponent, Operation);

  if Operation = opRemove then
  begin
    if AComponent = FLogger then
      FLogger := Nil
    else
    if AComponent = FExecutor then
      FExecutor := Nil;
  end;
end;
{$ENDIF}
// Switches the reactor on or off. Nothing happens when the requested state
// equals the current one; otherwise FActive receives the result of TurnOn
// or TurnOff.
procedure TDnTcpReactor.SetActive(Value: Boolean);
begin
  if Value = FActive then
    Exit;

  if Value then
    FActive := TurnOn
  else
    FActive := TurnOff;
end;
// Creates an empty channel object for the given address and port, owned by
// this reactor.
// The channel is NOT added to FChannels here (that code was disabled in the
// original); registration happens in PostChannel.
function TDnTcpReactor.MakeChannel(const IPAddress: String; Port: Word): IDnChannel;
begin
  Result := TDnTcpChannel.CreateEmpty(Self, IPAddress, Port);
end;
// Sets the timeout value of a channel.
// Raises EDnException(ErrInvalidConfig) when no timeout-expired handler is
// assigned to the reactor.
procedure TDnTcpReactor.SetTimeout(Channel: IDnChannel; Value: Cardinal);
begin
  if not Assigned(FTimeOutExpiredEvent) then
    raise EDnException.Create(ErrInvalidConfig, 0);

  TDnTcpChannel.CheckImpl(Channel).SetTimeOut(Value);

  // drop the local interface reference right away, as the original did
  Channel := Nil;
end;
// Removes a closed channel from the reactor's channel list.
// Raises EDnException(ErrCannotRemoveOpenedChannel) when the channel is
// still open.
// The list is modified under FGuard; try/finally guarantees the guard is
// released even if the removal raises (previously the guard stayed locked
// in that case).
procedure TDnTcpReactor.RemoveChannel(Channel: IDnChannel);
begin
  if not Channel.IsClosed then
    raise EDnException.Create(ErrCannotRemoveOpenedChannel, 0);

  FGuard.Acquire;
  try
    FChannels.Remove(Channel);
  finally
    FGuard.Release;
  end;
  Channel := Nil;
end;
// Timer callback for a channel timeout request.
// Key carries an I/O counter value that is compared with the channel's
// current FFinishedIOCount:
//   * equal     - a packet with Transferred = $FFFFFFFF and the channel
//                 pointer in the overlapped slot is posted to the
//                 completion port;
//   * different - if the channel still has a non-zero timeout, a new timeout
//                 request is issued; otherwise the channel's timeout
//                 bookkeeping is reset.
procedure TDnTcpReactor.TimeOutFired(Context: TDnThreadContext; Channel: IDnChannel;
  ExpiredTacts: Cardinal; Key: Pointer);
var
  Impl: TDnTcpChannel;
begin
  Impl := TDnTcpChannel.CheckImpl(Channel);

  if Impl.FFinishedIOCount = Cardinal(Key) then
  begin
    PostQueuedCompletionStatus(FPort, $FFFFFFFF, 0, Pointer(Impl));
    Exit;
  end;

  if Impl.FTimeOut <> 0 then
  begin
    Impl.IssueTimeOutRequest(Impl.TimeOut, Nil);
    Exit;
  end;

  Impl.FFirstTimeOutRequest := False;
  Impl.FFinishedIOCount := 0;
end;
// Activates the reactor: creates the I/O completion port, the timeout timer
// with its own executor, and the reactor thread. Always returns True; any
// failure is reported by raising EDnException(ErrWin32Error).
//
// The completion port is created by associating a temporary socket, which is
// closed again immediately afterwards.
//
// Fix: the Win32 error code of a failed CreateIOCompletionPort is captured
// BEFORE the temporary socket is closed. closesocket may overwrite the
// thread's last-error value, so reading it afterwards could report the wrong
// error.
function TDnTcpReactor.TurnOn: Boolean;
var
  TempSocket: TSocket;
  PortError: Cardinal;
begin
  TempSocket := Winsock2.socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
  if TempSocket = Winsock2.INVALID_SOCKET then
    raise EDnException.Create(ErrWin32Error, WSAGetLastError(), 'Socket');

  FPort := CreateIOCompletionPort(TempSocket, 0, 0, 1);
  if FPort = 0 then
  begin
    PortError := GetLastError();
    Winsock2.closesocket(TempSocket);
    raise EDnException.Create(ErrWin32Error, PortError, 'CreateIOCompletionPort');
  end;
  Winsock2.closesocket(TempSocket);

  // timeout machinery
  FTimer := TDnTimer.Create({$IFDEF ROOTISCOMPONENT}Nil{$ENDIF});
  FTimerExecutor := TDnSimpleExecutor.Create({$IFDEF ROOTISCOMPONENT}Nil{$ENDIF});
  FTimer.Executor := FTimerExecutor;
  FTimer.OnTimerNotify := TimeOutFired;
  FTimer.Active := True;

  // thread that services the completion port
  FThread := TDnTcpReactorThread.Create(Self);
  Result := True;
end;
// Closes the socket handle of every channel registered with the reactor.
// The channels stay in the list; only their sockets are closed.
// The list is walked under FGuard; try/finally guarantees the guard is
// released even if CheckImpl or the close raises (previously the guard
// stayed locked in that case).
procedure TDnTcpReactor.CloseChannels;
var
  i: Integer;
  ChannelImpl: TDnTcpChannel;
begin
  FGuard.Acquire;
  try
    for i := 0 to FChannels.Count - 1 do
    begin
      ChannelImpl := TDnTcpChannel.CheckImpl(FChannels[i]);
      ChannelImpl.CloseSocketHandle;
    end;
  finally
    FGuard.Release;
  end;
end;
// Deactivates the reactor: closes every channel socket, posts the
// all-zero termination packet to the completion port, frees the reactor
// thread, the timer and its executor, deactivates the external executor,
// empties the channel list and closes the completion port.
// Always returns False (the new value of the Active flag).
//
// Changes compared with the previous version:
//   * FGuard is released in a finally block, so an exception during
//     shutdown no longer leaves the guard locked.
//   * FExecutor is checked before use; it starts as Nil and can be reset to
//     Nil by Notification, in which case the old code raised an access
//     violation.
//   * The completion port handle created in TurnOn is closed; it used to
//     leak on every activate/deactivate cycle. It is closed only after the
//     thread and the timer, which post to or wait on the port, are gone.
//     NOTE(review): this relies on freeing FThread waiting for the thread to
//     finish - confirm against the thread base class.
function TDnTcpReactor.TurnOff: Boolean;
var
  ChannelImpl: TDnTcpChannel;
  i: Integer;
begin
  FGuard.Acquire;
  try
    for i := 0 to FChannels.Count - 1 do
    begin
      ChannelImpl := TDnTcpChannel.CheckImpl(FChannels[i]);
      ChannelImpl.CloseSocketHandle;
    end;

    // all-zero packet = "terminate" signal for the reactor thread
    PostQueuedCompletionStatus(FPort, 0, 0, Nil);
    FreeAndNil(FThread);
    FreeAndNil(FTimer);
    FreeAndNil(FTimerExecutor);

    if Assigned(FExecutor) then
      FExecutor.Active := False;

    FChannels.Clear;

    if FPort <> 0 then
    begin
      CloseHandle(FPort);
      FPort := 0;
    end;
  finally
    FGuard.Release;
  end;
  Result := False;
end;
// Associates the channel's socket with the reactor's completion port, using
// the channel object's address as the completion key, and then adds the
// channel to the reactor's channel list.
// Raises EDnException(ErrWin32Error) when the association fails; the channel
// is not added to the list in that case.
procedure TDnTcpReactor.PostChannel(Channel: IDnChannel);
var
  Impl: TDnTcpChannel;
  Associated: Boolean;
begin
  Impl := TDnTcpChannel.CheckImpl(Channel);

  Associated := CreateIOCompletionPort(Impl.FSocket, FPort, Cardinal(Pointer(Impl)), 1) <> 0;
  if not Associated then
    raise EDnException.Create(ErrWin32Error, GetLastError(), 'CreateIOCompletionPort');

  FChannels.Add(Channel);
end;
// Records the current Winsock error code on the request and posts the
// request to the executor. The Channel argument is not used by this method.
procedure TDnTcpReactor.PostChannelError(Channel: TDnTcpChannel; Request: TDnTcpRequest);
begin
  Request.FErrorCode := WSAGetLastError;
  FExecutor.PostEvent(Request);
end;
// Forwards a channel timeout to the user-supplied FTimeOutExpiredEvent
// handler, unless the channel's FTimeOutAbort flag is set.
procedure TDnTcpReactor.OnTimeOutExpired(Context: TDnThreadContext; Channel: IDnChannel);
begin
  if TDnTcpChannel.CheckImpl(Channel).FTimeOutAbort then
    Exit;

  FTimeOutExpiredEvent(Context, Channel);
end;
//-------------------------------------------------------------------------
// Creates the reactor's worker thread object for the given reactor and
// starts it via Run. The object is not freed automatically on termination
// (FreeOnTerminate = False); TurnOff frees it.
constructor TDnTcpReactorThread.Create(Reactor: TDnTcpReactor);
begin
  inherited Create;

  FreeOnTerminate := False;
  FReactor := Reactor;

  Run;
end;
// Nothing of its own to release; simply delegates to the inherited
// destructor.
destructor TDnTcpReactorThread.Destroy;
begin
  inherited;
end;
// The reactor thread runs without a thread context: both the thread's own
// FContext field and the current context are set to Nil.
procedure TDnTcpReactorThread.CreateContext;
begin
  FContext := Nil;
  SetCurrentContext(Nil);
end;
// Intentionally empty: CreateContext allocates nothing, so there is nothing
// to destroy.
procedure TDnTcpReactorThread.DestroyContext;
begin
end;
// Writes S to the reactor's logger at the reactor's log level.
// Does nothing when no logger is attached. Exceptions raised by the logger
// are deliberately swallowed so that a failing logger cannot disturb the
// calling thread.
procedure TDnTcpReactorThread.LogMessage(S: String);
begin
  if FReactor.FLogger = Nil then
    Exit;

  try
    FReactor.FLogger.LogMsg(FReactor.FLogLevel, S);
  except
    // logger failed - ignore on purpose
  end;
end;
// Finishes a completed request: lets the channel start its next queued
// request and then posts this request, as an IDnIOResponse, to the reactor's
// executor (when one is assigned).
procedure TDnTcpReactorThread.DoRequest(Request: TDnTcpRequest; Channel: TDnTcpChannel);
var
  Pending: IDnIOResponse;
begin
  // take the interface reference before the channel touches its queues
  // (presumably to keep the request alive across ExecuteNext - TODO confirm)
  Pending := Request;

  Channel.ExecuteNext(Request);

  if FReactor.FExecutor <> Nil then
    FReactor.FExecutor.PostEvent(Pending);

  Pending := Nil;
  Request := Nil;
end;
// Placeholder with no implementation: both arguments are ignored.
procedure TDnTcpReactorThread.DoClose(Channel: TDnTcpChannel; Request: TDnTcpRequest);
begin
  // intentionally empty
end;
// Handles one successfully dequeued completion packet.
//
// Two kinds of packets are distinguished:
//   * Transferred = $FFFFFFFF - a timeout packet (posted by
//     TDnTcpReactor.TimeOutFired); Overlapped is then the channel object
//     itself, not an OVERLAPPED structure.
//   * anything else - a real I/O completion; Overlapped points at the
//     request context (PDnReqContext) of the finished operation.
// Key is not used by this method.
procedure TDnTcpReactorThread.ParseIONotification(Transferred: Cardinal; Key: Cardinal; Overlapped: POverlapped);
var Channel: TDnTcpChannel;
Request: TDnTcpRequest;
ReqContext: PDnReqContext;
begin
// Count every processed notification in the unit-level counter.
InterlockedIncrement(ParseIONotificationCount);
if Transferred = $FFFFFFFF then
begin
// Timeout packet: mark the channel as expired and, unless the timeout was
// aborted, hand a timeout wrapper to the executor.
// NOTE(review): FExecutor is used here without a Nil check, unlike DoRequest.
Channel := TDnTcpChannel(Overlapped);
Channel.FTimeOutExpired := True;
if not Channel.FTimeOutAbort then
FReactor.FExecutor.PostEvent(TDnTimeOutWrapper.Create(Channel, FReactor));
end else
begin
ReqContext := PDnReqContext(Overlapped);
if not ReqContext^.FReqRouting then
begin
// Request is not routed through a channel: post it to the executor as is.
FReactor.FExecutor.PostEvent(IUnknown(ReqContext^.FRequest) as IDnIOResponse);
end else
begin
// Channel-routed request: update the channel's I/O counter and tell the
// request how many bytes were transferred.
Request := TDnTcpChannel.MatchingRequest(Overlapped);
Channel := TDnTcpChannel.CheckImpl(Request.FChannel);
UpdateCounters(Channel, Request);
Request.SetTransferred(Transferred);
// Completions arriving after the channel timed out are dropped, except for
// a brutal close, which is always processed.
if not Channel.TimeOutExpired or (Request.RequestType = rtBrutalClose) then
begin
if not Request.IsComplete and not Request.IsCPUNeeded then
begin
// Partial transfer: continue the same request. A zero-byte transfer on an
// incomplete request is treated as an error.
// NOTE(review): this exception is raised on the reactor thread; whether
// anything above ThreadRoutine catches it is not visible here.
if Transferred <> 0 then
Request.ReExecute
else
raise EDnException.Create(ErrZeroTransferDetected, 0);
end else
DoRequest(Request, Channel);
end;
end; //if not ReqContext.FReqRouting
end;
end;
// Handles a completion packet whose I/O operation failed.
//   * Request not routed through a channel: the current Winsock error code
//     is stored on the request and the request is posted to the executor.
//   * Channel-routed request: timeout tracking is stopped, the I/O counter
//     is updated, the request captures its error (CatchError) and is then
//     finished through DoRequest.
// Transferred and Key are not used by this method.
procedure TDnTcpReactorThread.ParseIOError(Transferred: Cardinal; Key: Cardinal; Overlapped: POverlapped);
var
  Ctx: PDnReqContext;
  FailedRequest: TDnTcpRequest;
  Chan: TDnTcpChannel;
  Detached: IUnknown;
begin
  Ctx := PDnReqContext(Overlapped);

  if not Ctx^.FReqRouting then
  begin
    Detached := IUnknown(Ctx^.FRequest);
    (Detached as IDnIOErrorValue).StoreError(WSAGetLastError());
    FReactor.FExecutor.PostEvent(Detached as IDnIOResponse);
    Exit;
  end;

  FailedRequest := TDnTcpChannel.MatchingRequest(Overlapped);
  Chan := TDnTcpChannel.CheckImpl(FailedRequest.FChannel);
  Chan.StopTimeOutTracking; // cheap call, per the original comment
  UpdateCounters(Chan, FailedRequest);
  FailedRequest.CatchError;
  DoRequest(FailedRequest, Chan);
end;
// Bumps the channel's overall I/O counter for a finished request.
// Request is currently unused: the per-request-type read/write counters
// that used to be updated here were disabled in the original code.
procedure TDnTcpReactorThread.UpdateCounters(Channel: TDnTcpChannel; Request: TDnTcpRequest);
begin
  Channel.IncrementIOCounter;
end;
// Body of the reactor thread: waits on the reactor's completion port forever
// and dispatches every dequeued packet.
//   * successful dequeue of an all-zero packet (Transferred = 0, Key = 0,
//     Overlapped = Nil) - termination signal, the routine returns;
//   * any other successful dequeue - ParseIONotification;
//   * failed dequeue with a packet (Overlapped <> Nil) - ParseIOError;
//   * failed dequeue without a packet - ignored, the loop simply waits again.
procedure TDnTcpReactorThread.ThreadRoutine;
var
  Transferred: Cardinal;
  Key: Cardinal;
  Overlapped: POverlapped;
begin
  SetCurrentContext(Nil);

  while True do
  begin
    if GetQueuedCompletionStatus(FReactor.FPort, Transferred, Key, Overlapped, INFINITE) then
    begin
      // termination packet posted by TurnOff
      if (Transferred = 0) and (Key = 0) and (Overlapped = Nil) then
        Exit;

      ParseIONotification(Transferred, Key, Overlapped);
    end
    else if Overlapped <> Nil then
      // a dequeued operation completed with an error
      ParseIOError(Transferred, Key, Overlapped);
    // else: the wait itself failed - ignored, as in the original
  end;
end;
//-------------------------------------------------------------------------
// Design-time registration. When the unit is built with ROOTISCOMPONENT
// defined, TDnTcpReactor is registered on the 'DNet' component palette page;
// otherwise this procedure does nothing.
procedure Register;
begin
{$IFDEF ROOTISCOMPONENT}
  RegisterComponents('DNet', [TDnTcpReactor]);
{$ENDIF}
end;
initialization
// Unit start-up: reset the counter that ParseIONotification increments for
// every completion packet it processes.
ParseIONotificationCount := 0;
end.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -