📄 cngp.c
字号:
strncpy(CommandPack,SyncPack,5);
}
else if(!strncmp(SyncPack,"SP_DZJG",7)) //SP_DZJG
{
strncpy(CommandPack,SyncPack,7);
}
else if(!strncmp(SyncPack,"SP_DZQQ I",9)) //SP_DZQQ
{
strncpy(CommandPack,SyncPack,7);
}
else if(!strncmp(SyncPack,"SP_DZQQJG",9)) //SP_DZQQJG
{
strncpy(CommandPack,SyncPack,9);
}
else if(!strncmp(SyncPack,"SP_QX I",7)) //SP_QX
{
strncpy(CommandPack,SyncPack,5);
}
else if(!strncmp(SyncPack,"SP_QXJG",7)) //SP_QXJG
{
strncpy(CommandPack,SyncPack,7);
}
}
return 1;
}
else
{
INFO( "[INFO]: not the sync pack!\n");
return 0;
}
}
/****把长数值费率变为短数值费率******/
int change_feecode(char **feecode)
{
int feecode_length = 0;
int i = 1;
feecode_length = strlen(*feecode);
for(i=1; **feecode == '0' && i < feecode_length; i++, (*feecode)++);
return 0;
}
int HaveDataTimeout(int socket_fd , unsigned int utimeout)
{
fd_set recvset;
struct timeval timeout;
int ret;
/* Select Receive Set */
FD_ZERO(&recvset);
FD_SET(socket_fd, &recvset);
timeout.tv_sec = 0;
timeout.tv_usec = utimeout;
again:
if ( (ret = select(socket_fd + 1, &recvset, 0, 0, &timeout)) == -1 )
{
if(errno==EINTR)
{
goto again;
}
return -1;
}
else if(ret == 0)
{
//[HaveData]: time out
return 0;
}
//HAVE DATA
return ret;
}
void *Recv_Mo(int *argu)
{
int n=0,ii=0;
int ret = 0;
int Rsocket_fd=0, recv_len= 0;
unsigned long Command_ID=0;
pthread_t threadsnum=0;
unsigned long Body_Length=0;
int cn= 0;
struct timespec recvmotime;
pthread_mutex_t recvmolock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t recvmocond = PTHREAD_COND_INITIALIZER;
char *socketbuffer = NULL;
int buffersize=0;
int recvmoport =0, recvmoversion =0;
unsigned long Sequence_Num=1;
unsigned long SUB_Sequence_Num=0;
int return_id =0;
char Msg_Content[500]="", query_string[1024]="",sqlbuf[1024]="";
char Version;
int loginstatus =0;
MYSQL fsql_mo;
int times=0, timeend=0;
int recvmomode = 0;
unsigned long Command_Status =0;
char Msg_ID[30]="", Recv_Time[25] ="",Src_Term_ID[32] ="", Dest_Term_ID[32] ="", tmp[500]="";
char ContentPackType[20]="",IsmpPackType[20]="",msgid[30]="";
int Is_Report=0,Msg_Length=0;
TLV Congestion_State = { 0x0428, 1, 35 };
char RerverseId[15]="",ErrorCode[32]="";
threadsnum = pthread_self();
pthread_detach(threadsnum);
mysql_thread_init();
recvmoport = argu[0]; // Port
cn = argu[1]; // 连接号
recvmoversion = argu[2]; // Version
recvmomode = argu[3]; // mode
times = time(NULL);
if(!mysql_init(&fsql_mo))
{
INFO("[-ERR:RECVMO:%u] init error \n", threadsnum);
exit(EXIT_FAILURE);
}
if (!mysql_real_connect(&fsql_mo, send_q_host, send_q_user, send_q_pass, send_q_db, send_q_port, NULL, 0) )
{
INFO("[-ERR:RECVMO:%u] RECV MO sql_connect: %s\n", threadsnum, mysql_error(&fsql_mo));
exit(EXIT_FAILURE);
}
INFO("[INFO-RECVMO:%u] RECVMO Thread START\n", threadsnum);
socketbuffer = malloc(socketbuffersize);
if(socketbuffer == NULL)
{
INFO("[INFO-RECVMO:%u] malloc error .\n", threadsnum);
socketbuffer = malloc(socketbuffersize);
}
buffersize = 0;
INFO("[INFO-RECVMO:%u]: Recv thread start!\n", threadsnum);
while(Rsocket_fd <=0){
Rsocket_fd = ConnectISMG(CNGP_IP, recvmoport, LOGIN_NAME, LOGIN_PASSWORD, Sequence_Num, recvmomode, recvmoversion, threadsnum);
INFO("[INFO-RECVMO:%u] ConnectISMG socket :%d\n", threadsnum, Rsocket_fd);
if(Rsocket_fd <= 0){
recvmotime.tv_sec = time(NULL)+5;
recvmotime.tv_nsec = 0;
pthread_mutex_lock(&recvmolock);
ret = pthread_cond_timedwait(&recvmocond, &recvmolock, (const struct timespec *)&recvmotime);
if(ret !=0 )
{
INFO("[INFO-RECVMO:%u]: Connect to ISMG Error , wait 5 Sec .\n", threadsnum);
pthread_mutex_unlock(&recvmolock);
}
}
else
loginstatus = 1;
}
while(1)
{
if( HaveDataTimeout(Rsocket_fd, timeoutu) > 0 )
{
// have data to receive
if( buffersize < socketbuffersize)
{
recv_len = recv(Rsocket_fd, socketbuffer+buffersize, socketbuffersize-buffersize-1, 0);
if(recv_len <= 0)
{
// reconnect
INFO("[INFO-RECVMO:%u] recv thread cannot receive data from socket, reconnect. recv_len:%d\n",threadsnum, recv_len);
if( Sequence_Num == 0xffffffff)
Sequence_Num = 1;
else
Sequence_Num ++;
CNGP_Exit(Rsocket_fd, Sequence_Num);
close(Rsocket_fd);
loginstatus = 0;
INFO("[INFO-RECVMO:%u] Buffersize == %d\n", threadsnum, buffersize);
if( 0 == buffersize )
{
INFO("[INFO-RECVMO:%u] Buffersize ==0 , reconnect\n", threadsnum);
Sequence_Num =1;
Rsocket_fd = ConnectISMG(CNGP_IP, recvmoport, LOGIN_NAME, LOGIN_PASSWORD, Sequence_Num, recvmomode, recvmoversion, threadsnum);
}
}
else
{
times = time(NULL);
INFO("[INFO-RECVMO:%u] RECV DATA %d\n", threadsnum, recv_len);
buffersize += recv_len;
}
}
}
while( buffersize >= LEN_CNGP_HEADER)
{
n = CNGP_HEADER_Recv_mem(socketbuffer, &Body_Length, &Command_ID, &Command_Status, &SUB_Sequence_Num);
buffersize = buffersize - LEN_CNGP_HEADER;
INFO("[INFO-RECVMO:%u] buffsersize:%d Command_ID:%08x Body_Length:%d SUB_Sequence_Num:%u\n", threadsnum, buffersize, Command_ID, Body_Length, SUB_Sequence_Num);
memmove(socketbuffer , socketbuffer+LEN_CNGP_HEADER, buffersize);
if( (buffersize < Body_Length) && (loginstatus ==2) )
{
recv_len = recv(Rsocket_fd, socketbuffer+buffersize, socketbuffersize-buffersize-1, 0);
if(recv_len> 0){
buffersize += recv_len;
times = time(NULL);
}
}
if(Command_ID == ID_CNGP_ACTIVE_TEST_RESP)
{
// ACTIVE_TEST_RESP
INFO("[INFO-RECVMO:%u] Receive Active_Test_Resp .\n", threadsnum );
}
else if(Command_ID == ID_CNGP_ACTIVE_TEST)
{
// ACTIVE_TEST
INFO("[INFO-RECVMO:%u] Receive Active_Test .\n", threadsnum );
CNGP_Active_TestRes(Rsocket_fd, SUB_Sequence_Num);
INFO("[INFO-RECVMO:%u] Send Active_Test_Resp.\n", threadsnum );
}
else if ( Command_ID == ID_CNGP_LOGIN_RESP)
{
CNGP_Receive_LoginRes_mem(socketbuffer, Body_Length, &Version);
buffersize = buffersize - Body_Length;
memmove(socketbuffer, socketbuffer+Body_Length, buffersize);
if( 0 == Command_Status)
{
INFO("[INFO-RECVMO:%u] Connect SMG Success, Recv_Version=%d ,change login status. \n", threadsnum, Version);
loginstatus = 2;
}
else
{
INFO("[INFO-RECVMO:%u] login error . retry .\n", threadsnum);
close(Rsocket_fd);
loginstatus = 0;
}
}
else if(Command_ID == ID_CNGP_EXIT)
{
CNGP_Exit_Resp(Rsocket_fd, SUB_Sequence_Num);
INFO("[INFO-RECVMO:%u] Recv Exit command .\n", threadsnum);
close(Rsocket_fd);
loginstatus = 0;
}
else if(Command_ID == ID_CNGP_DELIVER)
{
memset (Msg_ID, 0, 10);
memset (Recv_Time, 0, 20);
memset (Src_Term_ID, 0, 21);
memset (Dest_Term_ID, 0, 21);
memset (Msg_Content, 0, sizeof(Msg_Content) );
memset (tmp, 0, 500);
memset (ContentPackType,0,16);
memset (IsmpPackType,0,16);
CNGP_Receive_Deliver_mem( socketbuffer, Body_Length, Msg_ID, &Is_Report, Recv_Time, Src_Term_ID, Dest_Term_ID, &Msg_Length, Msg_Content);
buffersize = buffersize - Body_Length;
if( buffersize < 0)
{
recv_len = recv(Rsocket_fd, socketbuffer+buffersize, socketbuffersize-buffersize-1, 0);
if( recv_len > 0)
buffersize += recv_len;
else{
close(Rsocket_fd);
buffersize = 0;
pthread_mutex_lock(&cnum_lock[cn]);
threads[cn][1] = 0; // login status
threads[cn][2] = 0; // need submit resp
threads[cn][0] = 0; // socket
pthread_mutex_unlock(&cnum_lock[cn]);
INFO("[INFO-RECVMO:%u] Reset login status unlock\n", threadsnum);
CNGP_Exit(Rsocket_fd, SUB_Sequence_Num);
close(Rsocket_fd);
//模拟 response
INFO("[INFO-RECVMO:%u] simulate resp lock\n", threadsnum );
pthread_mutex_lock(&sendq_lock[cn]);
pthread_mutex_lock(&resq_lock[cn]);
n = simulateres(SENDQ[cn], RESQ[cn]);
pthread_mutex_unlock(&resq_lock[cn]);
pthread_mutex_unlock(&sendq_lock[cn]);
INFO("[INFO-RECVMO:%u] simulate resp unlock simulate:%d\n", threadsnum ,n);
break;
}
}
memmove(socketbuffer, socketbuffer + Body_Length , buffersize);
parse_msgid (Msg_ID, msgid);
INFO("[INFO-RECVMO:%u] Is_Report=%d,Recv_Time=%s,Src_Term_ID=%s,Dest_Term_ID=%s,Msg_Length=%d,msgid=%s\n",
threadsnum, Is_Report, Recv_Time, Src_Term_ID, Dest_Term_ID, Msg_Length, msgid);
return_id = CNGP_DeliverRes (Rsocket_fd, SUB_Sequence_Num, Msg_ID, Congestion_State);
if(Is_Report ==0)
{
INFO("[INFO-RECVMO:%u] msg_content:%s\n", threadsnum, Msg_Content);
if(strcmp(Src_Term_ID, SSMP_SERVICE_CODE) == 0)
{
//SSMP 管理平台数据
mysql_escape_string(sqlbuf, Msg_Content, strlen(Msg_Content));
sprintf(query_string, "insert into %s.recv_q (rq_id,rq_mobile,rq_content,rq_date,rq_gateway,rq_provider,rq_linkid,rq_from,rq_status,rq_bak) values(NULL,'%s','%s',now(),'%s','0','','5','','')", send_q_db, Src_Term_ID, sqlbuf, Dest_Term_ID);
INFO("[INFO-RECVMO:%u] ssmp: %s\n", threadsnum, query_string);
ii = mysql_real_query (&fsql_mo, query_string, strlen(query_string));
if(ii !=0 )
{
INFO("[INFO-RECVMO:%u] ERROR:mysql_real_query :%s\n",threadsnum , mysql_error(&fsql_mo));
}
else
INFO("[INFO-RECVMO:%u] INSERT:mysql_affected_rows:%d\n", threadsnum, mysql_affected_rows(&fsql_mo) );
}
else if(!strncmp(Msg_Content,"SP_DBQQJG",9) || !strncmp(Msg_Content,"SP_DBJG",7) ||
!strncmp(Msg_Content,"SP_DZQQJG",9) || !strncmp(Msg_Content,"SP_DZJG",7) ||
!strncmp(Msg_Content,"SP_QXJG",7))
{
//ISMP, 反向业务mo,插入到report_q生成人工状态报告
memset(RerverseId,0,15);
memset(ErrorCode,0,32);
Ismp_Cngp_Syncpack_Parse(Msg_Content,IsmpPackType,
NULL,NULL,NULL,NULL,NULL,RerverseId,ErrorCode,NULL);
sprintf (query_string, "insert into %s.report_q (rq_id, rq_sequence,rq_mobile, rq_state, rq_code, rq_date, rq_provider, rq_flag) values(NULL,'%s','%s','%d','%s',now(),'1','0')", send_q_db, RerverseId, IsmpPackType, 0, ErrorCode);
ii = mysql_real_query (&fsql_mo, query_string, strlen (query_string));
INFO("[INFO-RECVMO:%u] sp fanxiang: %s\n", threadsnum, query_string);
if(ii !=0)
{
INFO("[INFO-RECVMO:%u] ERROR:mysql_real_query :%s\n",threadsnum , mysql_error(&fsql_mo));
}
else
INFO("[INFO-RECVMO:%u] INSERT:mysql_affected_rows:%d\n", threadsnum, mysql_affected_rows(&fsql_mo) );
}
else
{
//普通mo系列,插入到recv_q
if (!strncmp(Msg_Content,"SP_ZXQX",7))
{
strcpy(ContentPackType,"8"); //8正向取消(0000)
}
else
{
strcpy(ContentPackType,"0");
}
if ( (RECV_DEL106_FLAG==1) && (Dest_Term_ID[0] == '1' && Dest_Term_ID[1] == '0'&& Dest_Term_ID[2] == '6') )
{
strcpy (tmp, Dest_Term_ID + 3);
memset (Dest_Term_ID, 0, 21);
strcpy (Dest_Term_ID, tmp);
memset (tmp, 0, 500);
}
INFO("[INFO-RECV:2 deliver.DestTermID = %s\n", Dest_Term_ID);
sprintf (query_string, "insert into %s.recv_q (rq_id,rq_mobile,rq_content,rq_date,rq_gateway,rq_provider,rq_linkid,rq_from,rq_status,rq_bak) values(NULL,'", send_q_db);
strcat (query_string, Src_Term_ID);
strcat (query_string, "','");
mysql_escape_string(sqlbuf, Msg_Content, strlen(Msg_Content));
strcat (query_string, sqlbuf);
strcat (query_string, "',now(),'");
if (strlen (Dest_Term_ID) > 20)
{
Dest_Term_ID[20] = '\0';
}
strcat (query_string, Dest_Term_ID);
strcat (query_string, "','0', '', '");
strcat (query_string,ContentPackType);
strcat (query_string,"', '', '')");
INFO("[INFO-RECVMO:%u] Receive MO sql: %s\n",threadsnum,query_string);
ii = mysql_real_query (&fsql_mo, query_string, strlen (query_string));
if(ii !=0)
{
INFO("[INFO-RECVMO:%u] ERROR:mysql_real_query :%s\n",threadsnum , mysql_error(&fsql_mo));
}
else
INFO("[INFO-RECVMO:%u] INSERT:mysql_affected_rows:%d\n", threadsnum, mysql_affected_rows(&fsql_mo) );
strcpy (Msg_Content, "");
memset (Dest_Term_ID, 0, 21);
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -