⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 cngp.c

📁 增值业务的sngp协议。
💻 C
📖 第 1 页 / 共 5 页
字号:
				}
				else if(Is_Report == 1)
				{
					char err[4]="";
					int status =0;
					
					parse_msgid (Msg_Content + 3, msgid);
					
					//memset( msgid ,0, sizeof(msgid));				       
				        //strncpy( msgid,Msg_Content+3,20);
				       
				       err[0] = *(Msg_Content + 93);	//93 err code
				       err[1] = *(Msg_Content + 94);	//93 err code
				       err[2] = *(Msg_Content + 95);	//93 err code
				       err[3] = '\0';
				       
				       INFO("Msg_Content=%s\n", Msg_Content );	// 14 = strlen("id:") + msgid length+ space
				       INFO("Msg_msgid=%s\n", msgid );	
				       				      
				       INFO("err = %s\n", err);	
				       
				       if (Command_Status != 0)
						status = 2;
				       sprintf (query_string, "insert into %s.report_q (rq_id, rq_sequence,rq_mobile, rq_state, rq_code, rq_date, rq_provider, rq_flag) values(NULL,'%s','%s','%d','%s',now(),'0')", send_q_db, msgid, Src_Term_ID, status, err);	//Src_Term_ID is send cellphone code,
				       INFO("[INFO-RECVMO:%u] Receive report sql: %s\n",query_string,threadsnum);
				       ii = mysql_real_query (&fsql_mo, query_string, strlen (query_string)); 
					if(ii !=0)
					{
						INFO("[INFO-RECVMO:%u] ERROR:mysql_real_query :%s\n",threadsnum , mysql_error(&fsql_mo));
					}
					else
						INFO("[INFO-RECVMO:%u] INSERT:mysql_affected_rows:%d\n", threadsnum, mysql_affected_rows(&fsql_mo) );
				}
				else{
					 // wrong type 
					 INFO("[INFO-RECVMO:%u] Error Command_ID=>Command_ID=%8x\n", threadsnum, Command_ID);
				}
			}
			else
			{
				// other data
				INFO("[INFO-RECVMO:%u] Recv unknown data, Command_ID:%08x ,Body_Length :%d \n", threadsnum, Command_ID, Body_Length);
				if(Body_Length >0)
				{	
					buffersize -= Body_Length;
					memmove( socketbuffer, socketbuffer + Body_Length , buffersize );
				}
			}
		}
		
	    	while( loginstatus == 0 )
	    	{
	    		Rsocket_fd = ConnectISMG(CNGP_IP, recvmoport, LOGIN_NAME, LOGIN_PASSWORD, Sequence_Num, recvmomode, recvmoversion, threadsnum);
			INFO("[INFO-RECVMO:%u] ConnectISMG socket :%d\n", threadsnum, Rsocket_fd);
			if( Rsocket_fd > 0 )
			{
				loginstatus = 0;
			}
			else
			{
		  		recvmotime.tv_sec  =  time(NULL)+5;
		              recvmotime.tv_nsec  =  0;
				pthread_mutex_lock(&recvmolock);
				ret  =  pthread_cond_timedwait( &recvmocond,  &recvmolock, (const  struct  timespec  *)&recvmotime );
		 		if(ret !=0 )
				{
					INFO("[INFO-RECVMO:%u]: Connect ISMG failed , wait 5 sec  .\n", threadsnum);
					pthread_mutex_unlock(&recvmolock);
				}
			}
	    	}

		timeend = time(NULL);
		if( loginstatus == 2 && (timeend-times) >= active_time)
		{
			if( Sequence_Num == 0xffffffff)
				Sequence_Num = 1;
			else
				Sequence_Num ++;
			ret= CNGP_Active_Test(Rsocket_fd, Sequence_Num);
			INFO("[INFO-RECVMO:%u] Active test send %d\n", threadsnum, ret);
			times = time(NULL);
		}
	}
}

void *Recv_Res(int *argu)
{
	int n=0,ii=0;
    	int ret = 0;
    	int datatmp=0;
    	int Rsocket_fd=0, recv_len= 0; 
    	unsigned long Command_ID=0;
    	pthread_t threadsnum=0;
    	unsigned long Body_Length=0;
    	int cn= 0;
    	struct timespec recvtime;
	pthread_mutex_t recvlock = PTHREAD_MUTEX_INITIALIZER;
   	pthread_cond_t  recvcond = PTHREAD_COND_INITIALIZER;
	char *socketbuffer = NULL;
	int buffersize=0;
	RES_Q resqtmp;
    	struct tm *tp;
    	time_t timep;
    	time_t lastLoginTime;
    	time_t nowTime;
    	
    	unsigned long icount=0;
	int resqnum =0;
	int recvport =0, recvversion =0;
	unsigned long  Sequence_Num=1;
	unsigned long SUB_Sequence_Num=0;
	int return_id =0;
	char Msg_Content[500]="", query_string[1024],sqlbuf[1024];
	char Version;
	MYSQL fsql_res;
	
	int recvmode = 0;
	unsigned long Command_Status =0;
	char Msg_ID[30]="", Recv_Time[25] ="",Src_Term_ID[32] ="", Dest_Term_ID[32] ="", tmp[500]="";
	char ContentPackType[20]="",IsmpPackType[20]="",msgid[30]="";
	int Is_Report=0,Msg_Length=0;
	TLV Congestion_State = { 0x0428, 1, 35 };
	char RerverseId[30]="",ErrorCode[32]="";

	
    	threadsnum = pthread_self();
    	pthread_detach(threadsnum);
	mysql_thread_init();
	
    	recvport = argu[0]; // Port 
    	cn = argu[1]; // 连接号
    	recvversion = argu[2]; // Version
    	recvmode = argu[3]; // mode 
	INFO("[INFO-RECV:%u] RECV Thread START\n", threadsnum);

	if(!mysql_init(&fsql_res))
	{
		INFO("[-ERR:RECV:%u] init error \n", threadsnum);
		exit(EXIT_FAILURE);
    	}
    	if (!mysql_real_connect(&fsql_res, send_q_host, send_q_user, send_q_pass, send_q_db, send_q_port, NULL, 0) ) 
	{
        	INFO("[-ERR:RECV:%u] RECV MO sql_connect: %s\n", threadsnum, mysql_error(&fsql_res));
        	exit(EXIT_FAILURE);
    	}

	socketbuffer = malloc(socketbuffersize);
	if(socketbuffer == NULL)
	{
		INFO("[INFO-RECV:%u] malloc error .\n", threadsnum);
		socketbuffer = malloc(socketbuffersize);
	}
	buffersize = 0;
    	INFO("[INFO-RECV:%u]: Recv thread start!\n", threadsnum);
	while(Rsocket_fd <=0)
	{       
		if ( threads[cn][0] >0 )
		{
		    close(threads[cn][0]);
		    INFO("[INFO-RECV:%u]: sleep5 ready reconnect....\n", threadsnum);
		    sleep(5);		    
		}
		
		Rsocket_fd = ConnectISMG(CNGP_IP, recvport, LOGIN_NAME, LOGIN_PASSWORD, Sequence_Num, recvmode, recvversion, threadsnum);
		
		if(Rsocket_fd > 0)
		{
			time(&lastLoginTime); //保存上次登录的时间.
			
			pthread_mutex_lock(&cnum_lock[cn]);
			threads[cn][0] = Rsocket_fd;
			pthread_mutex_unlock(&cnum_lock[cn]);
			INFO("[INFO-RECV:%u] ConnectISMG socket :%d\n", threadsnum, Rsocket_fd);
		}
		else{
	  		recvtime.tv_sec  =  time(NULL)+5;
	                recvtime.tv_nsec  =  0;
			pthread_mutex_lock(&recvlock);
			ret  =  pthread_cond_timedwait(&recvcond,  &recvlock, (const  struct  timespec  *)&recvtime);
	 		if(ret !=0 )
			{
				INFO("[INFO-RECV:%u]:  Connect to ISMG Error , wait 5 Sec  .\n", threadsnum);
				pthread_mutex_unlock(&recvlock);
			}
		}
	}
	
	icount=0;
    	while(1)
	{
		icount++;
		if( HaveDataTimeout(Rsocket_fd, timeoutu) > 0 )
		{
			//INFO("[INFO-RECV:%u] ----RecvHaveData--- \n",threadsnum);
			// have data to receive
		    	if( buffersize < socketbuffersize)
		    	{
				recv_len = recv(Rsocket_fd, socketbuffer+buffersize, socketbuffersize-buffersize-1, 0);
				if(recv_len < 0)  // <= 0)
				{
					int fdtmp=0;
					// reconnect
					INFO("[INFO-RECV:%u] recv thread cannot receive data from socket, reconnect. recv_len:%d\n",threadsnum, recv_len);
					INFO("[INFO-RECV:%u] Reset login status lock\n", threadsnum);
					
					fdtmp =threads[cn][0];
					pthread_mutex_lock(&cnum_lock[cn]);
					threads[cn][1] = 0; // login status 
					threads[cn][2] = 0; // need submit resp
					threads[cn][0] = 0; // socket
					pthread_mutex_unlock(&cnum_lock[cn]);
					INFO("[INFO-RECV:%u] Reset login status unlock\n", threadsnum);
					CNGP_Exit(Rsocket_fd, SUB_Sequence_Num);
					close(Rsocket_fd);
					close(datatmp);
					sleep(5);
					
					INFO("[INFO-RECV:%u] Buffersize == %d\n", threadsnum, buffersize);
					if( 0 == buffersize )
					{
						INFO("[INFO-RECV:%u] Buffersize ==0 , reconnect\n", threadsnum);
						Sequence_Num =1;
						Rsocket_fd = ConnectISMG(CNGP_IP, recvport, LOGIN_NAME, LOGIN_PASSWORD, Sequence_Num, recvmode, recvversion, threadsnum);
						if(Rsocket_fd > 0)
						{
							time(&lastLoginTime);
							pthread_mutex_lock(&cnum_lock[cn]);
							threads[cn][0] = Rsocket_fd;
							pthread_mutex_unlock(&cnum_lock[cn]);
						}
						//模拟 response
						INFO("[INFO-RECV:%u] simulate resp lock\n", threadsnum );
						pthread_mutex_lock(&sendq_lock[cn]);
						pthread_mutex_lock(&resq_lock[cn]);
						n = simulateres(SENDQ[cn], RESQ[cn]);
						pthread_mutex_unlock(&resq_lock[cn]);
						pthread_mutex_unlock(&sendq_lock[cn]);
						INFO("[INFO-RECV:%u] simulate resp unlock simulate:%d\n", threadsnum ,n);
						//修改等待resp数量
						INFO("[INFO-RECV:%u] change need sub resp num lock\n", threadsnum);
						pthread_mutex_lock(&cnum_lock[cn]);
						threads[cn][2] -= n;
						pthread_mutex_unlock(&cnum_lock[cn]);
						INFO("[INFO-RECV:%u] change need sub resp num lock\n", threadsnum);
					}
				}
				else
				{
					if( 0== recv_len )
					{
					    if( icount%7==0 )
					        INFO("[INFO-RECV:%u] RECV DATA %d\n", threadsnum, recv_len);
					}else
					{
					     INFO("[INFO-RECV:%u] RECV DATA %d\n", threadsnum, recv_len);						
					}    
					    
					buffersize += recv_len;
				}
		    	}
		}

		resqnum = checkresqnum(RESQ[cn]);
		while( (buffersize >= LEN_CNGP_HEADER) && (resqnum >0))
		{
			n = CNGP_HEADER_Recv_mem(socketbuffer, &Body_Length, &Command_ID, &Command_Status, &SUB_Sequence_Num);
			buffersize = buffersize - LEN_CNGP_HEADER;
			INFO("[INFO-RECV:%u] buffsersize:%d Command_ID:%08x Body_Length:%d SUB_Sequence_Num:%u\n", threadsnum, buffersize, Command_ID, Body_Length, SUB_Sequence_Num);
			memmove(socketbuffer , socketbuffer+LEN_CNGP_HEADER, buffersize);
			if( (buffersize < Body_Length) && (threads[cn][1] ==2) )
			{
				recv_len = recv(Rsocket_fd, socketbuffer+buffersize, socketbuffersize-buffersize-1, 0);
				if(recv_len> 0){
					buffersize += recv_len; 
				}
			}

			if(Command_ID == ID_CNGP_ACTIVE_TEST_RESP)
			{
				// ACTIVE_TEST_RESP 
				INFO("[INFO-RECV:%u] Receive Active_Test_Resp .\n", threadsnum);
			}
			else if(Command_ID == ID_CNGP_ACTIVE_TEST)
			{
				// ACTIVE_TEST
				INFO("[INFO-RECV:%u] Receive Active_Test .\n", threadsnum);
				CNGP_Active_TestRes(Rsocket_fd, SUB_Sequence_Num);
				INFO("[INFO-RECV:%u] Send Active_Test_Resp.\n", threadsnum);
			}
			else if( Command_ID == ID_CNGP_LOGIN_RESP)
			{
				CNGP_Receive_LoginRes_mem(socketbuffer, Body_Length, &Version);
				buffersize = buffersize - Body_Length;
				memmove(socketbuffer, socketbuffer+Body_Length, buffersize);
				if( 0 == Command_Status)
				{
					pthread_mutex_lock(&cnum_lock[cn]);
					threads[cn][1] = 2; // login status
					threads[cn][2] = 0; // need submit resp
					pthread_mutex_unlock(&cnum_lock[cn]);
					Rsocket_fd = threads[cn][0];
					INFO("[INFO-RECV:%u] connect resp Recv OK,chang login status .\n", threadsnum );
				}
				else
				{	
					INFO("[INFO-RECV:%u] connect resp Recv ,Status :%d login err .lock\n", threadsnum, Command_Status);
					
					if( threads[cn][0]>0)
					     close(threads[cn][0]);
					close(Rsocket_fd);
					buffersize=0;
																				
					pthread_mutex_lock(&cnum_lock[cn]);
					threads[cn][0] = 0; // socket
					threads[cn][1] = 0; // not login 
					threads[cn][2] = 0; // reset need submit resp 
					pthread_mutex_unlock(&cnum_lock[cn]);
					INFO("[INFO-RECV:%u] connect resp Recv ,login err .  unlock\n", threadsnum );
				        sleep(5);
				}
			}
			else if(Command_ID == ID_CNGP_DELIVER)
			{
				memset (Msg_ID, 0, 10);
				memset (Recv_Time, 0, 20);
				memset (Src_Term_ID, 0, 21);
				memset (Dest_Term_ID, 0, 21);
				memset (Msg_Content, 0, sizeof(Msg_Content) );
				memset (tmp, 0, 500);
				memset (ContentPackType,0,16);
				memset (IsmpPackType,0,16);
				
				CNGP_Receive_Deliver_mem( socketbuffer, Body_Length, Msg_ID, &Is_Report, Recv_Time, Src_Term_ID, Dest_Term_ID, &Msg_Length, Msg_Content);
				
				//buffersize = buffersize - Body_Length;
												
				if( buffersize < Body_Length)
				{
					int ki=0;
					for(ki=0;ki<5;ki++)
					{
					     recv_len = recv(Rsocket_fd, socketbuffer+buffersize, socketbuffersize-buffersize-1, 0);
				             if( recv_len >0 )
				                ki=99;
				             else
				                usleep(200*1000);				                				                
				        }
				        
					if( recv_len > 0)
					{
						buffersize += recv_len;
						CNGP_Receive_Deliver_mem( socketbuffer, Body_Length, Msg_ID, &Is_Report, Recv_Time, Src_Term_ID, Dest_Term_ID, &Msg_Length, Msg_Content);				
					}
					else{
					        #if 1
					        INFO("[INFO-RECV:%u] Deliver error packet.\n",threadsnum);
						//INFO("[INFO-RECV:%u] error packet content:%s\n",threadsnum,socketbuffer);
						memset( socketbuffer,0,socketbuffersize );
						buffersize=0;
						INFO("[INFO-RECV:%u] not handle error packet.\n",threadsnum);
						
						INFO("[INFO-RECV:%u] simulate resp lock\n", threadsnum );
						pthread_mutex_lock(&sendq_lock[cn]);
						pthread_mutex_lock(&resq_lock[cn]);
						n = simulateres(SENDQ[cn], RESQ[cn]);
						pthread_mutex_unlock(&resq_lock[cn]);
						pthread_mutex_unlock(&sendq_lock[cn]);
						INFO("[INFO-RECV:%u] simulate resp u

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -