⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cngp.c

📁 增值业务的sngp协议。
💻 C
📖 第 1 页 / 共 5 页
字号:
            			strncpy(CommandPack,SyncPack,5);
            		}  
            		else if(!strncmp(SyncPack,"SP_DZJG",7))                 //SP_DZJG
            		{
            			strncpy(CommandPack,SyncPack,7);
            		}
            		else if(!strncmp(SyncPack,"SP_DZQQ I",9))                 //SP_DZQQ
            		{
                		strncpy(CommandPack,SyncPack,7);
            		}
            		else if(!strncmp(SyncPack,"SP_DZQQJG",9))                 //SP_DZQQJG
            		{
                		strncpy(CommandPack,SyncPack,9);
            		}
            		else if(!strncmp(SyncPack,"SP_QX I",7))                   //SP_QX
            		{
                		strncpy(CommandPack,SyncPack,5);
            		}
            		else if(!strncmp(SyncPack,"SP_QXJG",7))                   //SP_QXJG
            		{
                		strncpy(CommandPack,SyncPack,7);                        
            		}
		} 
		return 1;  
	}
    	else
    	{
        	INFO( "[INFO]: not the sync pack!\n");
    		return 0;
    	}
}


/**** Convert a long (zero-padded) numeric fee code into its short form ****/
/*
 * Advances *feecode past leading '0' characters. The string itself is not
 * modified; only the caller's pointer is moved forward. At least one
 * character of a non-empty string is always kept, so an all-zero code
 * such as "000" ends up pointing at its last "0".
 *
 * feecode : address of the pointer to the NUL-terminated fee code string.
 *
 * Returns 0 on success, -1 if feecode or *feecode is NULL.
 */
int change_feecode(char **feecode)
{
	size_t remaining;

	/* Guard against a missing string instead of dereferencing NULL. */
	if (feecode == NULL || *feecode == NULL)
	{
		return -1;
	}

	remaining = strlen(*feecode);

	/* Stop while one character is still left so the result is never empty. */
	while (remaining > 1 && **feecode == '0')
	{
		(*feecode)++;
		remaining--;
	}

	return 0;
}

/*
 * Wait until socket_fd is readable or the timeout expires.
 *
 * socket_fd : descriptor to watch for readability.
 * utimeout  : maximum time to wait, in microseconds.
 *
 * Returns a positive value when data is available, 0 on timeout, and -1 on
 * error (errno is set by select(), or to EBADF when the descriptor cannot
 * be stored in an fd_set).
 */
int HaveDataTimeout(int socket_fd , unsigned int utimeout)
{
	fd_set recvset;
	struct timeval  timeout;
	int ret;

	/* FD_SET is undefined for descriptors outside [0, FD_SETSIZE). */
	if (socket_fd < 0 || socket_fd >= FD_SETSIZE)
	{
		errno = EBADF;
		return -1;
	}

	do
	{
		/*
		 * Rebuild the set and the timeout on every attempt: after a failed
		 * select() the timeout contents are not reliable, so they must not
		 * be reused when retrying after EINTR.
		 */
		FD_ZERO(&recvset);
		FD_SET(socket_fd, &recvset);

		/* Keep tv_usec below one second; larger values may be rejected. */
		timeout.tv_sec = utimeout / 1000000;
		timeout.tv_usec = utimeout % 1000000;

		ret = select(socket_fd + 1, &recvset, NULL, NULL, &timeout);
	} while (ret == -1 && errno == EINTR);

	/* -1: error, 0: timed out, >0: data ready */
	return ret;
}
void *Recv_Mo(int *argu)
{
	int n=0,ii=0;
    	int ret = 0;
    	int Rsocket_fd=0, recv_len= 0; 
    	unsigned long Command_ID=0;
    	pthread_t threadsnum=0;
    	unsigned long Body_Length=0;
    	int cn= 0;
    	struct timespec recvmotime;
	pthread_mutex_t recvmolock = PTHREAD_MUTEX_INITIALIZER;
   	pthread_cond_t  recvmocond = PTHREAD_COND_INITIALIZER;
	char *socketbuffer = NULL;
	int buffersize=0;
	int recvmoport =0, recvmoversion =0;
	unsigned long  Sequence_Num=1;
	unsigned long SUB_Sequence_Num=0;
	int return_id =0;
	char Msg_Content[500]="", query_string[1024]="",sqlbuf[1024]="";
	char Version;
	int loginstatus  =0;
	MYSQL fsql_mo;
	int times=0, timeend=0;
	int recvmomode = 0;
	unsigned long Command_Status =0;
	char Msg_ID[30]="", Recv_Time[25] ="",Src_Term_ID[32] ="", Dest_Term_ID[32] ="", tmp[500]="";
	char ContentPackType[20]="",IsmpPackType[20]="",msgid[30]="";
	int Is_Report=0,Msg_Length=0;
	TLV Congestion_State = { 0x0428, 1, 35 };
	char RerverseId[15]="",ErrorCode[32]="";
	
    	threadsnum = pthread_self();
    	pthread_detach(threadsnum);
	mysql_thread_init();
	
    	recvmoport = argu[0]; // Port 
    	cn = argu[1]; // 连接号
    	recvmoversion = argu[2]; // Version
    	recvmomode = argu[3]; // mode 
    	times = time(NULL);
	if(!mysql_init(&fsql_mo))
	{
		INFO("[-ERR:RECVMO:%u] init error \n", threadsnum);
		exit(EXIT_FAILURE);
    	}
    	if (!mysql_real_connect(&fsql_mo, send_q_host, send_q_user, send_q_pass, send_q_db, send_q_port, NULL, 0) ) 
	{
        	INFO("[-ERR:RECVMO:%u] RECV MO sql_connect: %s\n", threadsnum, mysql_error(&fsql_mo));
        	exit(EXIT_FAILURE);
    	}
		
	INFO("[INFO-RECVMO:%u] RECVMO Thread START\n", threadsnum);
	
	socketbuffer = malloc(socketbuffersize);
	if(socketbuffer == NULL)
	{
		INFO("[INFO-RECVMO:%u] malloc error .\n", threadsnum);
		socketbuffer = malloc(socketbuffersize);
	}
	buffersize = 0;
    	INFO("[INFO-RECVMO:%u]: Recv thread start!\n", threadsnum);
	
	while(Rsocket_fd <=0){
		Rsocket_fd = ConnectISMG(CNGP_IP, recvmoport, LOGIN_NAME, LOGIN_PASSWORD, Sequence_Num, recvmomode, recvmoversion, threadsnum);
		INFO("[INFO-RECVMO:%u] ConnectISMG socket :%d\n", threadsnum, Rsocket_fd);
		
		if(Rsocket_fd <= 0){
	  		recvmotime.tv_sec  =  time(NULL)+5;
	              recvmotime.tv_nsec  =  0;
			pthread_mutex_lock(&recvmolock);
			ret  =  pthread_cond_timedwait(&recvmocond,  &recvmolock, (const  struct  timespec  *)&recvmotime);
	 		if(ret !=0 )
			{
				INFO("[INFO-RECVMO:%u]:  Connect to ISMG Error , wait 5 Sec  .\n", threadsnum);
				pthread_mutex_unlock(&recvmolock);
			}
		}
		else
			loginstatus = 1;
	}
	
    	while(1)
	{
		if( HaveDataTimeout(Rsocket_fd, timeoutu) > 0 )
		{
			// have data to receive
		    	if( buffersize < socketbuffersize)
		    	{
				recv_len = recv(Rsocket_fd, socketbuffer+buffersize, socketbuffersize-buffersize-1, 0);
				if(recv_len <= 0)
				{
					// reconnect
					INFO("[INFO-RECVMO:%u] recv thread cannot receive data from socket, reconnect. recv_len:%d\n",threadsnum, recv_len);
					if( Sequence_Num == 0xffffffff)
						Sequence_Num = 1;
					else
						Sequence_Num ++;
					CNGP_Exit(Rsocket_fd, Sequence_Num);
					close(Rsocket_fd);
					loginstatus = 0;
					INFO("[INFO-RECVMO:%u] Buffersize == %d\n", threadsnum, buffersize);
					if( 0 == buffersize )
					{
						INFO("[INFO-RECVMO:%u] Buffersize ==0 , reconnect\n", threadsnum);
						Sequence_Num =1;
						Rsocket_fd = ConnectISMG(CNGP_IP, recvmoport, LOGIN_NAME, LOGIN_PASSWORD, Sequence_Num, recvmomode, recvmoversion, threadsnum);
					}
				}
				else
				{
					times = time(NULL);
					INFO("[INFO-RECVMO:%u] RECV DATA %d\n", threadsnum, recv_len);
					buffersize += recv_len;
				}
		    	}
		}
		
		while( buffersize >= LEN_CNGP_HEADER)
		{
			n = CNGP_HEADER_Recv_mem(socketbuffer, &Body_Length, &Command_ID, &Command_Status, &SUB_Sequence_Num);
			buffersize = buffersize - LEN_CNGP_HEADER;
			INFO("[INFO-RECVMO:%u] buffsersize:%d Command_ID:%08x Body_Length:%d SUB_Sequence_Num:%u\n", threadsnum, buffersize, Command_ID, Body_Length, SUB_Sequence_Num);
			memmove(socketbuffer , socketbuffer+LEN_CNGP_HEADER, buffersize);
			if( (buffersize < Body_Length) && (loginstatus ==2) )
			{
				recv_len = recv(Rsocket_fd, socketbuffer+buffersize, socketbuffersize-buffersize-1, 0);
				if(recv_len> 0){
					buffersize += recv_len; 
					times = time(NULL);
				}
			}
			
			if(Command_ID == ID_CNGP_ACTIVE_TEST_RESP)
			{ 
				// ACTIVE_TEST_RESP 
				INFO("[INFO-RECVMO:%u] Receive Active_Test_Resp .\n", threadsnum );
			}
			else if(Command_ID == ID_CNGP_ACTIVE_TEST)
			{ 
				// ACTIVE_TEST
				INFO("[INFO-RECVMO:%u] Receive Active_Test .\n", threadsnum );
				CNGP_Active_TestRes(Rsocket_fd, SUB_Sequence_Num);
				INFO("[INFO-RECVMO:%u] Send Active_Test_Resp.\n", threadsnum );
			}
			else if ( Command_ID == ID_CNGP_LOGIN_RESP)
			{
				CNGP_Receive_LoginRes_mem(socketbuffer, Body_Length, &Version);
				buffersize = buffersize - Body_Length;
				memmove(socketbuffer, socketbuffer+Body_Length, buffersize);
				if( 0 == Command_Status)
				{
					INFO("[INFO-RECVMO:%u] Connect SMG Success, Recv_Version=%d ,change login status. \n", threadsnum, Version);
					loginstatus = 2;
				}
				else
				{	
					INFO("[INFO-RECVMO:%u] login error . retry .\n", threadsnum);
					close(Rsocket_fd);
					loginstatus = 0;
				}
			}
			else if(Command_ID == ID_CNGP_EXIT)
			{
				CNGP_Exit_Resp(Rsocket_fd, SUB_Sequence_Num);
				INFO("[INFO-RECVMO:%u] Recv Exit command .\n", threadsnum);
				close(Rsocket_fd);
				loginstatus = 0;
			}
			else if(Command_ID == ID_CNGP_DELIVER)
			{
				memset (Msg_ID, 0, 10);
				memset (Recv_Time, 0, 20);
				memset (Src_Term_ID, 0, 21);
				memset (Dest_Term_ID, 0, 21);
				memset (Msg_Content, 0, sizeof(Msg_Content) );
				memset (tmp, 0, 500);
				memset (ContentPackType,0,16);
				memset (IsmpPackType,0,16);
				
				CNGP_Receive_Deliver_mem( socketbuffer, Body_Length, Msg_ID, &Is_Report, Recv_Time, Src_Term_ID, Dest_Term_ID, &Msg_Length, Msg_Content);
				buffersize = buffersize - Body_Length;
				if( buffersize < 0)
				{
					recv_len = recv(Rsocket_fd, socketbuffer+buffersize, socketbuffersize-buffersize-1, 0);
					if( recv_len > 0)
						buffersize += recv_len;
					else{
						close(Rsocket_fd);
						buffersize = 0;
						pthread_mutex_lock(&cnum_lock[cn]);
						threads[cn][1] = 0; // login status 
						threads[cn][2] = 0; // need submit resp
						threads[cn][0] = 0; // socket
						pthread_mutex_unlock(&cnum_lock[cn]);
						INFO("[INFO-RECVMO:%u] Reset login status unlock\n", threadsnum);
						CNGP_Exit(Rsocket_fd, SUB_Sequence_Num);
						close(Rsocket_fd);
						//模拟 response
						INFO("[INFO-RECVMO:%u] simulate resp lock\n", threadsnum );
						pthread_mutex_lock(&sendq_lock[cn]);
						pthread_mutex_lock(&resq_lock[cn]);
						n = simulateres(SENDQ[cn], RESQ[cn]);
						pthread_mutex_unlock(&resq_lock[cn]);
						pthread_mutex_unlock(&sendq_lock[cn]);
						INFO("[INFO-RECVMO:%u] simulate resp unlock simulate:%d\n", threadsnum ,n);
						break;
					}
				}
				memmove(socketbuffer, socketbuffer + Body_Length , buffersize);
				parse_msgid (Msg_ID, msgid);
				INFO("[INFO-RECVMO:%u] Is_Report=%d,Recv_Time=%s,Src_Term_ID=%s,Dest_Term_ID=%s,Msg_Length=%d,msgid=%s\n",
					threadsnum, Is_Report, Recv_Time, Src_Term_ID, Dest_Term_ID, Msg_Length, msgid);
				return_id = CNGP_DeliverRes (Rsocket_fd, SUB_Sequence_Num, Msg_ID, Congestion_State);

				if(Is_Report ==0)
				{
					INFO("[INFO-RECVMO:%u] msg_content:%s\n", threadsnum, Msg_Content);
					if(strcmp(Src_Term_ID, SSMP_SERVICE_CODE) == 0)
					{	
						  //SSMP 管理平台数据
						mysql_escape_string(sqlbuf, Msg_Content, strlen(Msg_Content));
				              sprintf(query_string, "insert into %s.recv_q (rq_id,rq_mobile,rq_content,rq_date,rq_gateway,rq_provider,rq_linkid,rq_from,rq_status,rq_bak) values(NULL,'%s','%s',now(),'%s','0','','5','','')", send_q_db, Src_Term_ID, sqlbuf, Dest_Term_ID);
						INFO("[INFO-RECVMO:%u] ssmp: %s\n", threadsnum, query_string);
				              ii = mysql_real_query (&fsql_mo, query_string, strlen(query_string));
						if(ii !=0 )
						{
							INFO("[INFO-RECVMO:%u] ERROR:mysql_real_query :%s\n",threadsnum , mysql_error(&fsql_mo));
						}
						else
							INFO("[INFO-RECVMO:%u] INSERT:mysql_affected_rows:%d\n", threadsnum, mysql_affected_rows(&fsql_mo) );
				       }
					else if(!strncmp(Msg_Content,"SP_DBQQJG",9) || !strncmp(Msg_Content,"SP_DBJG",7) ||
						!strncmp(Msg_Content,"SP_DZQQJG",9) || !strncmp(Msg_Content,"SP_DZJG",7) ||
						!strncmp(Msg_Content,"SP_QXJG",7))
					{
						//ISMP, 反向业务mo,插入到report_q生成人工状态报告
					      	memset(RerverseId,0,15);
					      	memset(ErrorCode,0,32);
					      	Ismp_Cngp_Syncpack_Parse(Msg_Content,IsmpPackType,
					      	          NULL,NULL,NULL,NULL,NULL,RerverseId,ErrorCode,NULL);
					       sprintf (query_string, "insert into %s.report_q (rq_id, rq_sequence,rq_mobile, rq_state, rq_code, rq_date, rq_provider, rq_flag) values(NULL,'%s','%s','%d','%s',now(),'1','0')", send_q_db, RerverseId, IsmpPackType, 0, ErrorCode);
					       ii = mysql_real_query (&fsql_mo, query_string, strlen (query_string));
					       INFO("[INFO-RECVMO:%u] sp fanxiang: %s\n", threadsnum, query_string);
						if(ii !=0)
						{
							INFO("[INFO-RECVMO:%u] ERROR:mysql_real_query :%s\n",threadsnum , mysql_error(&fsql_mo));
						}
						else
							INFO("[INFO-RECVMO:%u] INSERT:mysql_affected_rows:%d\n", threadsnum, mysql_affected_rows(&fsql_mo) );
					}
					else
					{
						//普通mo系列,插入到recv_q
						if (!strncmp(Msg_Content,"SP_ZXQX",7))
	          				{
	      	      					strcpy(ContentPackType,"8");        //8正向取消(0000)
	          				}
	          				else
	          				{
	      	      					strcpy(ContentPackType,"0");
	          				}
	          				
	          				if ( (RECV_DEL106_FLAG==1) && (Dest_Term_ID[0] == '1' && Dest_Term_ID[1] == '0'&& Dest_Term_ID[2] == '6') )	          				
		  				{
		      					strcpy (tmp, Dest_Term_ID + 3);
		      					memset (Dest_Term_ID, 0, 21);
		      					strcpy (Dest_Term_ID, tmp);
		      					memset (tmp, 0, 500);
		  				}
						INFO("[INFO-RECV:2 deliver.DestTermID = %s\n", Dest_Term_ID);
									
	          				sprintf (query_string, "insert into %s.recv_q (rq_id,rq_mobile,rq_content,rq_date,rq_gateway,rq_provider,rq_linkid,rq_from,rq_status,rq_bak) values(NULL,'", send_q_db);
	          				strcat (query_string, Src_Term_ID);
	          				strcat (query_string, "','");
						mysql_escape_string(sqlbuf, Msg_Content, strlen(Msg_Content));
	          				strcat (query_string, sqlbuf);
	          				strcat (query_string, "',now(),'");
	          				if (strlen (Dest_Term_ID) > 20)
		  				{
		      					Dest_Term_ID[20] = '\0';
		  				}
	          				strcat (query_string, Dest_Term_ID);
	          				strcat (query_string, "','0', '', '");
	          				strcat (query_string,ContentPackType);
	          				strcat (query_string,"', '', '')");
	          				INFO("[INFO-RECVMO:%u] Receive MO sql: %s\n",threadsnum,query_string);
	          				ii = mysql_real_query (&fsql_mo, query_string, strlen (query_string));
						if(ii !=0)
						{
							INFO("[INFO-RECVMO:%u] ERROR:mysql_real_query :%s\n",threadsnum , mysql_error(&fsql_mo));
						}
						else
							INFO("[INFO-RECVMO:%u] INSERT:mysql_affected_rows:%d\n", threadsnum, mysql_affected_rows(&fsql_mo) );
						strcpy (Msg_Content, "");
						memset (Dest_Term_ID, 0, 21);
	      				}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -