⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 iocppool.~pas

📁 delphi下的IOCP模型。可以下载来试试。
💻 ~PAS
字号:
{*

Synapse IOCPPool implementation.

The TTCPDaemon class works like a web server, using a pool of TWorkerThreads waiting
on an IOCP handle. It reads a .config file to configure the class.

The free XML parser component (no source code) can be found in
http://www.icom-dv.de

This unit was inspired by the William Kennedy article at Continuum Technology
"IOCP Thread Pooling in C#" - http://www.theukwebdesigncompany.com/articles/article.php?id=82

Feel free to use it and change it.
Any comments or improvements please tell me ! :)

Synapse rocks !

--
Andre Azevedo
la.andre@terra.com.br

*}

unit IOCPPool;

interface

uses
	ComObj, ActiveX, AdoDb, Windows, SyncObjs, Classes,
	BlckSock, WinSock, SysUtils, IcXMLParser;

type

	// Pool statistics callback. TTCPDaemon.SyncAtualiza passes, in order:
	// FActThreadsInPool (workers currently handling a connection),
	// FCurWorksInPool (accepted connections posted but not yet dequeued) and
	// FCurThreadsInPool (worker threads counted in the pool).
	TUpdateEvent = procedure(nActThreads, nCurWorks, nCurThreads: integer) of object;
	// Per-connection callback invoked by a worker thread with the client
	// socket wrapper and that worker's ADO connection (nil when the worker
	// was started with an empty connection string).
	TProcessEvent = procedure(ASock: TTCPBlockSocket; ADBConnection: TAdoConnection) of object;
	// Error callback; receives the exception caught while processing a connection.
	TErrorEvent = procedure(AE: Exception) of object;

	// Forward declaration: the thread classes below hold a TTCPDaemon reference.
	TTcpDaemon = class;

	// Pool worker: dequeues accepted client sockets from the daemon's
	// completion port and hands each one to the daemon's OnProcess handler.
	TWorkerThread = class(TThread)
    private
		// ADO connection created in Execute when a connection string is set.
		FDBConnection: TAdoConnection;
		// Connection string received in the constructor.
        FConnectionString: string;
		// Owning daemon (completion port, counters, events, callbacks).
    	FTcpDaemon: TTCPDaemon;
		// Socket wrapper for the connection currently being processed.
		FWorkerSocket: TTCPBlockSocket;
	protected
		procedure Execute; override;
	public
		constructor Create(ATcpDaemon: TTCPDaemon; AConnectionString: string);
        destructor Destroy; override;
	end;

	// Listener thread: binds to the configured port, accepts connections and
	// posts each accepted socket handle to the daemon's completion port.
	TAcceptThread = class(TThread)
    private
		// Owning daemon.
    	FTcpDaemon: TTCPDaemon;
		// TCP port to listen on.
    	FPort: integer;
		// Listening socket; created in the constructor, freed in Destroy.
		FSock: TTCPBlockSocket;
	protected
		procedure Execute; override;
	public
		constructor Create(ATcpDaemon: TTcpDaemon; APort: integer);
		destructor Destroy; override;
	end;

	// TCP server built on an I/O completion port used as a work queue:
	// one TAcceptThread posts accepted sockets, a pool of TWorkerThread
	// instances dequeues and processes them. Settings are loaded by Config
	// from an XML file when Start is called.
	TTCPDaemon = class(TObject)
    private
		// Listener thread created by Start.
		FAcceptthread: TAcceptThread;
		// Listening port.
		FPort: integer;
		// Completion port handle created by Start, closed by Stop.
		FIOCPHandle: THandle;
		// Upper / lower bounds for the number of worker threads.
        FMaxThreadsInPool: integer;
        FMinThreadsInPool: integer;
		// Worker threads currently counted in the pool.
		FCurThreadsInPool: integer;
		// Workers currently processing a connection.
		FActThreadsInPool: integer;
		// Worker idle timeout; Config stores the configured value * 1000 and
		// workers pass it (plus 500) as the dequeue timeout in milliseconds.
		FIdleTimeOut: integer;
		// Accepted connections posted to the port and not yet dequeued.
		FCurWorksInPool: integer;
		// Guards FWorkerThreads, FErrorException and pool grow/shrink decisions.
		FCRTSection: TCriticalSection;
		// Set by the last worker to leave the pool; awaited by the accept thread.
		FTerminateEvent: TSimpleEvent;
		// Set by Stop; awaited by WaitForTerminate.
		FServiceTerminateEvent: TSimpleEvent;
		// Live worker threads.
        FWorkerThreads: TList;
    	FOnUpdateEvent: TUpdateEvent;
		FOnProcessEvent: TProcessEvent;
		FOnErrorEvent: TErrorEvent;
		// Exception handed to SyncErros; written by workers under FCRTSection.
		FErrorException: Exception;
		// Database settings applied to each worker's ADO connection.
		FConnectionString: string;
		FConnectionTimeOut: integer;
		FCommandTimeOut: integer;
		FActive: boolean;
		// Interlocked increment / decrement helpers for the three counters.
		procedure IncActThreadsInPool;
		procedure DecActThreadsInPool;
		procedure IncCurThreadsInPool;
		procedure DecCurThreadsInPool;
		procedure IncCurWorksInPool;
		procedure DecCurWorksInPool;
		// Fires OnUpdate with the current counters.
		procedure SyncAtualiza;
		// Fires OnError with FErrorException.
		procedure SyncErros;
		// Loads settings from '<exe path and name>.config'.
		procedure Config;
	public
		constructor Create;
		destructor Destroy; override;
		property Active: boolean read FActive write FActive;
		property Port: integer read FPort write FPort;
		property MaxThreadsInPool: integer read FMaxThreadsInPool write FMaxThreadsInPool;
		property MinThreadsInPool: integer read FMinThreadsInPool write FMinThreadsInPool;
		property IdleTimeOut: integer read FIdleTimeOut write FIdleTimeOut;
		property ConnectionString: string read FConnectionString write FConnectionString;
		property ConnectionTimeOut: integer read FConnectionTimeOut write FConnectionTimeOut;
		property CommandTimeOut: integer read FCommandTimeOut write FCommandTimeOut;
		property OnUpdate: TUpdateEvent read FOnUpdateEvent write FOnUpdateEvent;
		property OnProcess: TProcessEvent read FOnProcessEvent write FOnProcessEvent;
		property OnError: TErrorEvent read FOnErrorEvent write FOnErrorEvent;
		// Loads the configuration, creates the completion port, the minimum
		// number of workers and the accept thread.
		procedure Start;
		// Shuts down the accept thread, closes the completion port and
		// signals WaitForTerminate.
		procedure Stop;
		// Blocks until Stop has signalled termination.
		procedure WaitForTerminate;
	end;

implementation

{ TTCPDaemon }
const
	// Sentinel completion key: the accept thread posts it once per registered
	// worker at shutdown, and a worker that dequeues it terminates instead of
	// treating the key as a client socket handle.
	KILL_THREAD = $7fffffff;

{ Atomically decrements the number of workers that are currently
  processing a client connection. }
procedure TTCPDaemon.DecActThreadsInPool;
begin
	InterlockedDecrement(Self.FActThreadsInPool);
end;

{ Atomically decrements the number of accepted connections that are
  still waiting in the completion port queue. }
procedure TTCPDaemon.DecCurWorksInPool;
begin
	InterlockedDecrement(Self.FCurWorksInPool);
end;

{ Atomically decrements the number of worker threads counted in the pool. }
procedure TTCPDaemon.DecCurThreadsInPool;
begin
	InterlockedDecrement(Self.FCurThreadsInPool);
end;

{ Releases the helper objects allocated by the constructor.
  It does not stop running threads; callers are expected to call Stop
  before destroying the daemon. }
destructor TTCPDaemon.Destroy;
begin
	// Free is nil-safe, so a partially constructed instance is handled too.
	FWorkerThreads.Free;
	FCRTSection.Free;
	FTerminateEvent.Free;
	FServiceTerminateEvent.Free;

	inherited Destroy;
end;

{ Atomically increments the number of workers that are currently
  processing a client connection. }
procedure TTCPDaemon.IncActThreadsInPool;
begin
	InterlockedIncrement(Self.FActThreadsInPool);
end;

{ Atomically increments the number of accepted connections waiting in
  the completion port queue. }
procedure TTCPDaemon.IncCurWorksInPool;
begin
	InterlockedIncrement(Self.FCurWorksInPool);
end;

{ Atomically increments the number of worker threads counted in the pool. }
procedure TTCPDaemon.IncCurThreadsInPool;
begin
	InterlockedIncrement(Self.FCurThreadsInPool);
end;

{ Publishes the current pool counters (active workers, queued
  connections, threads in pool) through OnUpdate, if a handler is set. }
procedure TTCPDaemon.SyncAtualiza;
begin
	if not Assigned(FOnUpdateEvent) then
		Exit;

	FOnUpdateEvent(FActThreadsInPool, FCurWorksInPool, FCurThreadsInPool);
end;

{ Starts the server.

  Loads the configuration, creates the I/O completion port (concurrency
  limited to the number of processors), spawns FMinThreadsInPool worker
  threads and finally the accept thread. Active is True on success.

  Raises: any exception from Config, Exception('Erro IOCP Creation') when
  the completion port cannot be created, or a thread-creation error.
  On any failure Active is set to False and the exception is re-raised. }
procedure TTCPDaemon.Start;
var
	nWorker: integer;
	Worker: TWorkerThread;
	SystemInfo: TSystemInfo;
begin
	try
		// Both events are manual-reset and stay signalled after a previous
		// Stop. Without resetting them a restarted daemon would see
		// WaitForTerminate return immediately, and the accept thread would
		// not wait for the workers during the next shutdown.
		FTerminateEvent.ResetEvent;
		FServiceTerminateEvent.ResetEvent;

		Config;

		GetSystemInfo(SystemInfo);
		FIOCPHandle := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, SystemInfo.dwNumberOfProcessors);

		if FIOCPHandle = 0 then
			raise Exception.Create('Erro IOCP Creation');

		// NOTE(review): workers are only ever spawned by other workers, so a
		// configuration with MinThreads = 0 accepts connections that nobody
		// will process.
		for nWorker := 1 to FMinThreadsInPool do begin
			Worker := TWorkerThread.Create(Self, FConnectionString);
			FWorkerThreads.Add(Worker);
			IncCurThreadsInPool;
		end;

		FAcceptthread := TAcceptThread.Create(Self, FPort);

		Active := True;
	except
		Active := False;
		raise;
	end;
end;

{ Stops the server.

  Asks the accept thread to terminate, waits for it (the accept thread in
  turn shuts down the workers before it exits), closes the completion port,
  clears Active and signals WaitForTerminate. Safe to call when the daemon
  was never started and safe to call more than once. }
procedure TTCPDaemon.Stop;
begin
	try
		if Assigned(FAcceptThread) then begin
			// The thread is created with FreeOnTerminate = True. Calling
			// WaitFor on such a thread is invalid: it may free itself and
			// close its handle while we are still waiting. Take ownership
			// before asking it to terminate. It only leaves its accept loop
			// once Terminated is set, so the flag is changed in time.
			// NOTE(review): if the accept thread already died on its own, this
			// reference is stale - same limitation as before.
			FAcceptThread.FreeOnTerminate := False;
			FAcceptThread.Terminate;

			// Closing the listening socket wakes the thread out of CanRead.
			FAcceptThread.FSock.CloseSocket;

			FAcceptThread.WaitFor;
			FreeAndNil(FAcceptThread);
		end;

		if FIOCPHandle <> 0 then begin
			CloseHandle(FIOCPHandle);
			FIOCPHandle := 0;
		end;
	finally
		Active := False;
		FServiceTerminateEvent.SetEvent;
	end;
end;

{ Creates the daemon in a stopped state: allocates the worker list, the
  critical section and the two termination events, and clears every
  configurable setting (Config fills them in when Start is called). }
constructor TTCPDaemon.Create;
begin
	inherited Create;

	// Settings: empty / zero until Config or the caller assigns them.
	FMinThreadsInPool := 0;
	FMaxThreadsInPool := 0;
	FIdleTimeOut := 0;
	FConnectionString := '';
	FConnectionTimeout := 0;
	FCommandTimeOut := 0;

	// Helper objects; released in Destroy.
	FWorkerThreads := TList.Create;
	FCRTSection := TCriticalSection.Create;
	FTerminateEvent := TSimpleEvent.Create;
	FServiceTerminateEvent := TSimpleEvent.Create;
end;

{ Forwards the exception stored in FErrorException to the OnError
  handler, if one is assigned. }
procedure TTCPDaemon.SyncErros;
begin
	if not Assigned(FOnErrorEvent) then
		Exit;

	FOnErrorEvent(FErrorException);
end;

{ Blocks the calling thread, with no timeout, until Stop signals
  FServiceTerminateEvent. }
procedure TTCPDaemon.WaitForTerminate;
begin
	Self.FServiceTerminateEvent.WaitFor(INFINITE);
end;

{ Loads the daemon settings from the XML file '<exe path><exe name>.config'.

  Expected layout: a root element 'Configuration' whose children may include
    DbConnection      - attributes ConnectionString, ConnectionTimeOut, CommandTimeOut
    SocketConnection  - attributes Port, MinThreads, MaxThreads, IdleTimeOut
  Missing elements or attributes leave the corresponding field unchanged.
  IdleTimeOut is multiplied by 1000 before being stored.

  Raises: a file error when the file cannot be loaded, EConvertError when a
  numeric attribute is not an integer, plus whatever the XML parser raises.
  The stream, document and parser are released on every path. }
procedure TTCPDaemon.Config;
var
	elRoot, elElement: TIcXMLElement;
	doc: TIcXMLDocument;
	IcXMLParser: TIcXMLParser;
	stmXML: TMemoryStream;
	sName: string;
	nSeconds: integer;

	// Copies attribute AName of AElement into AValue; False when absent.
	function ReadStrAttr(AElement: TIcXMLElement; const AName: string; var AValue: string): boolean;
	var
		elNode: TIcXMLNode;
	begin
		elNode := AElement.GetAttributeNode(AName);
		Result := Assigned(elNode);
		if Result then
			AValue := elNode.GetValue;
	end;

	// Same as ReadStrAttr, converting the value with StrToInt.
	function ReadIntAttr(AElement: TIcXMLElement; const AName: string; var AValue: integer): boolean;
	var
		elNode: TIcXMLNode;
	begin
		elNode := AElement.GetAttributeNode(AName);
		Result := Assigned(elNode);
		if Result then
			AValue := StrToInt(elNode.GetValue);
	end;

begin
	stmXML := nil;
	doc := nil;
	IcXMLParser := nil;

	try
		stmXML := TMemoryStream.Create;
		doc := TIcXMLDocument.Create;
		IcXMLParser := TIcXMLParser.Create(nil);

		doc.AssignParser(IcXMLParser);
		doc.setEncoding('UTF-8');

		stmXML.LoadFromFile(ExtractFilePath(ParamStr(0)) + ExtractFileName(ParamStr(0)) +  '.config');
		IcXMLParser.Parse(stmXML, doc);

		// The raw bytes are no longer needed once the document is parsed.
		FreeAndNil(stmXML);

		elRoot := doc.GetDocumentElement;

		// Guard against an empty document before touching the root element.
		if Assigned(elRoot) and (elRoot.GetName = 'Configuration') and elRoot.hasChild then begin

			elElement := elRoot.GetFirstChild;

			while Assigned(elElement) do begin

				sName := elElement.GetName;

				if sName = 'DbConnection' then begin
					ReadStrAttr(elElement, 'ConnectionString', FConnectionString);
					ReadIntAttr(elElement, 'ConnectionTimeOut', FConnectionTimeOut);
					ReadIntAttr(elElement, 'CommandTimeOut', FCommandTimeOut);
				end;

				if sName = 'SocketConnection' then begin
					ReadIntAttr(elElement, 'Port', FPort);
					ReadIntAttr(elElement, 'MinThreads', FMinThreadsInPool);
					ReadIntAttr(elElement, 'MaxThreads', FMaxThreadsInPool);

					// Stored scaled by 1000 (workers use it as a millisecond timeout).
					if ReadIntAttr(elElement, 'IdleTimeOut', nSeconds) then
						FIdleTimeOut := nSeconds * 1000;
				end;

				elElement := elElement.NextSibling;

			end;

		end;

	finally
		// Free is nil-safe, so this is correct wherever the failure happened.
		stmXML.Free;
		IcXMLParser.Free;
		doc.Free;
	end;
end;

{ TAcceptThread }

{ Creates the listener thread for ATcpDaemon on port APort and starts it
  immediately. The thread frees itself when Execute returns. }
constructor TAcceptThread.Create(ATcpDaemon: TTcpDaemon; APort: integer);
begin
	// All state is prepared before the inherited constructor launches the thread.
	FSock := TTCPBlockSocket.Create;
	FPort := APort;
	FTcpDaemon := ATcpDaemon;
	FreeOnTerminate := True;

	inherited Create(False);
end;

{ Releases the listening socket object, then the thread itself. }
destructor TAcceptThread.Destroy;
begin
	FSock.Free;

	inherited Destroy;
end;

{ Listener loop.

  Binds the listening socket to 0.0.0.0:FPort, then, until the thread is
  terminated, polls for incoming connections (5 second poll interval) and
  posts every accepted socket handle to the daemon's completion port as the
  completion key.

  On termination it closes the listening socket, posts one KILL_THREAD
  packet per registered worker and waits for the last worker to signal
  FTerminateEvent. }
procedure TAcceptThread.Execute;
var
	ClientSocket: TSocket;
	nContador, nWorkers: integer;
begin
	FSock.CreateSocket;
	FSock.SetLinger(True, 10000);
	FSock.Bind('0.0.0.0', IntToStr(FPort));
	FSock.Listen;

	// NOTE(review): invoked directly on this thread, whereas the workers go
	// through Synchronize - confirm the OnUpdate handler tolerates that.
	FTcpDaemon.SyncAtualiza;

	repeat

		if FSock.CanRead(5000) then begin

			ClientSocket := FSock.Accept;

			if ClientSocket <> -1 then begin
				FTcpDaemon.IncCurWorksInPool;
				PostQueuedCompletionStatus(FTcpDaemon.FIOCPHandle, 0, Cardinal(ClientSocket), nil);
			end;

		end;

	until Terminated;

	FSock.CloseSocket;

	// Ask every registered worker to exit.
	FTcpDaemon.FCRTSection.Enter;
	try
		nWorkers := FTcpDaemon.FWorkerThreads.Count;

		for nContador := 1 to nWorkers do
			PostQueuedCompletionStatus(FTcpDaemon.FIOCPHandle, 0, Cardinal(KILL_THREAD), nil);
	finally
		FTcpDaemon.FCRTSection.Leave;
	end;

	// With no registered worker nobody would ever signal the event (workers
	// deregister and signal under the same lock), so waiting would block this
	// thread - and the caller of Stop - forever.
	if nWorkers > 0 then
		WaitForSingleObject(FTcpDaemon.FTerminateEvent.Handle, INFINITE);
end;

{ TWorkerThread }

{ Creates a pool worker bound to ATcpDaemon and starts it immediately.
  AConnectionString is kept for the worker's own database connection.
  The thread frees itself when Execute returns. }
constructor TWorkerThread.Create(ATcpDaemon: TTCPDaemon; AConnectionString: string);
begin
	// Fields are assigned before the inherited constructor launches the thread.
	FConnectionString := AConnectionString;
	FTcpDaemon := ATcpDaemon;
	FreeOnTerminate := True;

	inherited Create(False);
end;

{ Nothing of its own to release: the socket and database connection are
  created and freed inside Execute. }
destructor TWorkerThread.Destroy;
begin
	inherited Destroy;
end;

{ Worker loop.

  Optionally opens an ADO connection (when a connection string was given),
  then repeatedly dequeues completion packets from the daemon's port:
    - dequeue timeout: the worker retires if the pool is above its minimum;
    - any other dequeue failure, or a KILL_THREAD key: the worker exits;
    - otherwise the key is a client socket handle, which is wrapped in a
      TTCPBlockSocket and passed to the daemon's OnProcess handler.
  While busy, a worker spawns one more worker when every pooled thread is
  active and the pool is below its maximum.

  Exceptions raised by the handler - and, now, by connection setup or the
  loop itself - are reported through the daemon's OnError event.

  On every exit path the worker closes its database connection, removes
  itself from the daemon's list, decrements the pool count and, if it was
  the last worker, signals FTerminateEvent. }
procedure TWorkerThread.Execute;
var
	Ov: POverlapped;
	Transfered: dword;
	ClientSocket: TSocket;
	NewWorker: TThread;
	nIndex: integer;
	bComInitialized: boolean;

	// Publishes AE through the daemon's OnError event on the main thread.
	procedure ReportError(AE: Exception);
	begin
		FTcpDaemon.FCRTSection.Enter;
		try
			FTcpDaemon.FErrorException := AE;
		finally
			FTcpDaemon.FCRTSection.Leave;
		end;

		Synchronize(FTcpDaemon.SyncErros);
	end;

begin

	bComInitialized := False;

	try

		try

			// The thread's own copy of the connection string is used here; the
			// previous "with FTcpDaemon" silently resolved the name to the
			// daemon's field instead.
			if Length(FConnectionString) > 0 then begin

				// ADO is COM based and COM must be initialized on each thread
				// that uses it.
				bComInitialized := Succeeded(CoInitialize(nil));

				FDBConnection := TAdoConnection.Create(nil);

				FDBConnection.LoginPrompt := False;
				FDBConnection.ConnectionTimeout := FTcpDaemon.FConnectionTimeOut;
				FDBConnection.CommandTimeout := FTcpDaemon.FCommandTimeOut;
				FDBConnection.ConnectionString := FConnectionString;

				FDBConnection.Open;
				FDBConnection.Execute('set quoted_identifier off');

			end;

			while not Terminated do begin

				// Wait for a work item.
				if not GetQueuedCompletionStatus(FTcpDaemon.FIOCPHandle, Transfered, Cardinal(ClientSocket), Ov, FTcpDaemon.FIdleTimeOut + 500) then begin

					// Anything but a timeout means the port is gone or broken.
					if GetLastError <> WAIT_TIMEOUT then begin
						Terminate;
						Break;
					end;

					// Idle timeout: retire if the pool is above its minimum.
					// NOTE(review): the counter is only decremented later, so
					// several idle workers may retire at once and drop the
					// pool below the minimum.
					FTcpDaemon.FCRTSection.Enter;
					try
						if FTcpDaemon.FCurThreadsInPool > FTcpDaemon.FMinThreadsInPool then
							Terminate;
					finally
						FTcpDaemon.FCRTSection.Leave;
					end;

					Continue;

				end;

				// Server is shutting down.
				if ClientSocket = KILL_THREAD then begin
					Terminate;
					Break;
				end;

				// This worker is now busy with one dequeued connection.
				FTcpDaemon.IncActThreadsInPool;
				FTcpDaemon.DecCurWorksInPool;

				try

					Synchronize(FTcpDaemon.SyncAtualiza);

					// Grow the pool when every thread is busy and there is room.
					FTcpDaemon.FCRTSection.Enter;
					try
						if (FTcpDaemon.FCurThreadsInPool < FTcpDaemon.FMaxThreadsInPool) and
						   (FTcpDaemon.FActThreadsInPool = FTcpDaemon.FCurThreadsInPool) then begin

							NewWorker := TWorkerThread.Create(FTcpDaemon, FTcpDaemon.ConnectionString);
							FTcpDaemon.FWorkerThreads.Add(NewWorker);
							FTcpDaemon.IncCurThreadsInPool;
							Synchronize(FTcpDaemon.SyncAtualiza);

						end;
					finally
						FTcpDaemon.FCRTSection.Leave;
					end;

					FWorkerSocket := TTCPBlockSocket.Create;

					try

						try

							try

								FWorkerSocket.Socket := ClientSocket;
								FWorkerSocket.GetSins;
								FWorkerSocket.SetLinger(True, 10000);

								if Assigned(FTcpDaemon.FOnProcessEvent) then
									FTcpDaemon.FOnProcessEvent(FWorkerSocket, FDBConnection);

							except

								on e: Exception do
									ReportError(e);

							end;

						finally

							FWorkerSocket.CloseSocket;

						end;

					finally

						// Released even if error reporting itself fails.
						FreeAndNil(FWorkerSocket);

					end;

				finally

					// The busy count is restored on every path.
					FTcpDaemon.DecActThreadsInPool;

				end;

				Synchronize(FTcpDaemon.SyncAtualiza);

			end; //while

		except

			// Setup or loop failure: report it instead of dying silently.
			on e: Exception do
				ReportError(e);

		end;

	finally

		try

			try
				if Assigned(FDBConnection) then
					FDBConnection.Close;
			finally
				FreeAndNil(FDBConnection);

				// Balance CoInitialize only after the ADO object is released.
				if bComInitialized then
					CoUninitialize;
			end;

		finally

			// Always leave the pool, otherwise shutdown would wait forever for
			// a worker that no longer exists.
			FTcpDaemon.FCRTSection.Enter;

			try

				nIndex := FTcpDaemon.FWorkerThreads.IndexOf(Self);

				if nIndex >= 0 then
					FTcpDaemon.FWorkerThreads.Delete(nIndex);

				FTcpDaemon.DecCurThreadsInPool;

				try
					// Kept before the event is signalled, so no worker calls
					// back into the daemon after shutdown has been released.
					Synchronize(FTcpDaemon.SyncAtualiza);
				finally
					if FTcpDaemon.FCurThreadsInPool = 0 then
						FTcpDaemon.FTerminateEvent.SetEvent;
				end;

			finally

				FTcpDaemon.FCRTSection.Leave;

			end;

		end;

	end;

end;

end.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -