📄 threadtimer.pas
字号:
begin
l := 1;
isp := false;
end;
vtExtended : l := sizeof(Extended);
vtString : l := byte(VString^[0])+1;
vtPointer : Flg := True;
vtPChar : l := strlen(VPChar);
vtWideChar :
begin
l := 2;
isp := false;
end;
vtAnsiString : l := length(string(VAnsiString));
vtCurrency : l := sizeof(Currency);
vtVariant : l := length(string(VVariant^));
vtInterface : l := length(string(VInterface^));
vtInt64 : l := sizeof(Int64);
else
l := 0;
end;
if not flg and (l > 0) then
begin
if isp then
move(VPointer^, result[len+1], l)
else
move(VInteger, result[len+1], l);
inc(len, l);
end;
end;
if Flg then
begin
l := length(string(Bufs[high(bufs)].VPointer)) and $FFFFFFFC - 4;
move(Bufs[high(bufs)].VPointer^, result[Len+1], l);
end;
end;
{ TTimerWnd }
{ Allocates the helper window whose messages are routed to WndProc and
  remembers the id of the thread that created it. AddMsgProc and
  RemoveMsgProc compare against that id to choose between updating the
  handler list directly and posting a request to the window. }
constructor TTimerWnd.Create;
begin
  Wnd := AllocateHWnd(WndProc);
  Thrd := GetCurrentThreadId;
end;
{ Releases the window handle obtained in Create, then runs the inherited
  destructor. }
destructor TTimerWnd.Destroy;
begin
  DeallocateHWnd(Wnd);
  inherited;
end;
{ Registers Proc as an additional message handler.

  When called on the thread recorded in Create, the handler is appended to
  MsgQueue immediately (the array grows four slots at a time). From any
  other thread the code/data halves of the method pointer are posted to the
  window as WSM_SET_FUNC, and WndProc performs the insertion. }
procedure TTimerWnd.AddMsgProc(Proc: TWndMethod);
var
  Target: TMethod;
begin
  Target := TMethod(Proc);
  if GetCurrentThreadId = Thrd then
  begin
    { Same thread: safe to touch the list directly. }
    if Length(MsgQueue) = QCount then
      SetLength(MsgQueue, QCount + 4);
    MsgQueue[QCount] := Proc;
    Inc(QCount);
  end
  else
    { Foreign thread: hand the request over through the message queue. }
    PostMessage(Wnd, WSM_SET_FUNC, Integer(Target.Code), Integer(Target.Data));
end;
{ Unregisters the first handler in MsgQueue that matches Proc.

  When called on the thread recorded in Create, the list is compacted in
  place. From any other thread the code/data halves of the method pointer
  are posted to the window as WSM_DEL_FUNC, and WndProc performs the removal.
  Nothing happens if Proc is not registered. }
procedure TTimerWnd.RemoveMsgProc(Proc: TWndMethod);
var
  i: Integer;
begin
  if GetCurrentThreadId <> Thrd then
    PostMessage(Wnd, WSM_DEL_FUNC, Integer(TMethod(Proc).Code), Integer(TMethod(Proc).Data))
  else
  begin
    for i := 0 to QCount - 1 do
      if (TMethod(MsgQueue[i]).Code = TMethod(Proc).Code) and
         (TMethod(MsgQueue[i]).Data = TMethod(Proc).Data) then
      begin
        Dec(QCount);
        { Close the gap by shifting the tail down one slot. The element size
          is taken from the type instead of a literal 8 so the byte count
          stays correct wherever a method pointer is not 8 bytes, and so this
          matches the same compaction done for WSM_DEL_FUNC in WndProc. }
        if i < QCount then
          Move(MsgQueue[i + 1], MsgQueue[i], (QCount - i) * SizeOf(TMethod));
        Break;
      end;
  end;
end;
{ Window procedure of the helper window.

  WSM_TIMER      - LParam is a PTimerInfo; it is passed to ProcessTimer.
  WSM_USER_FUNC  - WParam is a TThrdCallback that is invoked with LParam as
                   its pointer argument.
  WSM_SET_FUNC   - WParam/LParam are the code/data halves of a handler to
                   append to MsgQueue; duplicates are ignored.
  WSM_DEL_FUNC   - WParam/LParam identify a handler to remove from MsgQueue.
  anything else  - offered to each registered handler in order until one
                   sets a non-zero Message.Result.

  No exception leaves this procedure; with DEBUGMSG defined, Exception
  descendants caught by the outer handler are logged. }
procedure TTimerWnd.WndProc(var Message: TMessage);
var
  i: Integer;
  Limit: Integer;
begin
  try
    case Message.Msg of
      WSM_TIMER:
        begin
          Message.Result := 0;
          ProcessTimer(PTimerInfo(Message.LParam));
        end;
      WSM_USER_FUNC:
        begin
          Message.Result := 0;
          TThrdCallback(Message.WParam)(Pointer(Message.LParam));
        end;
      WSM_SET_FUNC:
        begin
          Message.Result := 0;
          { Already registered: nothing to do. }
          for i := 0 to QCount - 1 do
            if (TMethod(MsgQueue[i]).Code = Pointer(Message.WParam)) and
               (TMethod(MsgQueue[i]).Data = Pointer(Message.LParam)) then
              Exit;
          if QCount = Length(MsgQueue) then
            SetLength(MsgQueue, QCount + 1);
          TMethod(MsgQueue[QCount]).Code := Pointer(Message.WParam);
          TMethod(MsgQueue[QCount]).Data := Pointer(Message.LParam);
          Inc(QCount);
        end;
      WSM_DEL_FUNC:
        begin
          Message.Result := 0;
          for i := 0 to QCount - 1 do
            if (Integer(TMethod(MsgQueue[i]).Code) = Message.WParam) and
               (Integer(TMethod(MsgQueue[i]).Data) = Message.LParam) then
            begin
              Dec(QCount);
              { Shift the tail down over the removed slot. }
              if i < QCount then
                Move(MsgQueue[i + 1], MsgQueue[i], (QCount - i) * SizeOf(TMethod));
              Break;
            end;
        end;
    else
      if QCount > 0 then
      begin
        Message.Result := 0;
        { Handlers registered while dispatching are not called in this pass
          (Limit is fixed on entry), but the live count is re-checked on
          every step: a handler may unregister itself or another handler
          during the call, and a bound fixed up front would then invoke a
          stale slot beyond the end of the live list. }
        Limit := QCount;
        i := 0;
        while (i < Limit) and (i < QCount) do
        begin
          try
            MsgQueue[i](Message);
            if Message.Result <> 0 then
              Break;
          except
            { Deliberately swallowed so that one failing handler does not
              prevent the remaining handlers from seeing the message. }
          end;
          Inc(i);
        end;
      end;
    end;
  except
    {$IFDEF DEBUGMSG}
    on e: exception do
      LogDbgMsg('TimerWnd.WndProc error: '+e.Message);
    {$ENDIF}
  end;
end;
{ TThreadTimer }
{ Prepares the wake-up event and the job buffer, then starts the thread.

  Both fields are initialised before the inherited constructor is called
  because that call passes False for the "create suspended" argument. }
constructor TThreadTimer.Create;
begin
  { Auto-reset event, initially non-signalled, unnamed. }
  WaitEvent := CreateEvent(nil, False, False, nil);
  Jobs := TFIFOBuffer.Create(MAX_THREADJOB * 4);
  inherited Create(False);
end;
{ Schedules a plain-procedure callback.

  Forwards to AddJobAnyway with the procedure address as the callback and
  Sender as the accompanying pointer; returns whatever AddJobAnyway returns. }
function TThreadTimer.AddJob(WParam, LParam: Integer; Interval: Cardinal;
  CallBack: TThreadTimerCallbackProc; Sender: Pointer;
  RunOnce: Boolean; RunInThread: Boolean = false): Cardinal;
var
  EntryPoint: Pointer;
begin
  EntryPoint := @CallBack;
  Result := AddJobAnyway(WParam, LParam, Interval, EntryPoint, Sender,
    RunOnce, RunInThread);
end;
{ Schedules a method-pointer callback.

  The method pointer is split into its code and data halves, which are
  forwarded to AddJobAnyway as the callback and sender respectively;
  returns whatever AddJobAnyway returns. }
function TThreadTimer.AddJob(WParam, LParam: Integer; Interval: Cardinal;
  CallBack: TThreadTimerCallback; RunOnce: Boolean;
  RunInThread: Boolean = false): Cardinal;
var
  Parts: TMethod;
begin
  Parts := TMethod(CallBack);
  Result := AddJobAnyway(WParam, LParam, Interval, Parts.Code, Parts.Data,
    RunOnce, RunInThread);
end;
{ Cancels the job identified by Handle.

  Handle is the value returned by AddJob/AddJobAnyway, i.e. the address of
  the job's timer record. The record is not freed here; only its callback is
  cleared. Execute releases records whose callback is unassigned the next
  time it pops them from the job buffer.

  A Handle of 0 is ignored: AddJobAnyway returns 0 for jobs that are
  dispatched immediately, so there is no record to cancel. Without this
  check a 0 handle would dereference nil and rely on the exception handler
  below to hide the access violation. }
procedure TThreadTimer.deleteJob(Handle: Cardinal);
var
  Info: PTimerInfo;
begin
  if Handle = 0 then
    Exit;
  try
    Info := PTimerInfo(Handle);
    if Assigned(Info^.CallBack) then
      Info^.CallBack := nil;
  except
    {$IFDEF DEBUGMSG}
    on e: exception do
      LogDbgMsg('ThreadTimer.DeleteJob error: '+e.Message);
    {$ENDIF}
  end;
end;
{ Thread body of the timer.

  Each pass waits (alertably) on WaitEvent for at most n milliseconds, then
  walks the job buffer once:
    - a job whose callback has been cleared is released with DeallocInfo;
    - a job that is due has its NextTick advanced by Interval, is pushed
      back when Interval > 0, has its Ref incremented, and is dispatched
      either to the global thread pool or to the timer window (WSM_TIMER);
    - a job that is not yet due is pushed back unchanged.
  While walking, n collects the earliest NextTick seen; afterwards it is
  converted into a relative timeout for the next wait.

  On exit the event handle is closed and every remaining job record and the
  buffer itself are released. }
procedure TThreadTimer.Execute;
var
  Tick: Cardinal;
  CurTick: Cardinal;
  v, n: Cardinal;
  p: PTimerInfo;
begin
  FreeOnTerminate := True;
  n := INFINITE;
  while not Terminated do
    try
      Tick := GetTickCount;
      v := WaitForSingleObjectEx(WaitEvent, n, True);
      if Terminated then Break;
      if v <> WAIT_OBJECT_0 then
      begin
        { Not woken by the event. With a finite timeout, go back to waiting
          for whatever part of it has not elapsed yet; with an infinite
          timeout there is nothing scheduled, so just wait again. }
        if n <> INFINITE then
        begin
          v := GetTickCount - Tick;
          if n > v then
          begin
            Dec(n, v);
            Continue;
          end;
        end
        else
          Continue;
      end;
      n := INFINITE;
      { NOTE(review): SetEndPort presumably marks the current end of the
        buffer so that entries pushed back below are not popped again in
        this pass - confirm against TFIFOBuffer. }
      Jobs.SetEndPort;
      while Jobs.Pop(Cardinal(p)) do
        try
          if Assigned(p^.CallBack) then
          begin
            if (GetTickCount >= p^.NextTick) then
            begin
              Inc(p^.NextTick, p^.Interval);
              if p^.Interval > 0 then
              begin
                if n > p^.NextTick then n := p^.NextTick;
                Jobs.Push(Cardinal(p));
              end;
              InterlockedIncrement(p^.Ref);
              if p^.RunInThrd then
                GlobalThreadPool.PostJob(@ProcessTimer, Cardinal(p))
              else
                PostMessage(TimerWnd.Wnd, WSM_TIMER, Integer(Self), Integer(p));
            end
            else begin
              if n > p^.NextTick then
                n := p^.NextTick;
              Jobs.Push(Cardinal(p));
            end;
          end
          else DeallocInfo(p);
        except
          {$IFDEF DEBUGMSG}
          on e: exception do
            LogDbgMsg('ThreadTimer.Execute error: '+e.Message);
          {$ENDIF}
        end;
      { Turn the earliest absolute tick into a relative timeout. The tick
        counter is sampled exactly once: reading it separately for the
        comparison and for the subtraction lets it advance in between, in
        which case the subtraction wraps around to a huge timeout and the
        timer stalls. }
      if n <> INFINITE then
      begin
        CurTick := GetTickCount;
        if n < CurTick then
          n := 0
        else
          n := n - CurTick;
      end;
    except
      {$IFDEF DEBUGMSG}
      on e: exception do
        LogDbgMsg('ThreadTimer.Execute error: '+e.Message);
      {$ENDIF}
    end;
  CloseHandle(WaitEvent);
  while Jobs.Pop(Cardinal(p)) do
    DeallocInfo(p);
  Jobs.Free;
end;
{ Asks the timer thread to finish: sets the Terminated flag and then signals
  WaitEvent so that a thread blocked in Execute wakes up and sees the flag.
  With DEBUGMSG defined, any Exception raised here is logged. }
procedure TThreadTimer.Stop;
begin
  try
    Terminate;
    SetEvent(WaitEvent);
  except
    {$IFDEF DEBUGMSG}
    on e: exception do
      LogDbgMsg('ThreadTimer.Stop error: '+e.Message);
    {$ENDIF}
  end;
end;
{ Common implementation behind both AddJob overloads.

  Fills a freshly allocated timer record. A run-once job is stored with an
  Interval of 0; its first due time is still "now + Interval".

  Interval = 0 : the job is dispatched right away (thread pool or timer
                 window, depending on RunInThread) and the result is 0.
  Interval > 0 : the record is queued for the timer thread, the thread is
                 woken, and the result is the record address, usable as the
                 handle for deleteJob. }
function TThreadTimer.AddJobAnyway(WParam, LParam: Integer; Interval: Cardinal;
  CallBack, Sender: Pointer; RunOnce, RunInThread: Boolean): Cardinal;
var
  Info: PTimerInfo;
begin
  Info := AllocInfo;

  if RunOnce then
    Info^.Interval := 0
  else
    Info^.Interval := Interval;
  Info^.NextTick := GetTickCount + Interval;
  Info^.Param := WParam;
  Info^.Param2 := LParam;
  Info^.Sender := Sender;
  Info^.CallBack := CallBack;
  Info^.RunInThrd := RunInThread;
  Info^.Ref := 0;

  if Interval <> 0 then
  begin
    { Deferred: hand the record to the timer thread. }
    Result := Cardinal(Info);
    Jobs.Push(Result);
    SetEvent(WaitEvent);
  end
  else
  begin
    { Immediate: bypass the timer thread entirely. }
    Result := 0;
    if Info^.RunInThrd then
      GlobalThreadPool.PostJob(@ProcessTimer, Cardinal(Info))
    else
      PostMessage(TimerWnd.Wnd, WSM_TIMER, Integer(Self), Integer(Info));
  end;
end;
{ TWorkerThread }
{ Creates the event the worker waits on, then starts the thread (the
  inherited constructor is called with False for "create suspended"). }
constructor TWorkerThread.Create;
begin
  { Auto-reset event, initially non-signalled, unnamed. }
  WaitEvent := CreateEvent(nil, False, False, nil);
  inherited Create(False);
end;
{ Thread body of an APC-driven worker.

  The thread sits in an alertable wait on WaitEvent. Whenever the wait
  returns WAIT_IO_COMPLETION the job counter is reset to 0. The loop ends
  once Terminated is set, after which the event handle is closed. }
procedure TWorkerThread.Execute;
var
  WaitResult: Cardinal;
begin
  FreeOnTerminate := True;
  while not Terminated do
    try
      WaitResult := WaitForSingleObjectEx(WaitEvent, INFINITE, True);
      if WaitResult = WAIT_IO_COMPLETION then
        JobCnt := 0;
    except
      {$IFDEF DEBUGMSG}
      on e: exception do
        LogDbgMsg('WorkerThread.Execute error: '+e.Message);
      {$ENDIF}
    end;
  CloseHandle(WaitEvent);
end;
{ Asks the worker to finish: sets the Terminated flag and signals WaitEvent
  so the wait in Execute returns and the flag is seen. With DEBUGMSG
  defined, any Exception raised here is logged. }
procedure TWorkerThread.Stop;
begin
  try
    Terminate;
    SetEvent(WaitEvent);
  except
    {$IFDEF DEBUGMSG}
    on e: exception do
      LogDbgMsg('WorkerThread.Stop error: '+e.Message);
    {$ENDIF}
  end;
end;
{ TWorkerThreadNT }
{ Stores the completion-port handle this worker will service and starts the
  thread.

  The thread is started by invoking TThread's constructor on the existing
  instance through a typecast rather than through "inherited".
  NOTE(review): presumably this is done to skip TWorkerThread.Create, which
  allocates an event this class never waits on - confirm against the class
  declarations. }
constructor TWorkerThreadNT.Create(Pt: Cardinal);
begin
  GlobalCPort := Pt;
  TThread(Self).Create(False);
end;
{ Thread body of a completion-port worker.

  Blocks on GlobalCPort until a completion packet arrives, then dispatches
  on the completion key:
    IOCP_TIMERJOB - the overlapped pointer is a PTimerInfo and is handed to
                    ProcessTimer (the byte count of such packets is not
                    used here);
    anything else - the overlapped pointer is a PIOCPOverlapped; if it
                    carries a callback, that callback is invoked with the
                    owner, key, byte count, the overlapped record and a
                    status value.

  The status value is 0 when the dequeue call succeeded and the negated
  GetLastError code when it failed. The last-error value is read only on
  failure because it is not meaningful after a successful call.

  If the call fails without dequeuing a packet (overlapped pointer is nil),
  the key and byte count are undefined, so the pass is skipped instead of
  dispatching on them. }
procedure TWorkerThreadNT.Execute;
var
  l, Key: Cardinal;
  o: PIOCPOverlapped;
  n: Integer;
  Succeeded: Boolean;
begin
  FreeOnTerminate := True;
  while not Terminated do
    try
      o := nil;
      Succeeded := GetQueuedCompletionStatus(GlobalCPort, l, Key, POverlapped(o), INFINITE);
      { Capture the error code immediately, before any other call can
        overwrite it. }
      if Succeeded then
        n := 0
      else
        n := -Integer(GetLastError);
      if Terminated then Break;
      { Failure with no packet: nothing valid to dispatch. }
      if (not Succeeded) and (o = nil) then
        Continue;
      case Key of
        IOCP_TIMERJOB:
          begin
            ProcessTimer(PTimerInfo(o));
          end;
      else
        if (o <> nil) and (o^.CallBack <> nil) then
          TIOCPCallback(o^.CallBack)(o^.oSelf, Key, l, o, n);
      end;
    except
      {$IFDEF DEBUGMSG}
      on e: exception do
        LogDbgMsg('WorkerThreadNT.Execute error: '+e.Message);
      {$ENDIF}
    end;
end;
{ Asks the worker to finish: sets the Terminated flag and posts an empty
  IOCP_TIMERJOB packet (zero bytes, nil overlapped) to the completion port
  so that a thread blocked on the port wakes up. With DEBUGMSG defined, any
  Exception raised here is logged. }
procedure TWorkerThreadNT.Stop;
begin
  try
    Terminate;
    PostQueuedCompletionStatus(GlobalCPort, 0, IOCP_TIMERJOB, nil);
  except
    {$IFDEF DEBUGMSG}
    on e: exception do
      LogDbgMsg('WorkerThreadNT.Stop error: '+e.Message);
    {$ENDIF}
  end;
end;
{ TWorkerThreadPool }
{ Builds a pool of Cnt worker threads.

  On the NT platform a completion port is created and every slot holds a
  TWorkerThreadNT bound to that port (stored through a TWorkerThread cast).
  On other platforms every slot holds a plain TWorkerThread. }
constructor TWorkerThreadPool.Create(Cnt: Integer);
var
  Slot: Integer;
begin
  SetLength(Pool, Cnt);
  if Win32Platform <> VER_PLATFORM_WIN32_NT then
  begin
    for Slot := 0 to High(Pool) do
      Pool[Slot] := TWorkerThread.Create;
  end
  else
  begin
    { Port not associated with any file handle yet. }
    GlobalCPort := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
    for Slot := 0 to High(Pool) do
      Pool[Slot] := TWorkerThread(TWorkerThreadNT.Create(GlobalCPort));
  end;
end;
{ Tells every worker in the pool to stop, closes the completion port if one
  was created, then runs the inherited destructor. The worker objects are
  not freed here; each Execute visible in this unit sets FreeOnTerminate. }
destructor TWorkerThreadPool.Destroy;
var
  Slot: Integer;
begin
  for Slot := 0 to High(Pool) do
    Pool[Slot].Stop;
  if GlobalCPort <> 0 then
    CloseHandle(GlobalCPort);
  inherited;
end;
{ Picks the worker with the fewest outstanding jobs and reserves it.

  The scan stops early at the first idle worker (JobCnt = 0). The chosen
  worker's JobCnt is incremented before it is returned.

  Returns nil when the pool is empty. The counter is only touched when a
  worker was actually found; without that check an empty pool would
  dereference nil here, and the nil fallback in PostJob could never run. }
function TWorkerThreadPool.AllocThread: TWorkerThread;
var
  i: Integer;
  Least: Integer;
begin
  Least := MaxInt;
  Result := nil;
  for i := 0 to High(Pool) do
    if Pool[i].JobCnt < Least then
    begin
      Least := Pool[i].JobCnt;
      Result := Pool[i];
      if Result.JobCnt = 0 then Break;
    end;
  if Result <> nil then
    InterlockedIncrement(Result.JobCnt);
end;
{ Queues Func(Data) for execution off the calling thread.

  NT platform     : a packet is posted to the completion port with Func as
                    the byte count, IOCP_TIMERJOB as the key and Data as the
                    overlapped pointer; the result is 0.
  other platforms : a worker is taken from AllocThread and Func is queued to
                    it as a user APC; the result is that worker's thread
                    handle. If no worker is available the request is posted
                    to the timer window as WSM_USER_FUNC and the result is 0. }
function TWorkerThreadPool.PostJob(Func: Pointer; Data: Cardinal): Cardinal;
var
  Worker: TWorkerThread;
begin
  if Win32Platform = VER_PLATFORM_WIN32_NT then
  begin
    PostQueuedCompletionStatus(GlobalCPort, Cardinal(Func), IOCP_TIMERJOB, POverlapped(Data));
    Result := 0;
    Exit;
  end;

  Worker := AllocThread;
  if Worker <> nil then
  begin
    Result := Worker.Handle;
    QueueUserAPC(Func, Result, Data);
  end
  else
  begin
    PostMessage(TimerWnd.Wnd, WSM_USER_FUNC, Integer(Func), Data);
    Result := 0;
  end;
end;
{ Unit start-up: all set-up is delegated to InitAll.
  An earlier inline sequence, kept here for reference only:
    IsMultiThread := True;
    InitTimerWnd;
    InitGlobalThreadPool(MAX_THREAD_CNT); }
initialization
  InitAll;

{ Unit shutdown: tears down the timer window. }
finalization
  UninitTimerWnd;
end.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -