📄 sockutils.pas
字号:
Index: Integer;
begin
FCriticalSection.Enter;
try
// 加入新的活动到队列中
_NewAlloc(SizeOf(TActionDelayEntry), Pointer(Entry));
Entry^.Action := Action;
BuildQWord(Entry^.Delay, GetCurrentTicks, Milliseconds, FRound);
if FList.Count = 0 then
FList.Add(Entry)
else
begin
FList.BinarySearch(Compare, Entry^.Delay, Index);
FList.Insert(Index, Entry);
end;
Fire;
finally
FCriticalSection.Leave;
end;
end;
// Runs Fire while holding the delay queue's critical section.
// NOTE(review): Fire is declared outside this excerpt; presumably it hands
// over entries whose delay has elapsed and re-arms the timer - confirm.
procedure TAsyncActionDelayQueue.Dequeue;
begin
  FCriticalSection.Enter;
  try
    Fire;
  finally
    // Always release the lock, even if Fire raises.
    FCriticalSection.Leave;
  end;
end;
{ TAsyncActions }
// Builds the action registry: one lock, the list of active actions (FList),
// the pool of recycled actions (FPool) and the two dispatch queues.
constructor TAsyncActions.Create;
begin
  // Guards FList / FPool / FLastID (see GetAction and Abandon).
  FCriticalSection := TCriticalSection.Create;

  // Active actions and the pool of abandoned, reusable actions.
  FList := TList.Create;
  FPool := TList.Create;

  // Immediate queue and delayed queue.
  FPrimaryQueue := TAsyncActionQueue.Create;
  FSecondaryQueue := TAsyncActionDelayQueue.Create;
end;
// Tears the registry down: cancels the delay timer, frees both queues, frees
// every pooled and active action together with its list, then the lock.
destructor TAsyncActions.Destroy;

  // Frees every TAsyncAction stored in List (last to first), then List itself.
  procedure FreeList(List: TList);
  var
    Slot: Integer;
  begin
    for Slot := List.Count - 1 downto 0 do
      TAsyncAction(List[Slot]).Free;
    List.Free;
  end;

begin
  // Stop the waitable timer before the queue that owns it is released.
  CancelWaitableTimer(FSecondaryQueue.FTimer);
  FPrimaryQueue.Free;
  FSecondaryQueue.Free;

  FreeList(FPool);
  FreeList(FList);

  FCriticalSection.Free;
  inherited;
end;
// Comparison callback for the ID lookup in FList.
// Item is a TAsyncAction, ID is an untyped Integer key.
// Returns the difference (action ID - key): negative, zero or positive.
function TAsyncActions.CompareActionID(Item: Pointer; const ID): Integer;
var
  Wanted: Integer;
begin
  Wanted := Integer(ID);
  Result := TAsyncAction(Item).ID - Wanted;
end;
// Comparison callback for the class-name lookup in FPool.
// Item is a TAsyncAction, ActionClass is an untyped string key; the class
// names are compared case-insensitively with AnsiCompareText.
function TAsyncActions.CompareActionClass(Item: Pointer; const ActionClass): Integer;
var
  WantedName: string;
begin
  WantedName := string(ActionClass);
  Result := AnsiCompareText(TAsyncAction(Item).ClassName, WantedName);
end;
// Binary-searches the active list (FList) for an action with the given ID.
// Returns the search result; Index receives the position reported by
// FList.BinarySearch.
// NOTE(review): does not take FCriticalSection itself - callers in this
// excerpt that mutate the lists lock before calling.
function TAsyncActions.Find(ID: Integer; var Index: Integer): Boolean;
begin
  Result := FList.BinarySearch(CompareActionID, ID, Index);
end;
// Binary-searches the pool (FPool) for a recycled action whose class name
// matches ActionClass. Index receives the position reported by
// FPool.BinarySearch; Abandon uses that position as the insertion point.
function TAsyncActions.Find(const ActionClass: string; var Index: Integer): Boolean;
begin
  Result := FPool.BinarySearch(CompareActionClass, ActionClass, Index);
end;
// Convenience overload: obtains an action of ActionClass for Site without a
// payload (delegates with Buf = nil and Size = 0).
function TAsyncActions.GetAction(ActionClass: TAsyncActionClass;
  Site: PSiteInfo): TAsyncAction;
begin
  Result := GetAction(ActionClass, Site, nil, 0);
end;
// Obtains an action of ActionClass bound to Site, optionally carrying a
// payload (Buf, Size), assigns it a fresh ID and registers it as active.
//
// A pooled instance of the same class is reused when one is available;
// otherwise a new instance is created.
//
// Fix: the original called SetBuffer a second time for recycled actions
// (the trailing "if" sat outside the if/else), copying the payload twice.
// The conditional call is now limited to newly created actions; recycled
// actions still receive exactly one unconditional SetBuffer(Buf, Size).
function TAsyncActions.GetAction(ActionClass: TAsyncActionClass;
  Site: PSiteInfo; Buf: Pointer; Size: Integer): TAsyncAction;
var
  Index: Integer;
begin
  FCriticalSection.Enter;
  try
    if Find(ActionClass.ClassName, Index) then
    begin
      // Recycle: take the instance out of the pool and rebind it.
      Result := FPool[Index];
      FPool.Delete(Index);
      Result.FSite := Site;
      Result.SetBuffer(Buf, Size);
    end
    else
    begin
      Result := ActionClass.Create(Site);
      // A new action only needs a payload when one was supplied.
      if (Buf <> nil) and (Size > 0) then
        Result.SetBuffer(Buf, Size);
    end;

    Inc(FLastID);
    Result.FID := FLastID;
    // Appended at the end: FList is binary-searched by ID, which relies on
    // IDs being handed out in increasing order.
    FList.Add(Result);
  finally
    FCriticalSection.Leave;
  end;
end;
// Returns the active action with the given ID, or nil when none is found.
function TAsyncActions.Find(ID: Integer): TAsyncAction;
var
  Position: Integer;
begin
  Result := nil;
  if Find(ID, Position) then
    Result := FList[Position];
end;
// Retires the active action with the given ID: removes it from FList, clears
// its ID and files it into FPool at the position reported by the class-name
// search. Does nothing when the ID is not active.
procedure TAsyncActions.Abandon(ID: Integer);
var
  Position: Integer;
  Retired: TAsyncAction;
begin
  FCriticalSection.Enter;
  try
    if not Find(ID, Position) then
      Exit; // the finally block still releases the lock

    Retired := FList[Position];
    FList.Delete(Position);
    Retired.FID := 0;

    // The result is ignored on purpose; only the position is needed.
    Find(Retired.ClassName, Position);
    FPool.Insert(Position, Retired);
  finally
    FCriticalSection.Leave;
  end;
end;
// Places Action on the primary (immediate) queue.
procedure TAsyncActions.Queue(Action: TAsyncAction);
begin
  FPrimaryQueue.Queue(Action);
end;
// Places Action on the secondary (delay) queue with the given delay in
// milliseconds.
procedure TAsyncActions.QueueDelay(Action: TAsyncAction; Milliseconds: Integer);
begin
  FSecondaryQueue.Queue(Action, Milliseconds);
end;
// Takes the next action from the primary queue.
// Returns nil when the queue reports that nothing was dequeued.
function TAsyncActions.Dequeue: TAsyncAction;
var
  Fetched: Boolean;
begin
  Fetched := FPrimaryQueue.Dequeue(Result);
  if not Fetched then
    Result := nil;
end;
// Asks the delay queue to process itself (called by TJet.WaitFor when the
// delay timer is signalled).
procedure TAsyncActions.Requeue;
begin
  FSecondaryQueue.Dequeue;
end;
{ TAcceptAction }
// Creates an accept action for Site and allocates its receive buffer of
// BUFFER_SIZE bytes.
constructor TAcceptAction.Create(Site: PSiteInfo);
begin
  inherited Create(Site);
  _NewAlloc(BUFFER_SIZE, FBuffer);
end;
// Releases the receive buffer allocated in Create.
destructor TAcceptAction.Destroy;
begin
  _Free(FBuffer);
  inherited;
end;
// Starts receiving a request from the client into FBuffer.
// Returns False when the site is not connected as a client; otherwise
// returns whatever ReceiveFromClient reports.
function TAcceptAction.DoExecute: Boolean;
begin
  LogMessage('A1: WOULD RECEIVE DATA FROM CLIENT %s[%s]', [FSite^.HostName, FSite^.Address]);
  Result := False;
  if ConnectedAsClient then
    Result := ReceiveFromClient(FSite, FIOData, ID, FBuffer, BUFFER_SIZE);
end;
// Completion handler: Bytes bytes were received from the client.
// Hands the data to the service while the client is still connected, then
// abandons this action in either case.
procedure TAcceptAction.DoComplete(Bytes: Integer);
begin
  LogMessage('A2: RECEIVED %d BYTES FROM CLIENT %s[%s]', [Bytes, FSite^.HostName, FSite^.Address]);

  if ConnectedAsClient then
    Service.ProcessRequest(FSite, FBuffer, Bytes);

  Abandon;
end;
{ TReplyAction }
// Creates a reply action for Site with no payload buffer yet; the buffer is
// allocated on demand by SetBuffer.
constructor TReplyAction.Create(Site: PSiteInfo);
begin
  inherited Create(Site);

  // Empty payload state.
  FBuffer := nil;
  FCapacity := 0;
  FSize := 0;
  FSent := 0;
end;
// Releases the payload buffer, if one was ever allocated.
destructor TReplyAction.Destroy;
begin
  if Assigned(FBuffer) then
    _Free(FBuffer);
  inherited;
end;
// Resets the send progress so that SetBuffer accepts a new payload the next
// time this instance is used (SetBuffer only acts when FSent = 0).
procedure TReplyAction.DoAbandon;
begin
  FSent := 0;
end;
// Sends the not-yet-transmitted part of the reply (FSize - FSent bytes,
// starting FSent bytes into FBuffer) to the client.
// Returns False when the site is not connected as a client; otherwise
// returns whatever SendToClient reports.
//
// Fix: the send position was computed as Pointer(Integer(FBuffer) + FSent),
// which truncates the address on 64-bit targets and does signed arithmetic
// on an address. PAnsiChar arithmetic advances by bytes at native pointer
// width.
function TReplyAction.DoExecute: Boolean;
var
  P: Pointer;
begin
  LogMessage('A4: WOULD SEND %d BYTES TO CLIENT %s[%s] FOR REPLY', [FSize - FSent, FSite^.HostName, FSite^.Address]);
  if not ConnectedAsClient then
    Result := False
  else
  begin
    // Byte-wise offset into the payload, safe for any pointer size.
    P := PAnsiChar(FBuffer) + FSent;
    Result := SendToClient(FSite, FIOData, ID, P, FSize - FSent);
  end;
end;
// Completion handler: Bytes bytes of the reply were sent to the client.
// While connected, either re-queues itself to send the remainder, or - once
// everything is sent - queues a TAcceptAction for the next request and
// abandons itself. When no longer connected it simply abandons itself.
procedure TReplyAction.DoComplete(Bytes: Integer);
begin
  LogMessage('A5: SENT %d BYTES TO CLIENT %s[%s] FOR REPLY', [Bytes, FSite^.HostName, FSite^.Address]);

  if ConnectedAsClient then
  begin
    Inc(FSent, Bytes);

    if FSent < FSize then
    begin
      // Transfer incomplete: keep this action alive and continue sending.
      Queueing;
      Exit;
    end;

    LogMessage('A6: FINISHED, WOULD NEXT ROUND TO CLIENT %s[%s]', [FSite^.HostName, FSite^.Address]);
    Actions.GetAction(TAcceptAction, FSite).Queueing;
  end;

  Abandon;
end;
// Copies Size bytes from Buf into the action's own buffer, growing the buffer
// when its capacity is too small. Ignored while a send is in progress
// (FSent <> 0).
procedure TReplyAction.SetBuffer(Buf: Pointer; Size: Integer);
begin
  if FSent <> 0 then
    Exit;

  FSize := Size;
  if FSize > FCapacity then
  begin
    // Grow only; the capacity never shrinks.
    FCapacity := FSize;
    _Realloc(FCapacity, FBuffer);
  end;
  Move(Buf^, FBuffer^, FSize);
end;
{ TSendAction }
// Creates a send action for Site with no payload buffer and no send or
// attempt history; the buffer is allocated on demand by SetBuffer.
constructor TSendAction.Create(Site: PSiteInfo);
begin
  inherited Create(Site);

  // Empty payload state.
  FBuffer := nil;
  FCapacity := 0;
  FSize := 0;

  // Progress / retry counters.
  FSent := 0;
  FAttempt := 0;
end;
// Releases the payload buffer, if one was ever allocated.
destructor TSendAction.Destroy;
begin
  if Assigned(FBuffer) then
    _Free(FBuffer);
  inherited;
end;
// Resets send progress and the attempt counter so the instance starts clean
// the next time it is used.
procedure TSendAction.DoAbandon;
begin
  FAttempt := 0;
  FSent := 0;
end;
// Sends the not-yet-transmitted part of the payload to the server,
// connecting first when there is no server connection yet.
//
// Returns the result of SendToServer when connected. When the connection
// cannot be established, the action re-queues itself with a delay and
// returns True while fewer than MAX_ATTEMPTS executions have been made;
// after that it returns False.
//
// Fix: the send position was computed as Pointer(Integer(FBuffer) + FSent),
// which truncates the address on 64-bit targets and does signed arithmetic
// on an address. PAnsiChar arithmetic advances by bytes at native pointer
// width. The retry limit and delay are now named constants (same values).
function TSendAction.DoExecute: Boolean;
const
  MAX_ATTEMPTS = 5;        // executions allowed before giving up
  RETRY_DELAY_MS = 30000;  // delay before the next connection attempt
var
  P: Pointer;
begin
  LogMessage('B1: WOULD SENT %d BYTES TO SERVER %s[%s]', [FSize - FSent, FSite^.HostName, FSite^.Address]);
  Inc(FAttempt);
  Result := ConnectedAsServer;
  if not Result then
    Result := ConnectServer(Service.Port, FSite);
  if Result then
  begin
    // Byte-wise offset into the payload, safe for any pointer size.
    P := PAnsiChar(FBuffer) + FSent;
    Result := SendToServer(FSite, FIOData, ID, P, FSize - FSent);
  end
  else
  begin
    Result := FAttempt < MAX_ATTEMPTS;
    if Result then
    begin
      // Start over from the beginning of the payload on the next attempt.
      FSent := 0;
      Queueing(RETRY_DELAY_MS);
    end;
  end;
end;
// Completion handler: Bytes bytes were sent to the server.
// While connected, either re-queues itself to send the remainder, or - once
// everything is sent - queues a TWaitAction for the server's reply and
// abandons itself. When no longer connected it simply abandons itself.
procedure TSendAction.DoComplete(Bytes: Integer);
begin
  LogMessage('B2: SENT %d BYTES TO SERVER %s[%s]', [Bytes, FSite^.HostName, FSite^.Address]);

  if ConnectedAsServer then
  begin
    Inc(FSent, Bytes);

    if FSent < FSize then
    begin
      // Send incomplete: keep this action alive and send the remainder.
      Queueing;
      Exit;
    end;

    Actions.GetAction(TWaitAction, FSite).Queueing;
  end;

  Abandon;
end;
// Copies Size bytes from Buf into the action's own buffer, growing the buffer
// when its capacity is too small. Ignored while a send is in progress
// (FSent <> 0).
procedure TSendAction.SetBuffer(Buf: Pointer; Size: Integer);
begin
  if FSent <> 0 then
    Exit;

  FSize := Size;
  if FSize > FCapacity then
  begin
    // Grow only; the capacity never shrinks.
    FCapacity := FSize;
    _Realloc(FCapacity, FBuffer);
  end;
  Move(Buf^, FBuffer^, FSize);
end;
{ TWaitAction }
// Creates a wait action for Site with an initial receive buffer of
// BUFFER_SIZE bytes and nothing received yet.
constructor TWaitAction.Create(Site: PSiteInfo);
begin
  inherited Create(Site);

  FReceived := 0;
  FCapacity := BUFFER_SIZE;
  _NewAlloc(FCapacity, FBuffer);
end;
// Releases the receive buffer allocated in Create.
destructor TWaitAction.Destroy;
begin
  _Free(FBuffer);
  inherited;
end;
// Resets the receive progress; the buffer and its capacity are kept.
procedure TWaitAction.DoAbandon;
begin
  FReceived := 0;
end;
// Starts receiving (more of) the server's reply, writing at most BUFFER_SIZE
// bytes after the FReceived bytes already stored in FBuffer.
// Returns False when the site is not connected as a server; otherwise
// returns whatever ReceiveFromServer reports.
//
// Fix: the write position was computed as
// Pointer(Integer(FBuffer) + FReceived), which truncates the address on
// 64-bit targets and does signed arithmetic on an address. PAnsiChar
// arithmetic advances by bytes at native pointer width.
function TWaitAction.DoExecute: Boolean;
var
  P: Pointer;
begin
  LogMessage('B3: WOULD RECEIVE DATA FROM SERVER %s[%s] FOR REPLY', [FSite^.HostName, FSite^.Address]);
  Result := ConnectedAsServer;
  if Result then
  begin
    // DoComplete grows FCapacity by every received byte count, so there is
    // room for BUFFER_SIZE more bytes beyond FReceived.
    P := PAnsiChar(FBuffer) + FReceived;
    Result := ReceiveFromServer(FSite, FIOData, ID, P, BUFFER_SIZE);
  end;
end;
// Completion handler: Bytes more bytes of the server's reply were received.
// While connected, the accumulated data is offered to Service.ProcessReply:
//   - False: the buffer is grown by Bytes and the action re-queues itself to
//     receive more;
//   - True: a TCloseAction is queued and this action abandons itself.
// When no longer connected it simply abandons itself.
procedure TWaitAction.DoComplete(Bytes: Integer);
begin
  LogMessage('B4: RECEIVED %d BYTES FROM SERVER %s[%s] FOR REPLY', [Bytes, FSite^.HostName, FSite^.Address]);

  if ConnectedAsServer then
  begin
    Inc(FReceived, Bytes);

    if not Service.ProcessReply(FSite, FBuffer, FReceived) then
    begin
      // Reply not complete yet: enlarge the buffer and keep receiving.
      Inc(FCapacity, Bytes);
      _Realloc(FCapacity, FBuffer);
      Queueing;
      Exit;
    end;

    // Reply fully received: close the connection.
    Actions.GetAction(TCloseAction, FSite).Queueing;
  end;

  Abandon;
end;
{ TCloseAction }
// Creates a close action for Site and allocates a BUFFER_SIZE-byte buffer
// for the receive issued in DoExecute.
constructor TCloseAction.Create(Site: PSiteInfo);
begin
  inherited Create(Site);
  _NewAlloc(BUFFER_SIZE, FBuffer);
end;
// Releases the buffer allocated in Create.
destructor TCloseAction.Destroy;
begin
  _Free(FBuffer);
  inherited;
end;
// Closes the connection to the server: disconnects, then issues a receive
// into FBuffer.
// Returns True only when the site is connected as a server and both
// DisconnectServer and ReceiveFromServer succeed; later steps are skipped as
// soon as one step fails.
// NOTE(review): the receive after the disconnect presumably waits for the
// peer's close notification - confirm against DisconnectServer.
function TCloseAction.DoExecute: Boolean;
begin
  LogMessage('B6: FINISHED, CLOSE TO SERVER %s[%s] FOR REPLY', [FSite^.HostName, FSite^.Address]);

  Result := False;
  if ConnectedAsServer then
    if DisconnectServer(FSite) then
      Result := ReceiveFromServer(FSite, FIOData, ID, FBuffer, BUFFER_SIZE);
end;
// Completion handler: the close sequence has finished, so the action is
// abandoned. Bytes is not used.
procedure TCloseAction.DoComplete(Bytes: Integer);
begin
  Abandon;
end;
{ TJet }
// Creates the jet together with its site collection.
constructor TJet.Create;
begin
  FSites := TSiteInfos.Create;
end;
// Frees the site collection created in Create.
destructor TJet.Destroy;
begin
  FSites.Free;
  inherited;
end;
// Returns the event handle of the primary action queue (the same handle that
// WaitFor waits on first).
function TJet.GetEvent: THandle;
begin
  Result := Actions.FPrimaryQueue.FEvent;
end;
// Waits for network events.
//
// Blocks (alertably) on the primary queue's event and the delay queue's
// timer:
//   - primary event signalled: returns True, so the caller can process the
//     queue;
//   - delay timer signalled: calls Actions.Requeue and keeps waiting;
//   - wait interrupted by an alertable I/O completion: keeps waiting;
//   - wait failed (WAIT_FAILED): returns False.
//
// Fix: WAIT_FAILED was not handled, so an invalid or closed handle made the
// loop spin forever at full CPU and the function could never return False.
function TJet.WaitFor: Boolean;
var
  Events: array[0..1] of THandle;
begin
  Result := False;
  Events[0] := Actions.FPrimaryQueue.FEvent;
  Events[1] := Actions.FSecondaryQueue.FTimer;
  while True do
    case WaitForMultipleObjectsEx(2, @Events, False, INFINITE, True) of
      WAIT_OBJECT_0:
        begin
          Result := True;
          Break;
        end;
      WAIT_OBJECT_0 + 1:
        Actions.Requeue;
      WAIT_FAILED:
        // Waiting again on the same handles would fail immediately; give up
        // and report False instead of busy-looping.
        Break;
      // Any other value (e.g. WAIT_IO_COMPLETION): wait again.
    end;
end;
// Executes queued events.
//
// Drains the primary queue: each dequeued action is executed, and an action
// whose Execute returns False is abandoned. Stops when Dequeue returns nil.
procedure TJet.Rollup;
var
  Current: TAsyncAction;
begin
  repeat
    Current := Actions.Dequeue;
    if Current = nil then
      Break;
    if not Current.Execute then
      Current.Abandon;
  until False;
end;
// Sends data.
//
// Obtains a TSendAction for Site carrying Size bytes from Data and queues it.
procedure TJet.Send(Site: PSiteInfo; Data: Pointer; Size: Integer);
var
  Sender: TAsyncAction;
begin
  Sender := Actions.GetAction(TSendAction, Site, Data, Size);
  Sender.Queueing;
end;
// Sends a response.
//
// Obtains a TReplyAction for Site carrying Size bytes from Data and queues
// it.
procedure TJet.Reply(Site: PSiteInfo; Data: Pointer; Size: Integer);
var
  Responder: TAsyncAction;
begin
  Responder := Actions.GetAction(TReplyAction, Site, Data, Size);
  Responder.Queueing;
end;
end.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -