📄 utcmppcom.pas
字号:
unit UtCMPPCom;
interface
uses Classes, ScktComp, Sysutils, ExtCtrls, SyncObjs, Contnrs, UtCMPPMsg;
// Basic helper function
function ntohll(number: Int64): Int64;
// Start the service
function CMPPLogin: Integer; stdcall;
// Stop the service
function CMPPLoginOut: Integer; stdcall;
// Send a short message
function CMPPSendSingle(pSubmitInfo: PCMPPSubmit): Integer; stdcall;
// Receive a message
function CMPPDeliver(const nTimeoutIn: Integer; pDeliverRespInfo: PCMPPDeliverResp): Integer; stdcall;
const
BUFFERSIZE = 8192; // receive data buffer (used as the size of TCMPPService.pRevData)
// Service states
SS_IDLE = $0;
SS_WaitLoginResult = $1; // waiting for the login result
SS_WaitLogOutResult = $2; // waiting for the logout result
SS_WORK = $3; // working state
type
// Forward declarations so that TCMPPService can reference these classes
// before their full declarations appear further down.
TCMPPMsgSendThread = class;
TCMPPmsgWait = class;
// Thread class that holds the CMPP connection settings, the client socket,
// the send / wait / receive queues and one buffer per CMPP message kind.
// NOTE(review): Destroy is declared below without `override` (unlike the
// other classes in this unit), so it hides the inherited destructor instead
// of overriding it - confirm whether that is intended.
TCMPPService = class(TThread)
private
// Connection and request settings; assigned in InitParam.
ICPId: string;
ICPShareKey: string;
ServiceIP: string;
ServicePort: integer;
SubmitRetry: integer;
RequestTimeOut: integer;
RequestWindow: integer;
// Socket used for sending
CMPPClient: TClientSocket;
csSocket: TCriticalSection; // mutex protecting the socket
Recmh: TCMPPMsgHead;
FbIsReceiveData: Boolean;
pRevData: array[0..BUFFERSIZE - 1] of char;
// Reconnect timer (handler: ReConTimerDo)
ReConTimer: TTimer;
// Active timer (handler: ActiveTimerDo)
ActiveTimer: TTimer;
// Service status (one of the SS_* constants)
ServiceStatus: integer;
{
SS_WaitLoginResult
: connect to the server; once connected, send the Login message. No message
tasks are accepted in this state.
SS_WaitLogOutResult
: accept no new message tasks; wait until every queued message has been
processed, then send the Logout message.
If in the auto-reconnect state, return all pending message tasks instead
(with the result "timed out, not processed").
}
// Event used for service-status handling
ServiceEvent: TEvent;
// Error code; used to exchange data related to ServiceEvent
ErrCode: integer;
// Flags
bServiceActive: Boolean; // service
bComCanWrite: Boolean; // socket can write
bComReboot: Boolean; // socket reboot
function InitParam: Boolean;
// Socket routines
// Connection succeeded
procedure CMPP_ConnectSuc(Sender: TObject; Socket: TCustomWinSocket);
// Connection closed
procedure CMPP_DisConnect(Sender: TObject; Socket: TCustomWinSocket);
// Connection error
procedure CMPP_Error(Sender: TObject; Socket: TCustomWinSocket; ErrorEvent: TErrorEvent;
var ErrorCode: Integer);
// Data can be sent
procedure CMPP_CanWrite(Sender: TObject; Socket: TCustomWinSocket);
// Read event
procedure CMPP_Read(Sender: TObject; Socket: TCustomWinSocket);
// Timer event handlers
procedure ReConTimerDo(Sender: TObject);
procedure ActiveTimerDo(Sender: TObject);
// Set the service status
function SetServiceStatus(Status: integer): integer;
private
// Queue of messages waiting to be sent
SendList: TList;
csSendMsg: TCriticalSection;
SendEvent: TEvent;
// Queue of messages waiting for a reply
WaitList: TList;
csWaitMsg: TCriticalSection;
// Received messages awaiting processing // mutex for receive processing
RecQueue: TQueue;
csRecMsg: TCriticalSection;
RecEvent: TEvent;
//*******************************Queue handling functions*****************************//
function GetDeliverMsg(const nTimeoutIn: Integer; pDeliverRespInfo: PCMPPDeliverResp): integer; // receive a message
function AddDeliverMsg(pMsg: PCMPPDeliverResp): integer; // add a received message
function AddSubmitMsg(pSubmitInfo: PCMPPSubmit): integer; // add a submit message (per the name; the original comment repeated the one on AddDeliverMsg)
function AddSubmitMsgEx(pSubmitInfo: PCMPPSubmit): integer; // add a submit message (per the name; the original comment repeated the one on AddDeliverMsg)
// Look up a waiting message
procedure SetWaitMsg(ASwquenceID: integer; Status: Integer; MsgID: Int64);
// Clear the queues on exit
procedure ClearList;
private
CMPPVersion: Byte;
ClientSeqID: Integer;
MsgSend: TCMPPMsgSendThread; // message sending thread
// Login
CMPPMsg_Login: TCMPPMsg_Login;
CMPPMsg_Login_Resp: TCMPPMsg_Login_Resp;
// Submit message
CMPPMsg_Submit: TCMPPMsg_Submit;
CMPPMsg_Submit_Resp: TCMPPMsg_Submit_Resp;
// Deliver (incoming) message
CMPPMsg_Deliver: TCMPPMsg_Deliver;
CMPPMsg_Deliver_Resp: TCMPPMsg_Deliver_Resp;
// Heartbeat
CMPPMsg_Active: TCMPPMsgHead;
CMPPMsg_Active_Resp: TCMPPMsgHead;
// Exit
CMPPMsg_Exit: TCMPPMsgHead;
CMPPMsg_Exit_Resp: TCMPPMsgHead;
//*******************************Message handling functions*****************************//
// Send a message
function CMPPMsg_Send(var Buf; Size: Integer): Integer;
// Receive into the buffer according to the given length; if the data is
// incomplete, loop and wait for the rest.
function CMPPMsg_Receive(var Buffer; RecLen: Integer): Boolean;
function CMPPMsg_SeTCMPPMsgHead(pMsg: PCMPPMsgHead; MsgCode: Integer; MsgLen: Integer):
integer;
function CMPPMsg_SeTCMPPMsgHeadEx(pMsg: PCMPPMsgHead; MsgCode: Integer; MsgLen: Integer; SwquenceID: Integer):
integer;
function CMPPMsg_GetNewSeq: Integer;
function CMPPMsg_SendLogin: integer;
procedure CMPPMsg_DealLoginResp(PMsg: PCMPPMsgBody_Login_Resp);
procedure CMPPMsg_DealSubmitResp(SwquenceID: Integer; PMsg: PCMPPMsgBody_Submit_Resp);
function CMPPMsg_SendDeliverResp(SwquenceID: Integer; AMsgID: Int64): integer;
procedure CMPPMsg_DealDeliver(SwquenceID: Integer; PMsg: PCMPPMsgBody_Deliver);
function CMPPMsg_SendActive: integer;
function CMPPMsg_SendActiveResp(SwquenceID: Integer): integer;
function CMPPMsg_SendExit: integer;
procedure CMPPMsg_DealExitResp();
public
constructor Create();
destructor Destroy;
procedure InitSys;
procedure UnInitSys;
procedure Execute; override;
// Start / stop the service
function Start: Integer;
function Stop: Integer;
end;
// Thread class that carries a reference to a TCMPPService (FService).
// TCMPPService declares a field of this type (MsgSend) as its message
// sending thread.
// NOTE(review): FService is presumably set from the constructor argument;
// the constructor body is not shown here - confirm.
TCMPPMsgSendThread = class(TThread)
public
FService: TCMPPService;
constructor Create(AService: TCMPPService);
destructor Destroy; override;
procedure Execute; override;
end;
// Record-like class holding an event, a sequence ID, a message ID and a
// status for one waiting message.
// NOTE(review): presumably one instance is kept in TCMPPService.WaitList per
// outstanding request and updated through SetWaitMsg - confirm.
TCMPPMsgWait = class
private
WaitEvent: TEvent;
FSwquenceID: integer;
MsgID: Int64;
Status: Integer;
public
constructor Create(ASwquenceID: integer);
destructor Destroy; override;
end;
var
// Unit-level service instance used by the exported routines CMPPLogin,
// CMPPLoginOut, CMPPSendSingle and CMPPDeliver.
CMPPService : TCMPPService;
implementation
uses Windows, winsock, StrUtils, Md5, dmParam;
// Reverses the byte order of a 64-bit integer (network <-> host order on a
// little-endian machine).
//
// number : value to convert (passed by value, so the caller's copy is
//          not modified).
// Result : the same 8 bytes in reversed order.
//
// The previous implementation swapped only bytes 0..3, i.e. it performed a
// 32-bit swap on the low dword and left bytes 4..7 untouched. All
// SizeOf(Int64) bytes are reversed here.
function ntohll(number: Int64): Int64;
var
  Bytes: PByteArray;
  Lo, Hi: Integer;
  Tmp: Byte;
begin
  Bytes := @number;
  Lo := 0;
  Hi := SizeOf(number) - 1;
  // Swap the outermost pair and walk inwards until the indices meet.
  while Lo < Hi do
  begin
    Tmp := Bytes^[Lo];
    Bytes^[Lo] := Bytes^[Hi];
    Bytes^[Hi] := Tmp;
    Inc(Lo);
    Dec(Hi);
  end;
  Result := number;
end;
// Start the service.
// Returns the result of CMPPService.Start, or -1 when no service instance
// has been assigned to the CMPPService variable.
function CMPPLogin: Integer; stdcall;
begin
  Result := -1;
  if CMPPService <> nil then
    Result := CMPPService.Start;
end;
// Stop the service.
// Returns the result of CMPPService.Stop, or -1 when no service instance
// has been assigned to the CMPPService variable.
function CMPPLoginOut: Integer; stdcall;
begin
  if CMPPService = nil then
  begin
    Result := -1;
    Exit;
  end;
  Result := CMPPService.Stop;
end;
// Send a short message.
//
// The request is handed to the service's submit queue: when sDestAddrs
// contains a ';' it goes through AddSubmitMsgEx, otherwise through
// AddSubmitMsg.
//
// pSubmitInfo : submit request; must not be nil.
// Result      : the value returned by AddSubmitMsg / AddSubmitMsgEx, or -1
//               when no service instance exists or pSubmitInfo is nil
//               (same convention as CMPPLogin / CMPPLoginOut).
function CMPPSendSingle(pSubmitInfo: PCMPPSubmit): Integer; stdcall;
begin
  // Guard against a missing service or request instead of dereferencing nil.
  if (not Assigned(CMPPService)) or (pSubmitInfo = nil) then
  begin
    Result := -1;
    Exit;
  end;

  if Assigned(StrPos(@pSubmitInfo.sDestAddrs, ';')) then
    Result := CMPPService.AddSubmitMsgEx(pSubmitInfo)
  else
    Result := CMPPService.AddSubmitMsg(pSubmitInfo);
end;
//批量发送
// Receive a message.
//
// nTimeoutIn       : timeout value forwarded unchanged to GetDeliverMsg.
// pDeliverRespInfo : buffer forwarded unchanged to GetDeliverMsg.
// Result           : the value returned by CMPPService.GetDeliverMsg, or -1
//                    when no service instance exists (same convention as
//                    CMPPLogin / CMPPLoginOut).
function CMPPDeliver(const nTimeoutIn: Integer; pDeliverRespInfo: PCMPPDeliverResp): Integer; stdcall;
begin
  // Guard against a missing service instead of calling through nil.
  if not Assigned(CMPPService) then
  begin
    Result := -1;
    Exit;
  end;
  Result := CMPPService.GetDeliverMsg(nTimeoutIn, pDeliverRespInfo);
end;
{ TCMPPService }
// Creates the service thread. The inherited constructor is called with
// CreateSuspended = False, and FreeOnTerminate is switched on so the thread
// object frees itself when Execute finishes.
// NOTE(review): with FreeOnTerminate = True any outside reference to this
// object becomes invalid once the thread has terminated - verify how the
// unit-level CMPPService variable is handled.
constructor TCMPPService.Create;
begin
inherited Create(False);
FreeOnTerminate := True;
end;
// Destroys the service thread object.
// The body used to be empty, so whenever this destructor ran the TThread
// destructor was skipped and the thread's own resources were never
// released. Chaining to the inherited destructor fixes that.
destructor TCMPPService.Destroy;
begin
  inherited Destroy;
end;
// Loads the connection and request settings from Param into the fields of
// this object.
// Each value is read with Param.GetVaule(key, literal); the literal is
// presumably the fallback used when the key is absent - confirm against
// dmParam.
//
// Result : True once all settings have been read. The function previously
//          never assigned Result, so callers received an undefined Boolean.
//          Exceptions raised by GetVaule still propagate to the caller.
function TCMPPService.InitParam: Boolean;
begin
  with Param do
  begin
    CMPPVersion := GetVaule(Param_Version, 48);
    ICPId := GetVaule(Param_ICPId, '923039');
    ICPShareKey := GetVaule(Param_ICPShareKey, '123456');
    ServiceIP := GetVaule(Param_ServiceIP, '127.0.0.1');
    ServicePort := GetVaule(Param_ServicePort, 7890);
    SubmitRetry := GetVaule(Param_SubmitRetry, 3);
    RequestTimeOut := GetVaule(Param_RequestTimeOut, 5);
    RequestWindow := GetVaule(Param_RequestWindow, 16);
  end;
  Result := True;
end;
// Advances the client sequence counter by one and returns the new value.
// NOTE(review): the increment is not synchronized; confirm whether this can
// be reached from more than one thread at a time.
function TCMPPService.CMPPMsg_GetNewSeq: Integer;
begin
  ClientSeqID := ClientSeqID + 1;
  Result := ClientSeqID;
end;
// Sends the Exit (CMD_TERMINATE) message.
// The message has no body: the header held in CMPPMsg_Exit is filled in and
// transmitted as is.
// Result : the value returned by CMPPMsg_Send.
function TCMPPService.CMPPMsg_SendExit: integer;
var
  PacketLen: Integer;
begin
  // Build the header; body length is 0, so PacketLen is the header size.
  PacketLen := CMPPMsg_SeTCMPPMsgHead(@CMPPMsg_Exit, CMD_TERMINATE, 0);
  // Transmit it.
  Result := CMPPMsg_Send(CMPPMsg_Exit, PacketLen);
end;
// Builds and sends the Login (CMD_CONNECT) message.
//
// The authenticator is the MD5 digest of
//   ICPId (first 6 bytes, zero padded) + 9 zero bytes + ICPShareKey +
//   timestamp 'MMDDHHNNSS' (10 bytes).
//
// The previous version assembled that input in a fixed 34-byte stack array,
// which was overrun as soon as ICPShareKey had 9 or more characters
// (6 + 9 + key + 10 bytes, plus the terminator StrPLCopy appends). The
// buffer is now sized from the actual key length. The unused local I was
// removed.
//
// Result : the value returned by CMPPMsg_Send.
function TCMPPService.CMPPMsg_SendLogin: integer;
const
  SRC_ADDR_LEN = 6;    // bytes of ICPId that take part in the digest
  ZERO_PAD_LEN = 9;    // zero bytes between ICPId and the shared key
  TIMESTAMP_LEN = 10;  // length of 'MMDDHHNNSS'
var
  AuthBuf: array of Byte;
  Digest: MD5Digest;
  Offset, CopyLen, KeyLen, Len: Integer;
  StrTimeStamp: string;
begin
  StrTimeStamp := FormatDateTime('MMDDHHNNSS', Now);
  KeyLen := Length(ICPShareKey);

  // Digest input, sized to fit whatever key length is configured.
  SetLength(AuthBuf, SRC_ADDR_LEN + ZERO_PAD_LEN + KeyLen + TIMESTAMP_LEN);
  FillChar(AuthBuf[0], Length(AuthBuf), 0);

  // ICPId: at most SRC_ADDR_LEN bytes; the remainder stays zero.
  CopyLen := Length(ICPId);
  if CopyLen > SRC_ADDR_LEN then
    CopyLen := SRC_ADDR_LEN;
  if CopyLen > 0 then
    Move(ICPId[1], AuthBuf[0], CopyLen);

  // Shared key follows the id and the zero padding.
  Offset := SRC_ADDR_LEN + ZERO_PAD_LEN;
  if KeyLen > 0 then
    Move(ICPShareKey[1], AuthBuf[Offset], KeyLen);
  Inc(Offset, KeyLen);

  // Timestamp text closes the digest input.
  CopyLen := Length(StrTimeStamp);
  if CopyLen > TIMESTAMP_LEN then
    CopyLen := TIMESTAMP_LEN;
  if CopyLen > 0 then
    Move(StrTimeStamp[1], AuthBuf[Offset], CopyLen);
  Inc(Offset, TIMESTAMP_LEN);

  Digest := MD5Array(@AuthBuf[0], Offset);

  // Fill in the message body. Source_Addr is written before
  // AuthenticatorSource, as in the original statement order.
  StrPLCopy(@CMPPMsg_Login.Body.Source_Addr, ICPId, 6);
  CMPPMsg_Login.Body.TimeStamp := htonl(StrToInt(StrTimeStamp));
  CMPPMsg_Login.Body.Version := CMPPVersion;
  CopyMemory(@CMPPMsg_Login.Body.AuthenticatorSource, @Digest, 16);

  Len := CMPPMsg_SeTCMPPMsgHead(@CMPPMsg_Login.Head, CMD_CONNECT, SizeOf(CMPPMsg_Login.Body));
  // Transmit header + body.
  Result := CMPPMsg_Send(CMPPMsg_Login, Len);
end;
// Fills in a CMPP message header.
//
// pMsg    : header to fill.
// MsgCode : command id to store.
// MsgLen  : length of the message body that follows the header.
// Result  : total packet length (header size + MsgLen), in host byte order.
//
// All three header fields are stored via htonl; the sequence id is taken
// from CMPPMsg_GetNewSeq.
function TCMPPService.CMPPMsg_SeTCMPPMsgHead(pMsg: PCMPPMsgHead; MsgCode,
  MsgLen: Integer): Integer;
var
  TotalLen: Integer;
begin
  TotalLen := SizeOf(TCMPPMsgHead) + MsgLen;
  pMsg^.PacketLength := htonl(TotalLen);
  pMsg^.Command_Id := htonl(MsgCode);
  pMsg^.SwquenceID := htonl(CMPPMsg_GetNewSeq);
  Result := TotalLen;
end;
// Reads exactly RecLen bytes from the client socket into Buffer.
//
// Buffer : destination; must have room for RecLen bytes.
// RecLen : number of bytes to read.
// Result : True when all RecLen bytes were read (also when RecLen <= 0);
//          False when ReceiveBuf raised, returned 0, or returned a negative
//          value before any byte of this request had arrived.
//
// NOTE(review): once part of the request has arrived, a negative return from
// ReceiveBuf makes the loop retry immediately with no delay or timeout -
// confirm that this spinning is acceptable.
function TCMPPService.CMPPMsg_Receive(var Buffer; RecLen: Integer): Boolean;
var
  Remaining, Got: Integer;
  Cursor: PChar;
  Failed: Boolean;
begin
  Result := False;
  Remaining := RecLen;
  while Remaining > 0 do
  begin
    // Next write position = start of Buffer + bytes already received.
    Cursor := PChar(Integer(@Buffer) + RecLen - Remaining);
    Failed := False;
    csSocket.Enter;
    try
      Got := CMPPClient.Socket.ReceiveBuf(Cursor^, Remaining);
    except
      Failed := True;
    end;
    csSocket.Leave;

    if Failed then
      Exit;
    // A zero-length read ends the attempt.
    if Got = 0 then
      Exit;
    if Got < 0 then
    begin
      // Nothing received so far for this request: give up.
      if Remaining = RecLen then
        Exit;
      // Partially received: keep looping and wait for the rest.
      Got := 0;
    end;
    Dec(Remaining, Got);
  end;
  Result := True;
end;
// Reconnect timer handler: logs the socket's current Active state,
// re-activates the client socket and then disables the reconnect timer.
// The timer is switched off only after Active := True has returned.
procedure TCMPPService.ReConTimerDo(Sender: TObject);
var
  ActiveText: string;
begin
  ActiveText := BoolToStr(CMPPClient.Active, True);
  SysLog.LogInfo('ReConTimerDo 重新启动连接 :' + ActiveText);
  // Ask the socket to connect again.
  CMPPClient.Active := True;
  ReConTimer.Enabled := False;
end;
// Sends Size bytes from Buf over the client socket while holding csSocket.
//
// Buf    : data to send.
// Size   : number of bytes to send.
// Result : 0 on success; -1 when SendBuf returns -1 or raises an Exception
//          (both cases are logged).
//
// The lock is now released in a finally block. Previously csSocket.Leave was
// skipped if anything escaped the `on E: Exception` handler (for example an
// error raised by the logger inside it), leaving the critical section held.
//
// NOTE(review): a SendBuf return value between 0 and Size (short write) is
// still treated as success, as before - confirm this is acceptable.
function TCMPPService.CMPPMsg_Send(var Buf; Size: Integer): Integer;
begin
  Result := 0;
  //SysLog.LogInfo('CMPPMsg_Send发送数据:');
  // SysLog.LogInfo(@Buf, Size);
  csSocket.Enter;
  try
    try
      if CMPPClient.Socket.SendBuf(Buf, Size) = -1 then
      begin
        SysLog.LogInfo('CMPPMsg_Send发送数据出错');
        Result := -1;
      end;
    except
      on E: Exception do
      begin
        // Set the result first so it survives even if logging fails.
        Result := -1;
        SysLog.LogInfo('CMPPMsg_Send发送数据异常: ' + E.Message);
      end;
    end;
  finally
    csSocket.Leave;
  end;
end;
// Stores the new service status and performs the action tied to it.
//
// Status : one of the SS_* constants.
// Result : 0 on success; ICP_ERR_DISCONSENDERR when entering
//          SS_WaitLogOutResult and the Exit message could not be sent.
//
// Result is now initialised up front. Previously it was left undefined for
// every status other than SS_WaitLoginResult / SS_WaitLogOutResult
// (e.g. SS_IDLE, SS_WORK).
function TCMPPService.SetServiceStatus(Status: integer): integer;
begin
  Result := 0;
  ServiceStatus := Status;
  case Status of
    SS_WaitLoginResult:
      begin
        // Waiting for the outcome (either the server cannot be reached or
        // the Login fails) is currently disabled:
        { case ServiceEvent.WaitFor(RequestTimeOut * 1000) of
            wrSignaled:  Result := ErrCode;
            wrTimeout:   Result := ICP_ERR_CONTIMEOUT;
            wrAbandoned: Result := ICP_ERR_OTHER;
            wrError:     Result := ICP_ERR_OTHER;
          end; }
        Result := 0;
      end;
    SS_WaitLogOutResult:
      begin
        // Send the Logout message.
        if CMPPMsg_SendExit <> -1 then
        begin
          // Waiting for the Logout reply is currently disabled:
          { case ServiceEvent.WaitFor(RequestTimeOut * 1000) of
              wrSignaled:  Result := ErrCode;
              wrTimeout:   Result := ICP_ERR_DISCONTIMEOUT;
              wrAbandoned: Result := ICP_ERR_OTHER;
              wrError:     Result := ICP_ERR_OTHER;
            end; }
          Result := 0;
        end
        else
          Result := ICP_ERR_DISCONSENDERR;
      end;
  end;
end;
procedure TCMPPService.CMPP_CanWrite(Sender: TObject;
Socket: TCustomWinSocket);
begin
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -