⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 psitcpserver.pas

📁 一个delphi的p2p控件的源代码
💻 PAS
字号:
unit PsiTCPServer;

//******************************************************************************
// The original software is under
// Copyright (c) 1993 - 2000, Chad Z. Hower (Kudzu)
//   and the Indy Pit Crew - http://www.nevrona.com/Indy/
//
// Amended : November 2000, by Michael M. Michalak MACS for use with
// MorphTek.com Inc Peer to Peer Open Source Components - http://www.morphtek.com
//
//******************************************************************************

interface

uses
  Classes, sysutils,
  PsiComponent, PsiSocketHandle, PsiTCPConnection, PsiThread, PsiThreadMgr,
  PsiThreadMgrDefault,
  PsiIntercept;

type
  TPsiTCPServer = class;

  // This is the thread that listens for incoming connections and spawns
  // new ones to handle each one
  TPsiListenerThread = class(TPsiThread)
  protected
    FAcceptWait: Integer;
    FBindingList: TList;
    FServer: TPsiTCPServer;
  public
    procedure AfterRun; override;
    constructor Create(axServer: TPsiTCPServer); reintroduce;
    destructor Destroy; override;
    procedure Run; override;
  	//
    property AcceptWait: integer read FAcceptWait write FAcceptWait;
    property Server: TPsiTCPServer read FServer;
  end;

  TPsiTCPServerConnection = Class(TPsiTCPConnection)
  protected
//    FLastRcvTimeStamp: TDateTime;    //Timestamp of latest received command
//    FProcessingTimeout: boolean;     //To avoid double timeout processing
    //
    function GetServer: TPsiTCPServer;
  public
//    property LastRcvTimeStamp: TDateTime read fLastRcvTimeStamp write fLastRcvTimeStamp;
//    property ProcessingTimeout: boolean read fbProcessingTimeout write fbProcessingTimeout;
//    function Read(const piLen: Integer): string; override;
  published
    property Server: TPsiTCPServer read GetServer;
  end;

  TPsiPeerThread = class(TPsiThread)
  protected
    FConnection: TPsiTCPServerConnection;
    //
    procedure AfterRun; override;
    procedure BeforeRun; override;
  public
    destructor Destroy; override;
    procedure Run; override;
    //
    property Connection: TPsiTCPServerConnection read FConnection;
  end;

  TPsiServerThreadEvent = procedure(AThread: TPsiPeerThread) of object;

  TPsiTCPServer = class(TPsiComponent)
  protected
    FAcceptWait: integer;
    FActive, FImplicitThreadMgrCreated: Boolean;
    FThreadMgr: TPsiThreadMgr;
    FBindings: TPsiSocketHandles;
    FListenerThread: TPsiListenerThread;
    FTerminateWaitTime: Integer;
    FThreadClass: TPsiThreadClass;
    FThreads: TThreadList;
    FOnExecute, FOnConnect, FOnDisconnect: TPsiServerThreadEvent;
    //
    FIntercept: TPsiServerIntercept;
    //
    // NOTE this and the event occur in the context of the listener thread
    procedure DoConnect(axThread: TPsiPeerThread); virtual;
    procedure DoDisconnect(axThread: TPsiPeerThread); virtual;
    function DoExecute(AThread: TPsiPeerThread): boolean; virtual;
    function GetDefaultPort: integer;
    procedure Notification(AComponent: TComponent; Operation: TOperation); override;
    procedure SetAcceptWait(AValue: integer);
    procedure SetActive(AValue: Boolean); virtual;
    procedure SetBindings(const abValue: TPsiSocketHandles);
    procedure SetDefaultPort(const AValue: integer);
    procedure SetIntercept(const Value: TPsiServerIntercept);
    procedure SetThreadMgr(const Value: TPsiThreadMgr);
    procedure TerminateAllThreads;
	public
    constructor Create(AOwner: TComponent); override;
    destructor Destroy; override;
    procedure Loaded; override;
    //
    property ImplicitThreadMgrCreated: Boolean read FImplicitThreadMgrCreated;
    property ThreadClass: TPsiThreadClass read FThreadClass write FThreadClass;
    property Threads: TThreadList read FThreads;
	published
    property AcceptWait: integer read FAcceptWait write SetAcceptWait default 1000;
    property Active: boolean read FActive write SetActive;
    property Bindings: TPsiSocketHandles read FBindings write SetBindings;
    property DefaultPort: integer read GetDefaultPort write SetDefaultPort;
    property Intercept: TPsiServerIntercept read FIntercept write SetIntercept;
		// Occurs in the context of the peer thread
    property OnConnect: TPsiServerThreadEvent read FOnConnect write FOnConnect;
		// Occurs in the context of the peer thread
    property OnExecute: TPsiServerThreadEvent read FOnExecute write FOnExecute;
		// Occurs in the context of the peer thread
    property OnDisconnect: TPsiServerThreadEvent read FOnDisconnect write FOnDisconnect;
    property TerminateWaitTime: Integer read FTerminateWaitTime write FTerminateWaitTime
     default 5000;
    property ThreadMgr: TPsiThreadMgr read FThreadMgr write SetThreadMgr;
  end;

implementation

uses
  PsiException, PsiGlobal, PsiResourceStrings, PsiStack, PsiStackConsts;

{ TPsiTCPServer }

constructor TPsiTCPServer.Create(AOwner: TComponent);
begin
  inherited;
  FAcceptWait := 1000;
  FTerminateWaitTime := 5000;
  FThreads := TThreadList.Create;
  FBindings := TPsiSocketHandles.Create(Self);
  FThreadClass := TPsiPeerThread;
//  fSessionTimer := TTimer.Create(self);
end;

destructor TPsiTCPServer.Destroy;
begin
  Active := False;
  TerminateAllThreads;
  FreeAndNil(FBindings);
  FreeAndNil(FThreads);
  inherited;
end;

procedure TPsiTCPServer.DoConnect(axThread: TPsiPeerThread);
begin
  if assigned(OnConnect) then begin
    OnConnect(axThread);
  end;
end;

procedure TPsiTCPServer.DoDisconnect(axThread: TPsiPeerThread);
begin
  if assigned(OnDisconnect) then begin
    OnDisconnect(axThread);
  end;
end;

function TPsiTCPServer.DoExecute(AThread: TPsiPeerThread): boolean;
begin
  result := assigned(OnExecute);
  if result then begin
    OnExecute(AThread);
  end;
end;

function TPsiTCPServer.GetDefaultPort: integer;
begin
  result := FBindings.DefaultPort;
end;

procedure TPsiTCPServer.Loaded;
begin
  inherited;
  // Active=True must not be performed before all other props are loaded
  if Active then begin
    FActive := False;
    Active := True;
  end;
end;

procedure TPsiTCPServer.Notification(AComponent: TComponent; Operation: TOperation);
begin
  inherited;
  // remove the reference to the linked components if they are deleted
  if (Operation = opRemove) then begin
    if (AComponent = FThreadMgr) then begin
      FThreadMgr := nil;
    end else if (AComponent = FIntercept) then begin
      FIntercept := nil;
    end;
  end;
end;

procedure TPsiTCPServer.SetAcceptWait(AValue: integer);
begin
  if Active then begin
    raise EPsiException.Create(RSAcceptWaitCannotBeModifiedWhileServerIsActive);
  end;
  FAcceptWait := AValue;
end;

procedure TPsiTCPServer.SetActive(AValue: Boolean);
var
  i: Integer;
begin
  if (not (csDesigning in ComponentState)) and (FActive <> AValue)
   and (not (csLoading in ComponentState)) then begin
    if AValue then begin
      // Set up bindings
      if Bindings.Count < 1 then
        if DefaultPort > 0 then begin
          Bindings.Add;
        end else begin
          raise EPsiException.Create(RSNoBindingsSpecified);
        end;

      for i := 0 to Bindings.Count - 1 do
      begin
        Bindings[i].AllocateSocket;
        Bindings[i].Bind;
        Bindings[i].Listen;
      end;

      // Set up ThreadMgr
      FImplicitThreadMgrCreated := not assigned(ThreadMgr);
      if ImplicitThreadMgrCreated then begin
        ThreadMgr := TPsiThreadMgrDefault.Create(Self);
      end;
      ThreadMgr.ThreadClass := ThreadClass;

      // Set up listener thread
      FListenerThread := TPsiListenerThread.Create(Self);
      FListenerThread.AcceptWait := AcceptWait;
      FListenerThread.Start;
    end else begin
      // Stop listening
      for i := 0 to Bindings.Count - 1 do begin
        Bindings[i].CloseSocket;
      end;
      // Tear down ThreadMgr
      TerminateAllThreads;
      if ImplicitThreadMgrCreated then begin
        FreeAndNil(FThreadMgr);
      end;
      FImplicitThreadMgrCreated := false;

      // Tear down Listener thread
      FListenerThread.TerminateAndWaitFor;
      FListenerThread.Free;
    end;
  end;
  FActive := AValue;
end;

procedure TPsiTCPServer.SetBindings(const abValue: TPsiSocketHandles);
begin
	FBindings.Assign(abValue);
end;

procedure TPsiTCPServer.SetDefaultPort(const AValue: integer);
begin
  FBindings.DefaultPort := AValue;
end;

procedure TPsiTCPServer.SetIntercept(const Value: TPsiServerIntercept);
begin
  FIntercept := Value;
  // Add self to the intercept's notification list
  if assigned(FIntercept) then
  begin
    FIntercept.FreeNotification(Self);
  end;
end;

procedure TPsiTCPServer.SetThreadMgr(const Value: TPsiThreadMgr);
begin
  FThreadMgr := Value;
  // Ensure we will be notified when the component is freed, even is it's on
  // another form
  if Value <> nil then begin
    Value.FreeNotification(self);
  end;
end;

procedure TPsiTCPServer.TerminateAllThreads;
var
  i: integer;
  list: TList;
  Thread: TPsiPeerThread;
const
  LSleepTime: integer = 250;
begin
  list := Threads.LockList; try
    for i := 0 to list.Count - 1 do begin
      Thread := TPsiPeerThread(list[i]);
      Thread.Connection.DisconnectSocket;
    end;
  finally Threads.UnlockList; end;
  // Must wait for all threads to terminate, as they access the server and bindings. If this
  // routine is being called from the destructor, this can cause AVs/
  //
  // This method is used instead of:
  //  -Threads.WaitFor. Since they are being destroyed thread.Wait for could AV. And Waiting for
  //   Handle produces different code for different OSs, and using common code has troubles
  //   as the handles are quite different.
  //  -Last thread signaling
  for i := 1 to (TerminateWaitTime div LSleepTime) do begin
    Sleep(LSleepTime);
    list := Threads.LockList; try
      if list.Count = 0 then begin
        break;
      end;
    finally Threads.UnlockList; end;
  end;
end;

{ TPsiListenerThread }

procedure TPsiListenerThread.AfterRun;
var
  i: Integer;
begin
  inherited;
  for i := Server.Bindings.Count - 1 downto 0 do begin
    Server.Bindings[i].CloseSocket;
  end;
end;

constructor TPsiListenerThread.Create(axServer: TPsiTCPServer);
begin
  inherited Create;
  FBindingList := TList.Create;
	FServer := axServer;
end;

destructor TPsiListenerThread.Destroy;
begin
  FBindinglist.Free;
  inherited;
end;

procedure TPsiListenerThread.Run;
var
  PeerIP: string;
  PeerPort: integer;
  Peer: TPsiTCPServerConnection;
  Thread: TPsiPeerThread;
  i : Integer;
begin
  FBindingList.Clear;
  for i:= 0 to Server.Bindings.Count - 1 do begin
    FBindingList.Add(TObject(Server.Bindings[i].Handle));
  end;
  if GStack.WSSelect(FBindingList, nil, nil, AcceptWait) > 0 then begin
    if not Terminated then begin
      for i := 0 to FBindingList.Count - 1 do begin
        Peer := TPsiTCPServerConnection.Create(Server);
        with Peer do begin
          Binding.Accept(Server.Bindings[i].Handle, PeerIP, PeerPort);
          Binding.IP := Server.Bindings[i].IP;
          Binding.Port := Server.Bindings[i].Port;
          Binding.SetPeer(PeerIP, PeerPort);
          // LastRcvTimeStamp := Now;  // Added for session timeout support
          // ProcessingTimeout := False;
          if assigned(Server.Intercept) then begin
            try
              Peer.Intercept := Server.Intercept.Accept(Binding);
            except
              FreeAndNil(Peer);
            end;
          end;
        end;
        if Peer <> nil then begin
          // Create Thread
          if Server.ThreadMgr <> nil then
          begin
            Thread := TPsiPeerThread(Server.ThreadMgr.GetThread);
            Thread.FConnection := Peer;
            Server.Threads.Add(Thread);
            Thread.Start;
          end;
        end;
      end;
    end;
  end;
end;

{ TPsiTCPServerConnection }

function TPsiTCPServerConnection.GetServer: TPsiTCPServer;
begin
  result := Owner as TPsiTCPServer;
end;

{ TPsiPeerThread }

procedure TPsiPeerThread.AfterRun;
begin
  // Other things are done in destructor
  with Connection.Server do begin
    DoDisconnect(Self);
    ThreadMgr.ReleaseThread(Self);
    Threads.Remove(Self);
  end;
end;

procedure TPsiPeerThread.BeforeRun;
begin
  Connection.Server.DoConnect(Self);
end;

destructor TPsiPeerThread.Destroy;
begin
  Connection.Free;
  inherited;
end;

procedure TPsiPeerThread.Run;
begin
  try
    if not Connection.Server.DoExecute(Self) then begin
      raise EPsiException.Create(RSNoExecuteSpecified);
    end;
  except
    on E: EPsiSocketError do begin
      case E.LastError of
        Psi_WSAECONNABORTED, // WSAECONNABORTED - Other side disconnected
        Psi_WSAECONNRESET:
         Connection.Disconnect;
      end;
    end;
    on EPsiClosedSocket do ;
  else
    raise;
  end;
  // If connection lost, stop thread
  try
    if Connection <> nil then
      if not Connection.Connected then
      begin
        Stop;
      end;
  except
  end;
end;

end.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -