⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 double_queue.c

📁 linux下一种双缓冲队列机制的实现源代码(C)。
💻 C
📖 第 1 页 / 共 2 页
字号:
    int headerresult=0;
   //for socket
    struct sockaddr_in servaddr;
    int addrlen;
    char mesg;
    int port;
    int clientsock;  
    int ret=0;
    int sock_buf_size=SOCKBUFFSIZE;
	char *from=NULL;
	char *to=NULL;
	struct RQueue_Node * process_node = (struct RQueue_Node *)malloc(sizeof(struct RQueue_Node)) ;
	if ( process_node == NULL)
	{
		printf("%d daemonPor start erro when malloc\n", num) ;
		exit(0) ;
	}

	#ifdef _DEBUG
    printf("%d daemonPor start\n", num) ;
	#endif
	while(1)
	{
		if (pthread_mutex_lock(&process_RQueue->QueueMutex)!=0)  //lock QueueMutex
		{
			printf("there must be some erro in daemonPro\n");
			return  ;
		}

		if ( process_RQueue->Count==0 ) ///the process_queue's food is all eaten up
		{
		    if ( process_RQueue->swapSignal==1 )  ///wake up the SwapThread to get food
			{
                #ifdef _DEBUG
				printf("=============================wake up the swap thread\n") ;
				#endif
				pthread_cond_signal(&process_RQueue->SwapCond) ;
				process_RQueue->swapSignal = 0 ;
                                process_RQueue->havedone=1;
			}

			if ( process_RQueue->Count==0 )
			{ 
				#ifdef _DEBUG		
				//printf("==============================daemon thread %d wait\n", num) ;
				#endif
				if ( pthread_cond_wait( &process_RQueue->QueueCond, &process_RQueue->QueueMutex)!=0 )
				{
					printf("In daemonPro: condition wait error\n");
					//continue;
				}
				#ifdef _DEBUG				
	            //printf("==============================daemon thread %d wake up\n", num) ;
				#endif
			}    
			pthread_mutex_unlock(&process_RQueue->QueueMutex) ;   //unlock QueueMutex
			
		}
		else  ///normally, process the mail filter
		{		     
			///start to do it's job
			//free(process_RQueue->Porigin->data) ;
			//process_RQueue->Porigin->data = NULL ;
			printf("get the node\n") ;
			//printf("1\n") ;
			process_node->data = process_RQueue->Porigin->data ;
			//printf("2\n") ;
			process_RQueue->Porigin->data = NULL ;
			//printf("3\n") ;
			process_RQueue->Count-- ;
			//printf("4\n") ;
			if (process_RQueue->Count != 0)
			{
				process_RQueue->Porigin = process_RQueue->Porigin->next ;
			}
			else
			{
				process_RQueue->Porigin = process_RQueue->Pcursor ;
			}
			
			//printf("5\n") ;

        ///finish job
#ifdef _DEBUG
			process_num[num]++ ;
#endif
		///release the Mutex
			pthread_mutex_unlock(&process_RQueue->QueueMutex) ;   //unlock QueueMutex

            printf("go to process\n") ;
///////////////////////
            if ( process_node->data == NULL)
               continue ;
///deal the mail
                   //  printf("the mail is \n%s\n",process_node->data);
					//   printf("the mail length is \n%d\n",strlen(process_node->data));
                  if ( (headerresult=headercheck(process_node->data,&from,&to))==1)
                     {
                       printf("the mail is spam,decided by mail-header!============================\n");
					   goto RULE;
                      }
                   else
                      {
                      printf("the mail is ham,decided by mail-header!=============================\n");
					  // printf("out from: %s\n",from);
                      // printf("out to: %s\n",to);
                   
                      pthread_mutex_lock(&logmutex);  
					  logfile= log_242(process_node->data,1,from,to);
					   pthread_mutex_unlock(&logmutex);
					   printf("the logfile is \n%s\n",logfile);
					 //  sendlog(logfile);
                                          if (logfile!=NULL)
					   {
                                           free(logfile);
					   logfile=NULL;
                                           }  
                      // free(process_node->data);
                     //  process_node->data=NULL; 
                      // continue;  
					 // goto RULE;//ues for send
                      }
                  //printf("rulebased is %s\n",project_file->rulebased) ;
               //   printf("111the length is %d\n",strlen(process_node->data));
              //    printf("out from: %s\n",from);
               //   printf("out to: %s\n",to);
                   
				
                 if (*project_file->bayes=='1')           
                    {  
                      if ((headerresult=classify(process_node->data))==1)       
                        {
                         printf("the mail is spam,decided by bayes!================================\n");
          
                        }
                        else 
                        {
							if ((headerresult==0)||(headerresult==2))
							{
                         printf("the mail is ham,decided by bayes!=================================\n"); 
						 pthread_mutex_lock(&logmutex); 
						logfile=log_242(process_node->data,2,from,to);
						 pthread_mutex_unlock(&logmutex);
			             printf("the logfile is \n%s\n",logfile);
					   //  sendlog(logfile);
                                          if (logfile!=NULL)
					   {
					     free(logfile);
					     logfile=NULL;
                                           }
		if (process_node->data!=NULL)
		{
		            free(process_node->data);
                    process_node->data=NULL; 
		}
        if (from!=NULL)
        {
					free(from);
                    from=NULL;   
        }
        if (to!=NULL)
        {
					free(to);
                    to=NULL;   
        }
                         continue; 
						     }
							 else 
                                printf("the mail is spam(virus),decided by bayes!======================\n");
                      
                        }
                    }
                  
               //      printf("the mail is \n%s\n",process_node->data);
		//		     printf("the mail length is %d\n",strlen(process_node->data));
					
RULE: 
		          if (*project_file->rulebased=='1')
                    {  
	                 if ((data2=strstr(process_node->data,"Date:"))==NULL)
	                 {
						 goto theend;
	                 }
					  // printf("the mail in rule is \n%d\n",strlen(data2));
                    // if ((headerresult=rule_filter(data2,strlen(data2)))==1)
                       if (1==1)
                       {
                        printf("the mail is spam,decided by rulefilter!==============================\n");
						 pthread_mutex_lock(&logmutex); 
						 logfile=log_242(process_node->data,3,from,to);
						  pthread_mutex_unlock(&logmutex);
						 // printf("the logfile is \n%s\n",logfile);
					     // sendlog(logfile);
					      free(logfile);
					      logfile=NULL;
                      // free(process_node->data);
                        // process_node->data=NULL; 
                        // continue;   
                       }
                       else 
                      {
                         printf("the mail is ham,decided by rulefilter!==============================\n"); 
						  pthread_mutex_lock(&logmutex); 
						logfile= log_242(process_node->data,4,from,to);
						 pthread_mutex_unlock(&logmutex);
						// printf("the logfile is \n%s\n",logfile);
					   //  sendlog(logfile);
					    free(logfile);
					     logfile=NULL;
						//////// exit(0);
						 ////////send to middleware
//if (headerresult!=1)
//{
	//  printf("the mail is \n%s\n",process_node->data);
 ////                   clientsock=socket(AF_INET,SOCK_STREAM,0);
////				 ret = setsockopt( clientsock, SOL_SOCKET, SO_SNDBUF,(char *)&sock_buf_size, sizeof(sock_buf_size) );
 ////                        ret = setsockopt( clientsock, SOL_SOCKET, SO_RCVBUF,(char *)&sock_buf_size, sizeof(sock_buf_size) );


						 memset( &servaddr, 0, sizeof(servaddr) );
	                     servaddr.sin_family = AF_INET;
						 port=2500;
	                     servaddr.sin_port = htons( port );
                         if (inet_pton( AF_INET, "127.0.0.1", &servaddr.sin_addr )<=0)
	                         printf("inet_pton error\n");
	                     addrlen = sizeof( servaddr ); 

                      ///printf("now connect!\n");
    ////   					 if (connect(clientsock,(struct sockaddr *)&servaddr,sizeof(servaddr))==0)
      					 {
							 printf("connect success!\n");
       					 }
						 ///printf("connect over!\n");
						/// printf("the mail is \n%s\n",data1);
						///sleep(1);

        sendagain:
			  /// if ( 0 < sendto( clientsock, data1, strlen(data1), 0, (struct sockaddr*)&servaddr, addrlen ) )
  ////                        write(clientsock,process_node->data,strlen(process_node->data));
						  printf("connect over!\n");
							 {
			printf("the length is %d\n",strlen(process_node->data));
	                          //if(0>= recvfrom(clientsock, &mesg, 1, 0, (struct sockaddr*)&servaddr, &addrlen))
	                           //{	 
		                       // printf("recv error\n");
	                          // }
							  printf("sendto ok!\n");
                             }
							//else
	                        // {
	                        // 	printf("send mail error\n");
							//	goto sendagain;
	                       //  }
////	                        close(clientsock);
	                    //end send to middleware
                         //store the mail to the database!
                   }
//}
                    }

       		  //  i = process_smtp_client(smtp, ts->server.data+body,ts->server.count-body);
    theend:        
		if (process_node->data!=NULL)
		{
		            free(process_node->data);
                    process_node->data=NULL; 
		}
        if (from!=NULL)
        {
					free(from);
                    from=NULL;   
        }
        if (to!=NULL)
        {
					free(to);
                    to=NULL;   
        }

	}
}

}




/*
 * init_mailqueue - build the pool/process/swap queues and start the threads.
 *
 * Allocates the three Queue_Head structures and two rings of
 * MAX_QUEUE_NUMBER nodes (one ring for pool_RQueue, one for process_RQueue),
 * initialises the mutex and condition variables of the pool and process
 * queues, then starts one SwapQueue thread followed by DAEMON_THREAD_NUM
 * daemonPro threads.
 *
 * Returns 1 on success, 0 when an allocation, a mutex/condition-variable
 * initialisation or a thread creation fails.
 */
int init_mailqueue()
{
	/*
	 * One index slot per daemon thread.  The slots are static so that the
	 * pointer handed to each thread stays valid after this function returns
	 * and is never modified again; passing the address of the loop counter
	 * would let every thread observe a changing (and later dangling) value.
	 */
	static int daemon_ids[DAEMON_THREAD_NUM] ;
	struct RQueue_Node* iniPoolNode ;
	struct RQueue_Node* iniProcessNode ;
	pthread_t   tid_daemon ;
	pthread_t   tid_daemon1[DAEMON_THREAD_NUM] ;
	int i ;

	pool_RQueue    = (struct Queue_Head *)my_malloc(sizeof(struct Queue_Head)) ;
	process_RQueue = (struct Queue_Head *)my_malloc(sizeof(struct Queue_Head)) ;
	swapQueue      = (struct Queue_Head *)my_malloc(sizeof(struct Queue_Head)) ;

	iniPoolNode    = my_malloc(MAX_QUEUE_NUMBER * sizeof(struct RQueue_Node)) ;
	iniProcessNode = my_malloc(MAX_QUEUE_NUMBER * sizeof(struct RQueue_Node)) ;

	/* my_malloc is defined elsewhere; guard in case it can return NULL. */
	if ( pool_RQueue == NULL || process_RQueue == NULL || swapQueue == NULL
	     || iniPoolNode == NULL || iniProcessNode == NULL )
	{
		printf("init_mailqueue: my_malloc error\n");
		return 0;
	}

	/*
	 * Link every node to its successor and clear its payload.  The last
	 * node points back to the first one, closing the ring; it is handled
	 * inside the same loop so that its data pointer is cleared as well.
	 */
	for ( i=0; i<MAX_QUEUE_NUMBER; i++ )
	{
		int succ = (i == MAX_QUEUE_NUMBER-1) ? 0 : i+1 ;

		iniPoolNode[i].next    = &(iniPoolNode[succ]) ;
		iniPoolNode[i].data    = NULL ;
		iniProcessNode[i].next = &(iniProcessNode[succ]) ;
		iniProcessNode[i].data = NULL ;
	}

	/* Both queues start empty: origin and cursor sit on the first node. */
	pool_RQueue->Porigin     = &iniPoolNode[0] ;
	pool_RQueue->Pcursor     = &iniPoolNode[0]  ;
	pool_RQueue->Count    = 0 ;
	pool_RQueue->havedone    = 0 ;
	pool_RQueue->swapSignal = 0 ;
	process_RQueue->Porigin     = &iniProcessNode[0] ;
	process_RQueue->Pcursor     = &iniProcessNode[0]  ;
	process_RQueue->Count    = 0 ;
	process_RQueue->havedone    = 0 ;
	process_RQueue->swapSignal = 0 ;
	memset(swapQueue, 0, sizeof(struct Queue_Head)) ;

	/* pool queue: mutex and condition variable */
	if(pthread_mutex_init(&(pool_RQueue->QueueMutex),NULL)!=0)
	{
	    printf("pthread_mutex_init: pool_RQueue->QueueMutex error\n");
	    return 0;
	}
	if(pthread_cond_init(&(pool_RQueue->QueueCond),NULL)!=0)
	{
		printf("pthread_cond_init: pool_RQueue->QueueCond error\n");
		return 0;
	}

	/* process queue: mutex, queue condition variable, swap condition variable */
	if(pthread_mutex_init(&(process_RQueue->QueueMutex),NULL)!=0)
	{
	    printf("pthread_mutex_init: process_RQueue->QueueMutex error\n");
	    return 0;
	}
	if(pthread_cond_init(&(process_RQueue->QueueCond),NULL)!=0)
	{
		printf("pthread_cond_init: process_RQueue->QueueCond init error\n");
		return 0;
	}
	if(pthread_cond_init(&(process_RQueue->SwapCond),NULL)!=0)
	{
		printf("pthread_cond_init: process_RQueue->SwapCond init error\n");
		return 0;
	}

	/* Start the swap thread first. */
	if(pthread_create(&tid_daemon, NULL, SwapQueue, NULL)!=0)
	{
		printf("pthread_create: SwapQueue error\n");
		return 0;
	}

	/* Pause for a second before the daemon threads are started. */
	sleep(1) ;

	for ( i=0; i<DAEMON_THREAD_NUM; i++)
	{
		daemon_ids[i] = i ;
#ifdef _DEBUG
		/* Reset the counter before the thread that increments it exists. */
		process_num[i] = 0 ;
#endif
		if(pthread_create(&tid_daemon1[i], NULL, daemonPro, &daemon_ids[i])!=0)
		{
			printf("pthread_create: daemonPro %d error\n", i);
			return 0;
		}
	}

	return 1;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -