📄 nhcbiznetdriver.pas
字号:
end;
begin
if Done then Exit;
try
case FState of
ttsConnect: DoStateConnect;
ttsWaitConnect: DoStateWaitConnect;
ttsSendReqPacket: DoStateSendReqPacket;
ttsWaitAckPacket: DoStateWaitAckPacket;
end;
except
// 任务失败
on E: ESocketException do
begin
FSocketError := True;
HandleException;
end;
on E: Exception do
begin
HandleException;
end;
end;
end;
//-----------------------------------------------------------------------------
// Description: Offers an incoming packet to this task as a possible
//   acknowledgement of its request packet.
// Parameters:
//   PacketBuffer - raw packet bytes; the start is read as a packet header
//   PacketSize   - number of bytes in PacketBuffer
//   Handled      - set to True when the packet's sequence number matches this
//                  task's request; otherwise left as the caller passed it
//-----------------------------------------------------------------------------
procedure TTcpTask.ProcessAckPacket(const PacketBuffer; PacketSize: Integer;
  var Handled: Boolean);
var
  AckHeader: PBizTcpPacketHeader;
begin
  // Only a task that has a request, expects an ack and is currently waiting
  // for it looks at the packet at all.
  if ReqPacket = nil then Exit;
  if not FNeedAck then Exit;
  if FState <> ttsWaitAckPacket then Exit;

  AckHeader := PBizTcpPacketHeader(@PacketBuffer);
  if AckHeader.SeqNumber <> ReqPacket.Header.SeqNumber then Exit;

  Handled := True;
  // Keep only the first matching ack; later duplicates are still reported as
  // handled but do not overwrite the stored packet.
  if FAckPacket.Size = 0 then
    FAckPacket.Assign(PacketBuffer, PacketSize);
end;
{ TTcpTaskList }
//-----------------------------------------------------------------------------
// Description: Creates the task list and switches on its ThreadSafe and
//   OwnsObjects flags.
//-----------------------------------------------------------------------------
constructor TTcpTaskList.Create;
begin
inherited;
// Both properties are declared outside this view (presumably by the base list
// class) - TODO confirm their exact semantics there.
ThreadSafe := True;
OwnsObjects := True;
end;
//-----------------------------------------------------------------------------
// Description: Destructor. Performs no cleanup of its own; it only chains to
//   the inherited destructor.
//-----------------------------------------------------------------------------
destructor TTcpTaskList.Destroy;
begin
inherited;
end;
//-----------------------------------------------------------------------------
// Description: Typed item accessor. Returns the inherited Items[Index] entry
//   hard-cast to TTcpTask (no run-time type check is made by the cast).
//-----------------------------------------------------------------------------
function TTcpTaskList.GetItems(Index: Integer): TTcpTask;
begin
Result := TTcpTask(inherited Items[Index]);
end;
//-----------------------------------------------------------------------------
// Description: Looks up the first task whose FCaller is the given object.
// Parameters:
//   Caller - object compared against each task's FCaller field
// Returns: the first matching task in index order, or nil when none matches.
// Note: this routine takes no lock itself; RemoveTasks in this unit calls it
//   while holding Lock.
//-----------------------------------------------------------------------------
function TTcpTaskList.FindTask(Caller: TObject): TTcpTask;
var
  Idx: Integer;
  Candidate: TTcpTask;
begin
  Result := nil;
  for Idx := 0 to Count - 1 do
  begin
    Candidate := Items[Idx];
    // Skip empty slots and tasks that belong to a different caller.
    if (Candidate = nil) or (Candidate.FCaller <> Caller) then
      Continue;
    Result := Candidate;
    Exit;
  end;
end;
//-----------------------------------------------------------------------------
// Description: Looks up the first task whose result handler matches the given
//   one.
// Parameters:
//   OnTaskResult - handler compared against each task's FOnTaskResult
// Returns: the first matching task in index order, or nil when none matches.
// NOTE(review): the match is Addr(stored handler) = Addr(given handler). If
//   TTcpTaskResultEvent is a method pointer this presumably does not compare
//   the object-instance part - confirm this is the intended notion of "same
//   handler".
//-----------------------------------------------------------------------------
function TTcpTaskList.FindTask(OnTaskResult: TTcpTaskResultEvent): TTcpTask;
var
  Idx: Integer;
  Candidate: TTcpTask;
begin
  Result := nil;
  for Idx := 0 to Count - 1 do
  begin
    Candidate := Items[Idx];
    if Assigned(Candidate) and (Addr(Candidate.FOnTaskResult) = Addr(OnTaskResult)) then
    begin
      Result := Candidate;
      Exit;
    end;
  end;
end;
//-----------------------------------------------------------------------------
// Description: Adds a task to the list. A nil Task is silently ignored.
//-----------------------------------------------------------------------------
procedure TTcpTaskList.Add(Task: TTcpTask);
begin
if Task <> nil then
// The meaning of the second argument (False) is defined by the inherited
// Add, which is not visible here - TODO confirm.
inherited Add(Task, False);
end;
//-----------------------------------------------------------------------------
// Description: Removes the given task by forwarding to the inherited Remove.
// NOTE(review): whether the removed task is also freed depends on the base
//   class and OwnsObjects - confirm there.
//-----------------------------------------------------------------------------
procedure TTcpTaskList.Remove(Task: TTcpTask);
begin
inherited Remove(Task);
end;
//-----------------------------------------------------------------------------
// Description: Deletes the entry at Index by forwarding to the inherited
//   Delete; no range checking is added here.
//-----------------------------------------------------------------------------
procedure TTcpTaskList.Delete(Index: Integer);
begin
inherited Delete(Index);
end;
//-----------------------------------------------------------------------------
// Description: Forwards to the inherited Extract for the entry at Index and
//   returns its result hard-cast to TTcpTask.
// NOTE(review): presumably the entry is taken out of the list without being
//   freed (Execute hands extracted tasks on to another list) - confirm in the
//   base class.
//-----------------------------------------------------------------------------
function TTcpTaskList.Extract(Index: Integer): TTcpTask;
begin
Result := TTcpTask(inherited Extract(Index));
end;
//-----------------------------------------------------------------------------
// Description: Empties the list by forwarding to the inherited Clear.
//-----------------------------------------------------------------------------
procedure TTcpTaskList.Clear;
begin
inherited Clear;
end;
//-----------------------------------------------------------------------------
// Description: Writes the socket-error flag of every task that is bound to
//   the given connection. The whole scan runs under the list lock.
// Parameters:
//   Connection  - connection whose tasks are to be updated
//   SocketError - value stored into each matching task's FSocketError
//-----------------------------------------------------------------------------
procedure TTcpTaskList.SetSocketError(Connection: TTcpConnection;
  SocketError: Boolean);
var
  Idx: Integer;
  Current: TTcpTask;
begin
  Lock;
  try
    for Idx := 0 to Count - 1 do
    begin
      Current := Items[Idx];
      if Current = nil then
        Continue;
      if Current.FConnection = Connection then
        Current.FSocketError := SocketError;
    end;
  finally
    Unlock;
  end;
end;
//-----------------------------------------------------------------------------
// Description: Removes every task that was queued on behalf of Caller.
//   Repeatedly looks up the next matching task and removes it until no match
//   is left; the whole operation runs under the list lock.
// Parameters:
//   Caller - object whose tasks are removed
//-----------------------------------------------------------------------------
procedure TTcpTaskList.RemoveTasks(Caller: TObject);
var
  Victim: TTcpTask;
begin
  Lock;
  try
    repeat
      Victim := FindTask(Caller);
      if Victim <> nil then
        inherited Remove(Victim);
      // After Remove, Victim is only compared against nil, never dereferenced.
    until Victim = nil;
  finally
    Unlock;
  end;
end;
//-----------------------------------------------------------------------------
// Description: Removes every task whose result handler matches OnTaskResult
//   (as decided by FindTask). Repeatedly looks up the next matching task and
//   removes it until no match is left; runs under the list lock.
// Parameters:
//   OnTaskResult - handler whose tasks are removed
//-----------------------------------------------------------------------------
procedure TTcpTaskList.RemoveTasks(OnTaskResult: TTcpTaskResultEvent);
var
  Victim: TTcpTask;
begin
  Lock;
  try
    repeat
      Victim := FindTask(OnTaskResult);
      if Victim <> nil then
        inherited Remove(Victim);
      // After Remove, Victim is only compared against nil, never dereferenced.
    until Victim = nil;
  finally
    Unlock;
  end;
end;
//-----------------------------------------------------------------------------
// Description: Calls CancelResultEvent on every task whose FCaller is the
//   given object. The tasks stay in the list; the scan runs under the list
//   lock.
// Parameters:
//   Caller - object whose tasks get their result event cancelled
//-----------------------------------------------------------------------------
procedure TTcpTaskList.CancelResultEvent(Caller: TObject);
var
  Idx: Integer;
  Current: TTcpTask;
begin
  Lock;
  try
    for Idx := 0 to Count - 1 do
    begin
      Current := Items[Idx];
      if Current = nil then
        Continue;
      if Current.FCaller = Caller then
        Current.CancelResultEvent;
    end;
  finally
    Unlock;
  end;
end;
{ TTcpTaskExecutor }
//-----------------------------------------------------------------------------
// Description: Creates the executor together with its private task list.
//-----------------------------------------------------------------------------
constructor TTcpTaskExecutor.Create;
begin
inherited Create;
// Released again in Destroy.
FTaskList := TTcpTaskList.Create;
end;
//-----------------------------------------------------------------------------
// Description: Empties the task list, frees it, then chains to the inherited
//   destructor.
//-----------------------------------------------------------------------------
destructor TTcpTaskExecutor.Destroy;
begin
Clear;
FTaskList.Free;
inherited;
end;
//-----------------------------------------------------------------------------
// Description: Adds a TCP request to the executor by wrapping it in a new
//   TTcpTask and queueing that task.
// Parameters:
//   PeerAddr     - destination address
//   Connection   - TCP connection
//   ReqPacket    - packet to be sent
//   NeedAck      - whether an acknowledgement packet is required
//   Caller       - object issuing the request
//   OnTaskResult - result event for the request
//-----------------------------------------------------------------------------
procedure TTcpTaskExecutor.AddRequest(const PeerAddr: TPeerAddress;
Connection: TTcpConnection; ReqPacket: TBizTcpPacket; NeedAck: Boolean;
Caller: TObject; OnTaskResult: TTcpTaskResultEvent);
var
TcpTask: TTcpTask;
begin
TcpTask := TTcpTask.Create;
// NOTE(review): if InitParams raises, TcpTask is never added to the list and
// is not freed here - confirm whether InitParams can fail and who owns
// ReqPacket in that case.
TcpTask.InitParams(Self, PeerAddr, Connection, ReqPacket,
NeedAck, Caller, OnTaskResult);
FTaskList.Add(TcpTask);
end;
//-----------------------------------------------------------------------------
// Description: Routes a received acknowledgement packet to the queued task it
//   belongs to. Tasks are offered the packet in list order, under the list
//   lock, and the scan stops at the first task that reports it as handled.
//   A packet that no task claims is dropped without further action.
// Parameters:
//   PacketBuffer - raw packet bytes
//   PacketSize   - number of bytes in PacketBuffer
//-----------------------------------------------------------------------------
procedure TTcpTaskExecutor.ProcessAckPacket(const PacketBuffer; PacketSize: Integer);
var
  Idx: Integer;
  Consumed: Boolean;
begin
  Consumed := False;
  FTaskList.Lock;
  try
    for Idx := 0 to FTaskList.Count - 1 do
    begin
      FTaskList[Idx].ProcessAckPacket(PacketBuffer, PacketSize, Consumed);
      if Consumed then
        Break;
    end;
  finally
    FTaskList.Unlock;
  end;
end;
//-----------------------------------------------------------------------------
// Description: Adds an already constructed task directly to the task list.
//   (TTcpTaskList.Add ignores a nil task.)
//-----------------------------------------------------------------------------
procedure TTcpTaskExecutor.AddTask(Task: TTcpTask);
begin
FTaskList.Add(Task);
end;
//-----------------------------------------------------------------------------
// Description: Removes requests from the executor (selected by Caller).
//   Forwards to TTcpTaskList.RemoveTasks, which removes every matching task.
//-----------------------------------------------------------------------------
procedure TTcpTaskExecutor.RemoveRequest(Caller: TObject);
begin
FTaskList.RemoveTasks(Caller);
end;
//-----------------------------------------------------------------------------
// Description: Removes requests from the executor (selected by OnTaskResult).
//   Forwards to TTcpTaskList.RemoveTasks, which removes every matching task.
//-----------------------------------------------------------------------------
procedure TTcpTaskExecutor.RemoveRequest(OnTaskResult: TTcpTaskResultEvent);
begin
FTaskList.RemoveTasks(OnTaskResult);
end;
//-----------------------------------------------------------------------------
// Description: Cancels the result event of queued tasks (selected by Caller).
//   Forwards to TTcpTaskList.CancelResultEvent; the tasks stay queued.
//-----------------------------------------------------------------------------
procedure TTcpTaskExecutor.CancelResultEvent(Caller: TObject);
begin
FTaskList.CancelResultEvent(Caller);
end;
//-----------------------------------------------------------------------------
// Description: Sets the socket-error flag of every queued task bound to
//   Connection. Forwards to TTcpTaskList.SetSocketError.
// Parameters:
//   Connection  - connection whose tasks are updated
//   SocketError - flag value to store
//-----------------------------------------------------------------------------
procedure TTcpTaskExecutor.SetSocketError(Connection: TTcpConnection;
SocketError: Boolean);
begin
FTaskList.SetSocketError(Connection, SocketError);
end;
//-----------------------------------------------------------------------------
// Description: Empties the executor by clearing its task list.
//-----------------------------------------------------------------------------
procedure TTcpTaskExecutor.Clear;
begin
FTaskList.Clear;
end;
//-----------------------------------------------------------------------------
// Description: Work function of the executor. One call performs one pass over
//   the queued tasks, all under the task-list lock:
//     1. calls Process on each task, but on at most one task per connection;
//     2. extracts every task that reports Done from the list;
//     3. appends the extracted tasks, in their original list order, to
//        NetMgr.BizNetDriver.FTcpDoneTaskList.
//   Returns immediately when the list is empty.
//-----------------------------------------------------------------------------
procedure TTcpTaskExecutor.Execute;
var
  Task: TTcpTask;
  Conn: TTcpConnection;
  DoneList: TList;
  ConnList: TIntSet;
  I: Integer;
begin
  FTaskList.Lock;
  try
    if FTaskList.Count = 0 then Exit;

    // Both helpers start as nil and are created inside the try block, so a
    // failure while creating the second one can no longer leak the first
    // (Free is a no-op on nil).
    ConnList := nil;
    DoneList := nil;
    try
      ConnList := TIntSet.Create;
      DoneList := TList.Create;

      // Tasks that share one TCP connection must be executed in order, so only
      // the first task found for each connection is processed in this pass.
      // Tasks without a connection are always processed.
      // NOTE(review): Integer(Conn) uses the object reference as the set key,
      // which assumes a reference fits into an Integer - confirm for the
      // targets this unit is built for.
      for I := 0 to FTaskList.Count - 1 do
      begin
        Task := FTaskList[I];
        Conn := Task.Connection;
        if (Conn = nil) or not ConnList.ValueExists(Integer(Conn)) then
        begin
          if Conn <> nil then
            ConnList.Add(Integer(Conn));
          Task.Process;
        end;
      end;

      // Walk backwards so extracting an entry does not shift the indexes that
      // are still to be visited.
      for I := FTaskList.Count - 1 downto 0 do
      begin
        if FTaskList[I].Done then
          DoneList.Add(FTaskList.Extract(I));
      end;

      // DoneList was filled back-to-front; reading it backwards hands the
      // tasks over in their original order.
      for I := DoneList.Count - 1 downto 0 do
        NetMgr.BizNetDriver.FTcpDoneTaskList.Add(TTcpTask(DoneList[I]));
    finally
      DoneList.Free;
      ConnList.Free;
    end;
  finally
    FTaskList.Unlock;
  end;
end;
{ TTcpPacketReceiver }
//-----------------------------------------------------------------------------
// Description: Creates a packet receiver for one connection.
// Parameters:
//   Connection      - connection the receiver reads from (stored as a
//                     reference; this class's destructor does not free it)
//   MaxRecvPktCount - number of packets after which the receiver marks itself
//                     done; RecvPacket applies the limit only when it is > 0
//-----------------------------------------------------------------------------
constructor TTcpPacketReceiver.Create(Connection: TTcpConnection;
MaxRecvPktCount: Integer);
begin
inherited Create;
FConnection := Connection;
FMaxRecvPktCount := MaxRecvPktCount;
// Reassembly buffer for the packet currently being received; freed in Destroy.
FBuffer := TBufferStream.Create;
end;
//-----------------------------------------------------------------------------
// Description: Frees the reassembly buffer, then chains to the inherited
//   destructor. FConnection is not freed here.
//-----------------------------------------------------------------------------
destructor TTcpPacketReceiver.Destroy;
begin
FBuffer.Free;
inherited;
end;
//-----------------------------------------------------------------------------
// Description: Advances reception of the current packet by at most two reads
//   from the connection: one towards completing the header and, once the
//   header is complete, one towards completing the payload. FBuffer.Position
//   records how many bytes of the packet have arrived so far. When the packet
//   is complete it is passed to NetMgr.BizNetDriver.DispatchTcpPacket and the
//   buffer is cleared for the next packet.
// Raises: Exception when the DataSize field of a received header is negative
//   or so large that header size + DataSize would overflow an Integer. The
//   value comes from the peer and must not be used to size the buffer
//   unchecked. (Process in this unit treats any Exception as "receiver
//   done".)
//-----------------------------------------------------------------------------
procedure TTcpPacketReceiver.RecvPacket;
const
  HeaderSize = SizeOf(TBizTcpPacketHeader);
var
  R, DataSize: Integer;
begin
  // Header not completely received yet.
  if FBuffer.Position < HeaderSize then
  begin
    // A fresh (cleared) buffer is first sized to hold exactly one header.
    if FBuffer.Size = 0 then
      FBuffer.Size := HeaderSize;
    R := FConnection.ReadBuffer(
      (FBuffer.Memory + FBuffer.Position)^,
      HeaderSize - FBuffer.Position);
    // Only a positive result advances the write position.
    if R > 0 then
      FBuffer.Position := FBuffer.Position + R;
  end;

  // Header completely received.
  if FBuffer.Position >= HeaderSize then
  begin
    // Size = HeaderSize means the buffer has not been grown for the payload
    // yet (or the payload is empty, in which case growing it is a no-op).
    if FBuffer.Size = HeaderSize then
    begin
      DataSize := PBizTcpPacketHeader(FBuffer.Memory).DataSize;
      // Reject lengths that would shrink the buffer below the header or
      // overflow the size computation.
      if (DataSize < 0) or (DataSize > MaxInt - HeaderSize) then
        raise Exception.Create('Invalid TCP packet header: bad DataSize');
      FBuffer.Size := HeaderSize + DataSize;
    end;

    // Payload not completely received yet.
    if FBuffer.Position < FBuffer.Size then
    begin
      R := FConnection.ReadBuffer(
        (FBuffer.Memory + FBuffer.Position)^,
        FBuffer.Size - FBuffer.Position);
      if R > 0 then
        FBuffer.Position := FBuffer.Position + R;
    end;

    // Whole packet received.
    if FBuffer.Position >= FBuffer.Size then
    begin
      Inc(FCurRecvPktCount);
      // A limit of 0 or less means "no limit".
      if (FCurRecvPktCount >= FMaxRecvPktCount) and (FMaxRecvPktCount > 0) then
        FDone := True;
      NetMgr.BizNetDriver.DispatchTcpPacket(FConnection, FBuffer.Memory^, FBuffer.Size);
      FBuffer.Clear;
    end;
  end;
end;
//-----------------------------------------------------------------------------
// Description: Runs one receive step unless the receiver is already done.
//   Any exception raised by the step marks the receiver as done and is
//   swallowed here; a socket exception additionally sets the socket-error
//   flag on the executor's tasks for this connection.
//-----------------------------------------------------------------------------
procedure TTcpPacketReceiver.Process;
begin
  if FDone then
    Exit;

  try
    RecvPacket;
  except
    on ESocketException do
    begin
      FDone := True;
      NetMgr.BizNetDriver.FTcpTaskExecutor.SetSocketError(FConnection, True);
    end;
    on Exception do
      FDone := True;
  end;
end;
{ TTcpPacketRecverList }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -