📄 dnthreadexecutor.pas
字号:
try
FGuard.Acquire;
if FEvent.Count >= FMaxQueueSize then
Result := False
else if FBusyCount >= FMaxThread then
begin
if not FRefuseOverload then
begin
FEvent.Add(Event);
FNonEmpty.Release;
Result := True;
end else
Result := False; //refuse request
end else if FBusyCount >= FWorker.Count then
begin
//Launch new thread
LaunchThread;
FEvent.Add(Event);
FNonEmpty.Release;
Result := True;
end else
begin
FEvent.Add(Event);
FNonEmpty.Release;
Result := True;
end;
finally
FGuard.Release;
Event := Nil;
end;
end;
function TDnThreadExecutor.GetEvent(Thread: TDnWorkerThread;
var Event: IDnIOResponse): Boolean;
var isEvents: Boolean;
begin
Result := False;
if FStopping then
begin
Event := Nil;
Result := True;
Exit;
end;
try
FGuard.Acquire;
isEvents := FEvent.Count <> 0;
if not isEvents then
begin
if (FWorker.Count > FMinThread) and (Thread.FRefCount = 0) then
begin
Event := Nil;
Result := True;
FGuard.Release;
Exit;
end;
end;
FGuard.Release;
if FNonEmpty.Wait(100) = dwrSignaled then
begin
FGuard.Acquire;
if FEvent.Count > 0 then
begin
Event := FEvent[0] as IDnIOResponse;
FEvent.Delete(0);
end else
Event := Nil; //signal to exit
FGuard.Release;
Result := True;
end;
finally
;
end;
end;
procedure TDnThreadExecutor.SetMinThread(Value: Integer);
begin
if (Value > FMaxThread) or (Value < 0)then
raise EDnException.Create(ErrInvalidParameter, 0);
FGuard.Acquire;
FMinThread := Value;
FGuard.Release;
end;
procedure TDnThreadExecutor.SetMaxThread(Value: Integer);
begin
if (Value < FMinThread) or (Value < 0) then
raise EDnException.Create(ErrInvalidParameter, 0);
//reading from aligned variable is granular relating to threading issues
FMaxThread := Value;
end;
function TDnThreadExecutor.GetQueueSize: Integer;
begin
Result := FEvent.Count;
end;
function TDnThreadExecutor.GetBusyCount: Integer;
begin
Result := FBusyCount;
end;
procedure TDnThreadExecutor.Lock;
begin
FGuard.Acquire;
end;
procedure TDnThreadExecutor.Unlock;
begin
FGuard.Release;
end;
procedure TDnThreadExecutor.BindChannelToContext(Channel: IDnIOTrackerHolder; Context: TDnThreadContext);
begin
if Channel <> Nil then
begin
Channel.Bind(Context.OwnerThread);
TDnWorkerThread(Context.OwnerThread).Bind(Channel);
end;
end;
procedure TDnThreadExecutor.UnbindChannelFromContext(Channel: IDnIOTrackerHolder; Context: TDnThreadContext);
begin
if Channel <> Nil then
begin
Channel.Unbind(Context.OwnerThread);
TDnWorkerThread(Context.OwnerThread).Unbind(Channel);
end;
end;
constructor TDnWorkerThread.Create( Pool: TDnThreadExecutor );
begin
inherited Create;
FBusy := False;
FPool := Pool;
FBoundCount := 0;
FEvent := Nil;
FGuard := TDnMutex.Create;
FNonEmpty := TDnSemaphore.Create(0, $7FFFFFFF);
end;
destructor TDnWorkerThread.Destroy;
begin
FreeAndNil(FGuard);
FreeAndNil(FNonEmpty);
inherited Destroy;
end;
function TDnWorkerThread.DoExit: Boolean;
var AllowExit: Boolean;
begin
AllowExit := True;
if Assigned(FPool.OnThreadRemove) then
AllowExit := FPool.OnThreadRemove(FPool, Self, FContext);
Result := AllowExit;
FPool.RemoveThread(Self);
end;
procedure TDnWorkerThread.CreateContext;
begin
if FContext = Nil then
begin
if Assigned(FPool.FOnCreateContext) then
FContext := FPool.FOnCreateContext(Self)
else
FContext := TDnThreadContext.Create(Self);
end;
end;
procedure TDnWorkerThread.DestroyContext;
begin
if (FContext <> Nil) and Assigned(FPool.FOnDestroyContext) then
FPool.FOnDestroyContext(FContext)
else
FContext.Free;
FContext := Nil;
end;
procedure TDnWorkerThread.ThreadRoutine;
var Event: IDnIOResponse;
begin
while True do
try
Event := Nil;
//Fetch new IOResponse
if FBoundCount = 0 then
begin // get event from shared event's queue
while not Terminated and not FPool.GetEvent(Self, Event) do
;
end else //get event from private event's queue
while not Terminated and not GetEvent(Event) do
;
//Maybe signal to exit?
if Terminated or not Assigned(Event) then
begin //signal to terminate
if Self.DoExit then
exit
else
continue;
end;
if Assigned(Event) then
begin
if FBoundCount = 0 then
begin
FPool.IncrementBusy;
FBusy := True;
end;
try
Event.CallHandler(FContext);
except
on E: Exception do
FPool.DoLogEvent(E.Message);
end;
InterlockedDecrement(PendingRequests);
if FBoundCount = 0 then
begin
FBusy := False;
FPool.DecrementBusy;
end;
end;
except
on E: Exception do
FPool.DoLogEvent(E.Message);
end;
Event := Nil;
end;
function TDnWorkerThread.PostEvent(Event: IDnIOResponse): Boolean;
begin
Result := True;
FGuard.Acquire;
FEvent.Add(Event); //FSource.Add(Source);
FNonEmpty.Release;
FGuard.Release;
end;
function TDnWorkerThread.GetEvent(var Event: IDnIOResponse): Boolean;
begin
Result := False;
try
if FNonEmpty.Wait(100) = dwrSignaled then
begin
FGuard.Acquire;
if FEvent.Count > 0 then
begin
Event := FEvent[0] as IDnIOResponse;
FEvent.Delete(0);
end else
Event := Nil;
FGuard.Release;
Result := True;
end;
finally
;//FGuard.Release;
end;
end;
procedure TDnWorkerThread.Lock;
begin
if FGuard <> Nil then
FGuard.Acquire;
end;
procedure TDnWorkerThread.Unlock;
begin
if FGuard <> Nil then
FGuard.Release;
end;
procedure TDnWorkerThread.Bind(Source: IDnIOTrackerHolder);
begin
FEvent := TInterfaceList.Create;
Inc(FBoundCount);
end;
procedure TDnWorkerThread.Unbind(Source: IDnIOTrackerHolder);
begin
FreeAndNil(FEvent);
Dec(FBoundCount);
end;
procedure Register;
begin
{$IFDEF ROOTISCOMPONENT}
RegisterComponents('DNet', [TDnThreadExecutor]);
{$ENDIF}
end;
end.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -