📄 untudpctl.pas
字号:
{*******************************************************}
{ 单元名: UntUdpCtl.pas }
{ 创建日期:2006-6-9 14:20:12 }
{ 创建者 马敏钊 }
{ 功能: V1.0 }
{ UDP的安全传输控制类暂时基于NMUDP }
{ 负责 }
{ 分包控制 }
{ 应答控制 }
{ 重发控制 }
{ 包头大于CSafeUdpData的数据就是本类的 }
{ 控制处理协议 }
{*******************************************************}
unit UntUdpCtl;
interface
uses Classes, NMUDP, sysutils, Contnrs;
const
CSafeUdpData = 100000; {UDP数据控制包的最低命令号}
CSafeUdpData_Uses = 200000; {用户使用的命令}
CSafeUdpData_RecvResp = CSafeUdpData_Uses + 1; //回应收到命令
CSafeUdpHeadSingl = 234;
type
EUdpCtlReTryNoResp = class(Exception); {重试N次后报错}
EUdpCtlTimeOut = class(Exception); {一直重试直到超时}
{丢包的处理方式}
SHeadLv = (SDoDrop {重试N次后就不再发送}, SDoError {重试N次后报错}, SDoTimeOut {一直重试直到超时});
{包头}
PSafeUdphead = ^RSafeUdphead;
RSafeUdphead = packed record
Id: Integer; {命令号}
Singl: Byte; {标志号234 用于判断是否是属于SafeUDP的包的标志}
IsNeedResp: boolean; {是否需要回复}
Part, TotPart: Word; {包的第几部分,共有几部分}
Lv: SheadLv; {包的等级 0、 1、 2}
PackedId: Cardinal; {包的ID号 ,如果此号大于0说明需要回应}
Size: Cardinal; {大小}
end;
{数据包}
PSafeUdpData = ^RSafeUdpData;
RSafeUdpData = packed record
Head: RSafeUdphead; {包头}
Data: Pointer; {包体}
SendTime: Cardinal; {发送时间}
ReTryCount: Byte; {重试次数}
end;
{混合包类}
TDataMixer = class
public
Id: Cardinal; {包的ID}
BeginTime: Cardinal; {开始混合的时间}
TotPart, CurrPart: Byte; {包的总数,现有的包数}
DataList: TStrings; {数据列表}
constructor Create();
destructor Destroy; override;
end;
SUDPSendKind = (SUSKFreeWindows {滑动窗口模式}, SUSKOnlyOne {每次只发送一个命令直到返回才继续});
SUdpCtlState = (SRaiseExpection {抛出异常}, SUseErrorEvent {执行错误事件}); {Udp控制状态}
TudpCtl = class;
TUdpSenderThread = class(TThread) {数据发送处理线程}
public
Owner: TudpCtl;
procedure Execute; override;
constructor Create(ISStop: boolean; IOwner: TudpCtl);
end;
TUdpRecThread = class(TThread) {数据接收处理线程}
public
Owner: TudpCtl;
procedure Execute; override;
constructor Create(ISStop: boolean; IOwner: TudpCtl);
end;
TUdpSendedThread = class(TThread) {数据重发处理线程}
public
Owner: TudpCtl;
procedure Execute; override;
constructor Create(ISStop: boolean; IOwner: TudpCtl);
end;
TSafeUDPRecData = procedure(ISender: TudpCtl; IData: Pointer; ISize: Cardinal;
IFromIP: string; IPort: integer) of object;
TSafeUDPError = procedure(Sender: TObject; IData: PSafeUdpData; IErrorKind: SHeadLv) of object;
TudpCtl = class(TNMUDP)
private
FSendKind: SUDPSendKind;
FErrorKind: SUdpCtlState;
FFreeWindowsCount: Byte;
FPackedId: Cardinal;
//SendCount, ErrorCount: Cardinal; {发包数量,错包数量}
procedure ClearOneData(iidx: Integer; IList: Tstrings); overload; {释放一个包}
procedure ClearList(IList: TStrings); {清空列表}
procedure ClearQueue(IQueue: TQueue); {清空队列}
function GetAnPackedId: Cardinal; {获取一个数据包ID}
function GetAnPackedData(IDataSize: integer): PSafeUdpData; {获取一个包}
procedure OnData(Sender: TComponent; NumberBytes: Integer; FromIP: string; Port: integer); {处理接收数据}
protected
SenderThread: TUdpSenderThread; {发送处理线程}
RecThread: TUdpRecThread; {接收处理线程}
SendedThrad: TUdpSendedThread; {状态处理线程}
{已发送的数据列表,队列中等待发送的数据列表,数据混合列表,数据接收列表}
SendedList, SendingList, DataMixList: TStrings;
RecQueue: TQueue; {接收队列}
SendMemory: TMemoryStream; {正在发送缓冲,和剩余发送缓冲}
//------------------------------------------------------------------------------
// 自动触发的事件 2006-6-5 马敏钊
//------------------------------------------------------------------------------
procedure OnReSendEvent(Sender: TObject); {到时重发事件}
procedure OnTimeOutEvent(Sender: TObject); {接收超时事件}
//------------------------------------------------------------------------------
// 可供子类改写的方法 2006-6-9 马敏钊
//------------------------------------------------------------------------------
procedure PackedData(var IBuff; ISize: Cardinal; Ilv: SheadLv); {包裹数据}
procedure UnPackedData(IMixer: TDataMixer); {反包裹数据}
procedure AddToSendList(IId: string; IData: Pointer);
procedure DidSend(IData: PSafeUdpData); {实际的发送过程}
function CheckPacked(IData: PSafeUdphead): Integer; {*检查是否是SafeUdp的包}
function MixData(IData: PSafeUdpData; Iidx: integer): boolean; {混合包}
procedure InterSleep; {动态调整间隔时间}
{安全控制数据处理过程 返回值为是否需要交给逻辑程序继续处理}
function CaseData(IData: Pointer; IPeerIP: string; IPeerPort, IDataLen:
Integer): Boolean;
{检查发送的数据}
procedure CheckData;
{处理数据}
procedure DidData;
{发送数据}
procedure SendData;
public
MaxReSendCount: Byte; {最大重试次数 默认为3}
PeerSize: Word; {分包时每个包的大小 默认为1024}
WaiteTimeOut, ReSendTime: Cardinal; {超时时间 默认为3000,丢包重发时间500}
CurrRealData {当前的实际已经处理完毕的包}: TMemoryStream;
CurrDataSize: Cardinal; {当前数据的大小}
SleepTime: Cardinal; {内部睡眠时间}
OnDataCase: TSafeUDPRecData; {用户自己的处理事件}
OnDataError: TSafeUDPError; {数据错误时触发的事件}
//------------------------------------------------------------------------------
// 属性 2006-6-9 马敏钊
//------------------------------------------------------------------------------
{错误处理方式 默认为使用事件}
property ErrorKind: SUdpCtlState read FErrorKind write FErrorKind;
{发送模式 默认为滑动窗口方式}
property SendKind: SUDPSendKind read FSendKind write FSendKind;
{滑动窗口数量 默认为10}
property FreeWindowsCount: Byte read FFreeWindowsCount write FFreeWindowsCount;
//------------------------------------------------------------------------------
// 提供给外界的接口 2006-6-5 马敏钊
//------------------------------------------------------------------------------
class procedure ClearOneData(IData: PSafeUdpData); overload; {释放一个包}
{安全发送数据 自动分包}
class function IsUdpCtlData(IData: Pointer; ISize: Integer): boolean;
procedure SetCurrData(Idata: Pointer; ISize: Integer); {设置当前的数据}
procedure SafeSendBuff(IIp: string; IPort: Word; var IBuff; ISize: Cardinal;
Ilv: SheadLv); overload;
procedure SafeSendBuff(var IBuff; ISize: Cardinal; Ilv: SheadLv = SDoError);
overload;
constructor Create(AOwner: TComponent); override;
destructor Destroy; override;
end;
implementation
uses IniFiles, windows, untfunctions, UntProctol, pmybasedebug;
type
{接收到的数据}
TDataer = class
public
FromIP: string;
Port: Word;
Data: Pointer;
Size: Cardinal;
constructor Create(IFromIP: string; IPort: Word; ISource: Pointer; ISize: Cardinal);
destructor Destroy; override;
end;
function OrderShort(List: TStringList; Index1, Index2: Integer): Integer; {排序}
begin
Result := IfThen(StrToInt(List.Strings[Index1]) > StrToInt(List.Strings[Index2]), 1, -1);
end;
{ TudpCtl }
constructor TudpCtl.Create(AOwner: TComponent);
begin
inherited Create(AOwner);
SleepTime := 10;
MaxReSendCount := 3;
WaiteTimeOut := 3000;
PeerSize := 1024; //512 + 256;
FPackedId := 0;
ReSendTime := 200;
FSendKind := SUSKFreeWindows;
FErrorKind := SUseErrorEvent;
FFreeWindowsCount := 4;
SendedList := THashedStringList.Create;
SendingList := THashedStringList.Create;
DataMixList := THashedStringList.Create;
RecQueue := TQueue.Create;
SendMemory := TMemoryStream.Create;
CurrRealData := TMemoryStream.Create;
SenderThread := TUdpSenderThread.Create(False, Self);
RecThread := TUdpRecThread.Create(False, Self);
SendedThrad := TUdpSendedThread.Create(False, Self);
OnDataReceived := OnData;
end;
destructor TudpCtl.Destroy;
var
i: Integer;
begin
SenderThread.FreeOnTerminate := True;
SenderThread.Terminate;
RecThread.FreeOnTerminate := True;
RecThread.Terminate;
SendedThrad.FreeOnTerminate := True;
SendedThrad.Terminate;
Sleep(20);
for i := 0 to DataMixList.Count - 1 do
DataMixList.Objects[i].Free;
DataMixList.Free;
ClearQueue(RecQueue);
RecQueue.Free;
ClearList(SendingList);
SendingList.Free;
ClearList(SendedList);
SendedList.Free;
CurrRealData.Free;
SendMemory.Free;
inherited;
end;
function TudpCtl.CaseData(IData: Pointer; IPeerIP: string; IPeerPort, IDataLen:
Integer): Boolean;
var
LP: PChar;
LData: PSafeUdpData;
LReData: RSafeUdphead;
LIndex: Integer;
LIdx: string;
begin
Result := False;
LReData := PSafeUdphead(IData)^;
CurrDataSize := LReData.Size;
{判断如果是回复包 则删除等待回复的包}
if LReData.Id = CSafeUdpData_RecvResp then begin
LIdx := IntToStr(LReData.PackedId);
LIdx := LIdx + '_' + IntToStr(LReData.Part);
LIndex := SendedList.IndexOf(LIdx);
if LIndex > -1 then
SendedList.Delete(LIndex);
exit;
end;
{判断是否需要回复}
if LReData.IsNeedResp then begin
LReData.Id := CSafeUdpData_RecvResp;
LReData.Singl := CSafeUdpHeadSingl;
LReData.IsNeedResp := False;
RemoteHost := IPeerIP;
RemotePort := IPeerPort;
SendBuffer(LReData, Sizeof(LReData));
// Gob_Debug.AddLogShower('回复数据包--->:' + IntToStr(LReData.PackedId) + '_' + IntToStr(LReData.Part));
end;
{否则就是数据包}
{判断是否是需要组合的包}
if LReData.TotPart > 1 then begin
LIndex := CheckPacked(PsafeUdpHead(IData));
if LIndex > -1 then begin
{生成一个包}
//Gob_Debug.AddLogShower('>>>%d-%d---%d', [PsafeUdpHead(IData)^.PackedId, PsafeUdpHead(IData)^.Part, PsafeUdpHead(IData)^.Size]);
LData := GetAnPackedData(PsafeUdpHead(IData)^.Size);
LData^.Head := PsafeUdpHead(IData)^;
LP := IData;
inc(LP, Sizeof(RSafeUdpHead));
CopyMemory(LData^.Data, LP, LData^.Head.Size);
Result := MixData(LData, LIndex);
end;
end
else begin
CurrRealData.SetSize(LReData.Size);
LP := IData;
inc(LP, Sizeof(RSafeUdpHead));
CurrRealData.Position := 0;
CurrRealData.WriteBuffer(Lp^, LReData.Size);
Result := True;
end;
end;
procedure TudpCtl.SafeSendBuff(IIp: string; IPort: Word; var IBuff; ISize:
Cardinal; Ilv: SheadLv);
begin
RemoteHost := IIp;
RemotePort := IPort;
SafeSendBuff(IBuff, ISize, Ilv);
end;
function TudpCtl.GetAnPackedId: Cardinal;
begin
inc(FPackedId);
Result := FPackedId;
if FPackedId = high(Cardinal) then
FPackedId := 0;
end;
procedure TudpCtl.OnReSendEvent(Sender: TObject);
begin
end;
procedure TudpCtl.OnTimeOutEvent(Sender: TObject);
begin
end;
procedure TudpCtl.CheckData;
var
I: Integer;
LData: PSafeUdpData;
begin
{检查每一个发送出去并且需要回应的数据}
for I := SendedList.Count - 1 downto 0 do begin // Iterate
try
LData := PSafeUdpData(SendedList.Objects[i]);
{判断时间是否超过 ReSendTime 超过了才去检查}
if (GetTickCount - LData^.SendTime < ReSendTime) then begin
Continue;
end;
{重试N次后就不再发送,重试N次后报错,一直重试直到超时}
case LData^.Head.Lv of //
SDoDrop: begin
if LData^.ReTryCount <= MaxReSendCount then begin
LData^.SendTime := GetTickCount;
Inc(LData^.ReTryCount);
SendBuffer(LData^.Data^, LData^.Head.Size);
InterSleep;
end
else begin
//丢弃当前包
SendedList.Delete(i);
if FErrorKind = SUseErrorEvent then
if assigned(OnDataError) then
OnDataError(Self, LData, SDoDrop);
end;
end;
SDoError: begin
if LData^.ReTryCount <= MaxReSendCount then begin
LData^.SendTime := GetTickCount;
Inc(LData^.ReTryCount);
SendBuffer(LData^.Data^, LData^.Head.Size);
InterSleep;
end
else begin
SendedList.Delete(i);
if FErrorKind = SUseErrorEvent then begin
if assigned(OnDataError) then
OnDataError(Self, LData, SDoError);
end else
raise EUdpCtlReTryNoResp.CreateFmt('数据包在重试%d次后仍然无回应', [MaxReSendCount]);
end;
end;
SDoTimeOut: begin
if GetTickCount - LData^.SendTime >= WaiteTimeOut then begin
if FErrorKind = SUseErrorEvent then begin
if assigned(OnDataError) then
OnDataError(Self, LData, SDoTimeOut);
end else
raise EUdpCtlTimeOut.CreateFmt('数据包回应超时<%d>', [WaiteTimeOut]);
end
else begin
LData^.SendTime := GetTickCount;
Inc(LData^.ReTryCount);
SendBuffer(LData^.Data^, LData^.Head.Size);
end;
end;
end; // case
except
end;
end; // for
end;
procedure TudpCtl.ClearOneData(iidx: Integer; IList: Tstrings);
var
LP: PSafeUdpData;
begin
LP := Pointer(SendedList.Objects[iidx]);
ClearOneData(LP);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -