📄 psitcpserver.pas
字号:
unit PsiTCPServer;
//******************************************************************************
// The original software is under
// Copyright (c) 1993 - 2000, Chad Z. Hower (Kudzu)
// and the Indy Pit Crew - http://www.nevrona.com/Indy/
//
// Amended : November 2000, by Michael M. Michalak MACS for use with
// MorphTek.com Inc Peer to Peer Open Source Components - http://www.morphtek.com
//
//******************************************************************************
interface
uses
Classes, sysutils,
PsiComponent, PsiSocketHandle, PsiTCPConnection, PsiThread, PsiThreadMgr,
PsiThreadMgrDefault,
PsiIntercept;
type
TPsiTCPServer = class;
// This is the thread that listens for incoming connections and spawns
// new ones to handle each one
TPsiListenerThread = class(TPsiThread)
protected
FAcceptWait: Integer;
FBindingList: TList;
FServer: TPsiTCPServer;
public
procedure AfterRun; override;
constructor Create(axServer: TPsiTCPServer); reintroduce;
destructor Destroy; override;
procedure Run; override;
//
property AcceptWait: integer read FAcceptWait write FAcceptWait;
property Server: TPsiTCPServer read FServer;
end;
TPsiTCPServerConnection = Class(TPsiTCPConnection)
protected
// FLastRcvTimeStamp: TDateTime; //Timestamp of latest received command
// FProcessingTimeout: boolean; //To avoid double timeout processing
//
function GetServer: TPsiTCPServer;
public
// property LastRcvTimeStamp: TDateTime read fLastRcvTimeStamp write fLastRcvTimeStamp;
// property ProcessingTimeout: boolean read fbProcessingTimeout write fbProcessingTimeout;
// function Read(const piLen: Integer): string; override;
published
property Server: TPsiTCPServer read GetServer;
end;
TPsiPeerThread = class(TPsiThread)
protected
FConnection: TPsiTCPServerConnection;
//
procedure AfterRun; override;
procedure BeforeRun; override;
public
destructor Destroy; override;
procedure Run; override;
//
property Connection: TPsiTCPServerConnection read FConnection;
end;
TPsiServerThreadEvent = procedure(AThread: TPsiPeerThread) of object;
TPsiTCPServer = class(TPsiComponent)
protected
FAcceptWait: integer;
FActive, FImplicitThreadMgrCreated: Boolean;
FThreadMgr: TPsiThreadMgr;
FBindings: TPsiSocketHandles;
FListenerThread: TPsiListenerThread;
FTerminateWaitTime: Integer;
FThreadClass: TPsiThreadClass;
FThreads: TThreadList;
FOnExecute, FOnConnect, FOnDisconnect: TPsiServerThreadEvent;
//
FIntercept: TPsiServerIntercept;
//
// NOTE this and the event occur in the context of the listener thread
procedure DoConnect(axThread: TPsiPeerThread); virtual;
procedure DoDisconnect(axThread: TPsiPeerThread); virtual;
function DoExecute(AThread: TPsiPeerThread): boolean; virtual;
function GetDefaultPort: integer;
procedure Notification(AComponent: TComponent; Operation: TOperation); override;
procedure SetAcceptWait(AValue: integer);
procedure SetActive(AValue: Boolean); virtual;
procedure SetBindings(const abValue: TPsiSocketHandles);
procedure SetDefaultPort(const AValue: integer);
procedure SetIntercept(const Value: TPsiServerIntercept);
procedure SetThreadMgr(const Value: TPsiThreadMgr);
procedure TerminateAllThreads;
public
constructor Create(AOwner: TComponent); override;
destructor Destroy; override;
procedure Loaded; override;
//
property ImplicitThreadMgrCreated: Boolean read FImplicitThreadMgrCreated;
property ThreadClass: TPsiThreadClass read FThreadClass write FThreadClass;
property Threads: TThreadList read FThreads;
published
property AcceptWait: integer read FAcceptWait write SetAcceptWait default 1000;
property Active: boolean read FActive write SetActive;
property Bindings: TPsiSocketHandles read FBindings write SetBindings;
property DefaultPort: integer read GetDefaultPort write SetDefaultPort;
property Intercept: TPsiServerIntercept read FIntercept write SetIntercept;
// Occurs in the context of the peer thread
property OnConnect: TPsiServerThreadEvent read FOnConnect write FOnConnect;
// Occurs in the context of the peer thread
property OnExecute: TPsiServerThreadEvent read FOnExecute write FOnExecute;
// Occurs in the context of the peer thread
property OnDisconnect: TPsiServerThreadEvent read FOnDisconnect write FOnDisconnect;
property TerminateWaitTime: Integer read FTerminateWaitTime write FTerminateWaitTime
default 5000;
property ThreadMgr: TPsiThreadMgr read FThreadMgr write SetThreadMgr;
end;
implementation
uses
PsiException, PsiGlobal, PsiResourceStrings, PsiStack, PsiStackConsts;
{ TPsiTCPServer }
constructor TPsiTCPServer.Create(AOwner: TComponent);
begin
inherited;
FAcceptWait := 1000;
FTerminateWaitTime := 5000;
FThreads := TThreadList.Create;
FBindings := TPsiSocketHandles.Create(Self);
FThreadClass := TPsiPeerThread;
// fSessionTimer := TTimer.Create(self);
end;
destructor TPsiTCPServer.Destroy;
begin
Active := False;
TerminateAllThreads;
FreeAndNil(FBindings);
FreeAndNil(FThreads);
inherited;
end;
procedure TPsiTCPServer.DoConnect(axThread: TPsiPeerThread);
begin
if assigned(OnConnect) then begin
OnConnect(axThread);
end;
end;
procedure TPsiTCPServer.DoDisconnect(axThread: TPsiPeerThread);
begin
if assigned(OnDisconnect) then begin
OnDisconnect(axThread);
end;
end;
function TPsiTCPServer.DoExecute(AThread: TPsiPeerThread): boolean;
begin
result := assigned(OnExecute);
if result then begin
OnExecute(AThread);
end;
end;
function TPsiTCPServer.GetDefaultPort: integer;
begin
result := FBindings.DefaultPort;
end;
procedure TPsiTCPServer.Loaded;
begin
inherited;
// Active=True must not be performed before all other props are loaded
if Active then begin
FActive := False;
Active := True;
end;
end;
procedure TPsiTCPServer.Notification(AComponent: TComponent; Operation: TOperation);
begin
inherited;
// remove the reference to the linked components if they are deleted
if (Operation = opRemove) then begin
if (AComponent = FThreadMgr) then begin
FThreadMgr := nil;
end else if (AComponent = FIntercept) then begin
FIntercept := nil;
end;
end;
end;
procedure TPsiTCPServer.SetAcceptWait(AValue: integer);
begin
if Active then begin
raise EPsiException.Create(RSAcceptWaitCannotBeModifiedWhileServerIsActive);
end;
FAcceptWait := AValue;
end;
procedure TPsiTCPServer.SetActive(AValue: Boolean);
var
i: Integer;
begin
if (not (csDesigning in ComponentState)) and (FActive <> AValue)
and (not (csLoading in ComponentState)) then begin
if AValue then begin
// Set up bindings
if Bindings.Count < 1 then
if DefaultPort > 0 then begin
Bindings.Add;
end else begin
raise EPsiException.Create(RSNoBindingsSpecified);
end;
for i := 0 to Bindings.Count - 1 do
begin
Bindings[i].AllocateSocket;
Bindings[i].Bind;
Bindings[i].Listen;
end;
// Set up ThreadMgr
FImplicitThreadMgrCreated := not assigned(ThreadMgr);
if ImplicitThreadMgrCreated then begin
ThreadMgr := TPsiThreadMgrDefault.Create(Self);
end;
ThreadMgr.ThreadClass := ThreadClass;
// Set up listener thread
FListenerThread := TPsiListenerThread.Create(Self);
FListenerThread.AcceptWait := AcceptWait;
FListenerThread.Start;
end else begin
// Stop listening
for i := 0 to Bindings.Count - 1 do begin
Bindings[i].CloseSocket;
end;
// Tear down ThreadMgr
TerminateAllThreads;
if ImplicitThreadMgrCreated then begin
FreeAndNil(FThreadMgr);
end;
FImplicitThreadMgrCreated := false;
// Tear down Listener thread
FListenerThread.TerminateAndWaitFor;
FListenerThread.Free;
end;
end;
FActive := AValue;
end;
procedure TPsiTCPServer.SetBindings(const abValue: TPsiSocketHandles);
begin
FBindings.Assign(abValue);
end;
procedure TPsiTCPServer.SetDefaultPort(const AValue: integer);
begin
FBindings.DefaultPort := AValue;
end;
procedure TPsiTCPServer.SetIntercept(const Value: TPsiServerIntercept);
begin
FIntercept := Value;
// Add self to the intercept's notification list
if assigned(FIntercept) then
begin
FIntercept.FreeNotification(Self);
end;
end;
procedure TPsiTCPServer.SetThreadMgr(const Value: TPsiThreadMgr);
begin
FThreadMgr := Value;
// Ensure we will be notified when the component is freed, even is it's on
// another form
if Value <> nil then begin
Value.FreeNotification(self);
end;
end;
procedure TPsiTCPServer.TerminateAllThreads;
var
i: integer;
list: TList;
Thread: TPsiPeerThread;
const
LSleepTime: integer = 250;
begin
list := Threads.LockList; try
for i := 0 to list.Count - 1 do begin
Thread := TPsiPeerThread(list[i]);
Thread.Connection.DisconnectSocket;
end;
finally Threads.UnlockList; end;
// Must wait for all threads to terminate, as they access the server and bindings. If this
// routine is being called from the destructor, this can cause AVs/
//
// This method is used instead of:
// -Threads.WaitFor. Since they are being destroyed thread.Wait for could AV. And Waiting for
// Handle produces different code for different OSs, and using common code has troubles
// as the handles are quite different.
// -Last thread signaling
for i := 1 to (TerminateWaitTime div LSleepTime) do begin
Sleep(LSleepTime);
list := Threads.LockList; try
if list.Count = 0 then begin
break;
end;
finally Threads.UnlockList; end;
end;
end;
{ TPsiListenerThread }
procedure TPsiListenerThread.AfterRun;
var
i: Integer;
begin
inherited;
for i := Server.Bindings.Count - 1 downto 0 do begin
Server.Bindings[i].CloseSocket;
end;
end;
constructor TPsiListenerThread.Create(axServer: TPsiTCPServer);
begin
inherited Create;
FBindingList := TList.Create;
FServer := axServer;
end;
destructor TPsiListenerThread.Destroy;
begin
FBindinglist.Free;
inherited;
end;
procedure TPsiListenerThread.Run;
var
PeerIP: string;
PeerPort: integer;
Peer: TPsiTCPServerConnection;
Thread: TPsiPeerThread;
i : Integer;
begin
FBindingList.Clear;
for i:= 0 to Server.Bindings.Count - 1 do begin
FBindingList.Add(TObject(Server.Bindings[i].Handle));
end;
if GStack.WSSelect(FBindingList, nil, nil, AcceptWait) > 0 then begin
if not Terminated then begin
for i := 0 to FBindingList.Count - 1 do begin
Peer := TPsiTCPServerConnection.Create(Server);
with Peer do begin
Binding.Accept(Server.Bindings[i].Handle, PeerIP, PeerPort);
Binding.IP := Server.Bindings[i].IP;
Binding.Port := Server.Bindings[i].Port;
Binding.SetPeer(PeerIP, PeerPort);
// LastRcvTimeStamp := Now; // Added for session timeout support
// ProcessingTimeout := False;
if assigned(Server.Intercept) then begin
try
Peer.Intercept := Server.Intercept.Accept(Binding);
except
FreeAndNil(Peer);
end;
end;
end;
if Peer <> nil then begin
// Create Thread
if Server.ThreadMgr <> nil then
begin
Thread := TPsiPeerThread(Server.ThreadMgr.GetThread);
Thread.FConnection := Peer;
Server.Threads.Add(Thread);
Thread.Start;
end;
end;
end;
end;
end;
end;
{ TPsiTCPServerConnection }
function TPsiTCPServerConnection.GetServer: TPsiTCPServer;
begin
result := Owner as TPsiTCPServer;
end;
{ TPsiPeerThread }
procedure TPsiPeerThread.AfterRun;
begin
// Other things are done in destructor
with Connection.Server do begin
DoDisconnect(Self);
ThreadMgr.ReleaseThread(Self);
Threads.Remove(Self);
end;
end;
procedure TPsiPeerThread.BeforeRun;
begin
Connection.Server.DoConnect(Self);
end;
destructor TPsiPeerThread.Destroy;
begin
Connection.Free;
inherited;
end;
procedure TPsiPeerThread.Run;
begin
try
if not Connection.Server.DoExecute(Self) then begin
raise EPsiException.Create(RSNoExecuteSpecified);
end;
except
on E: EPsiSocketError do begin
case E.LastError of
Psi_WSAECONNABORTED, // WSAECONNABORTED - Other side disconnected
Psi_WSAECONNRESET:
Connection.Disconnect;
end;
end;
on EPsiClosedSocket do ;
else
raise;
end;
// If connection lost, stop thread
try
if Connection <> nil then
if not Connection.Connected then
begin
Stop;
end;
except
end;
end;
end.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -