⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 double_queue.c

📁 linux下一种双缓冲队列机制的实现源代码(C)。
💻 C
📖 第 1 页 / 共 2 页
字号:
#include <stdio.h>
#include <string.h>
#include <sys/types.h>

#include <ctype.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <pthread.h>
#include <time.h>
#include <unistd.h>

#include "buf.h"
#include "double_queue.h"
#include "pro_packet.h"
//#include "base64.h"
#include "main.h"
#include "../include/cclient_all.h"
#include "../include/junk_main.h"
#include "../include/headercheck.h"


/* Permission bits passed to open() when the log file is created (rw-r--r--). */
#define FILE_MODE (S_IRUSR|S_IRGRP|S_IROTH|S_IWUSR)
/* Size, in bytes, requested for SO_SNDBUF and SO_RCVBUF on the log socket. */
#define SOCKBUFFSIZE 250000
/* NOTE(review): not referenced in the visible part of this file. */
#define BUFSIZE 65535
#define MAX_QUEUE_NUMBER    1040/// number of nodes in the mail queue
#define DAEMON_THREAD_NUM   30       /// number of daemon threads
/* NOTE(review): not referenced in the visible part of this file. */
#define DO_FILTER 1

/*
 * Enables the extra printf tracing guarded by #ifdef _DEBUG below.
 * NOTE(review): identifiers starting with an underscore followed by an
 * uppercase letter are reserved for the implementation in C.
 */
#define _DEBUG

#ifdef _DEBUG
       #include <arpa/inet.h>
/*
 * Debug-only array with one int per daemon thread.
 * NOTE(review): presumably per-thread processing counters; its use is not
 * visible in this part of the file.
 */
static int process_num[DAEMON_THREAD_NUM] ;
#endif


////filter config





/*
 * File-scope logging state.
 * NOTE(review): logmutex is never locked or initialised in the visible part
 * of this file; confirm where (or whether) it protects the log file.
 */
pthread_mutex_t logmutex;
/* File descriptor for the log file. */
int fdlog;



/////the function start here`

/*
 * Add_Mailqueue - producer side: store one buffer in the pool queue.
 *
 * data: pointer stored in the node at the queue's write cursor. The queue
 *       later calls free() on stored pointers when it drops an entry, so
 *       this is presumably heap memory handed over to the queue.
 *
 * Everything happens under pool_RQueue->QueueMutex. If the queue already
 * holds at least 90% of MAX_QUEUE_NUMBER entries, the entry at Porigin is
 * freed and skipped before the new one is stored. The consumer waiting on
 * QueueCond is signalled only when the count becomes exactly 1.
 */
void
Add_Mailqueue(char *data)
{
	pthread_mutex_lock(&pool_RQueue->QueueMutex);

	/* Nearly full: release the data at the origin node and move past it. */
	if (pool_RQueue->Count >= (MAX_QUEUE_NUMBER*0.9)) {
		free(pool_RQueue->Porigin->data);
		pool_RQueue->Porigin->data = NULL;
		pool_RQueue->Porigin = pool_RQueue->Porigin->next;
		pool_RQueue->Count -= 1;
	}

	/* Store the new buffer at the cursor node, then advance the cursor. */
	pool_RQueue->Pcursor->data = data;
	pool_RQueue->Pcursor = pool_RQueue->Pcursor->next;
	pool_RQueue->Count += 1;

	/* A count of 1 means the queue was empty before this insert. */
	if (1 == pool_RQueue->Count) {
#ifdef _DEBUG
		printf("now the queue is empty\n");
#endif
		/* Wake the consumer blocked on the empty queue. */
		pthread_cond_signal(&pool_RQueue->QueueCond);
	}

	pthread_mutex_unlock(&pool_RQueue->QueueMutex);
}

/*
 * Get_Rqueue - wait for the pool queue to hold data, then exchange the
 * contents of pool_RQueue and process_RQueue and wake the daemon threads.
 *
 * Returns process_RQueue on success, or NULL if the pool queue mutex could
 * not be locked.
 *
 * The wait on QueueCond is done in a loop: pthread_cond_wait() may return
 * without the condition having been signalled, so the predicate
 * (Count != 0) is re-checked after every wakeup.
 */
struct Queue_Head *
Get_Rqueue()
{

	printf("I come to get the food\n") ;
	if (pthread_mutex_lock(&pool_RQueue->QueueMutex)!=0)
	{
		printf("there must be some erro in main\n");
		return NULL ;
	}

	/* Sleep until the producer has queued at least one entry. */
	while (pool_RQueue->Count == 0)
	{
		if ( pthread_cond_wait( &pool_RQueue->QueueCond, &pool_RQueue->QueueMutex)!=0 )
		{
			printf("In\tGlobalThread: condition wait error\n");
			/* As before, a failed wait falls through to the swap instead of retrying forever. */
			break ;
		}
	}

	/* Exchange the two queues, using swapQueue as scratch space. */
#ifdef _DEBUG
	printf("exchange the queue!\n") ;
#endif
	process_RQueue->Count = 0 ;
	/*
	 * NOTE(review): only the first 16 bytes of each queue head are exchanged.
	 * That is correct only if the fields meant to be swapped occupy exactly
	 * those bytes on the target ABI - confirm against the struct definition
	 * (pointer size differs between 32- and 64-bit builds).
	 */
	memcpy(swapQueue, pool_RQueue, 16) ;
	memcpy(pool_RQueue, process_RQueue, 16) ;
	memcpy(process_RQueue, swapQueue, 16) ;

#ifdef _DEBUG
	printf("process_RQueue->Count: %d\n", process_RQueue->Count) ;
#endif

	pthread_mutex_unlock(&pool_RQueue->QueueMutex) ;   /* release the pool queue */

	/* Wake the daemon threads waiting on the process queue. */
	pthread_mutex_lock(&process_RQueue->QueueMutex) ;   /* lock the process queue */
	if (process_RQueue->Count > 0)         /* there is work to hand out */
	{
		printf("================================broadcast the daemon thread\n") ;
		pthread_cond_broadcast(&process_RQueue->QueueCond) ;
	}
	process_RQueue->swapSignal = 1 ;       /* mark that a swap has taken place */
	pthread_mutex_unlock(&process_RQueue->QueueMutex) ;   /* release the process queue */

	return process_RQueue ;
}


/*
 * SwapQueue - main loop of the queue-swapping thread.
 *
 * Repeatedly calls Get_Rqueue() to exchange the pool and process queues,
 * then blocks on process_RQueue->SwapCond until signalled, unless the
 * havedone flag is already set, in which case the flag is cleared and the
 * wait is skipped.
 *
 * Returns -1 if Get_Rqueue() fails; otherwise it does not return. A failed
 * condition wait terminates the process via exit(0), as before.
 */
int SwapQueue()
{
	printf("SwapQueue Create\n") ;

	process_RQueue->swapSignal = 1 ;
	while(1)
	{
		/*
		 * On success Get_Rqueue() returns process_RQueue itself, so there is
		 * nothing to store. On failure it returns NULL; that value must not be
		 * written into the shared pointer or dereferenced below.
		 */
		if (Get_Rqueue() == NULL)
		{
			printf("SwapQueue: Get_Rqueue failed\n") ;
			return -1 ;
		}
		printf("Get the queue\n") ;

		pthread_mutex_lock(&process_RQueue->QueueMutex) ;   /* lock the process queue */
		if (process_RQueue->havedone == 1)
		{
			/* Flag already set: consume it instead of waiting. */
			process_RQueue->havedone = 0 ;
		}
		else if ( pthread_cond_wait(&process_RQueue->SwapCond, &process_RQueue->QueueMutex)!=0 )
		{
			printf("In\tdaemonPro: condition wait error\n");
			exit(0) ;
		}
		pthread_mutex_unlock(&process_RQueue->QueueMutex) ;   /* release the process queue */

		printf("wake up\n") ;
	}

	return -1;
}







/* Copy len bytes from src to dst + *pos and advance *pos; the caller has sized dst. */
static void log242_put(char *dst, size_t *pos, const char *src, size_t len)
{
	memcpy(dst + *pos, src, len);
	*pos += len;
}

/*
 * log_242 - build one log record for a mail, append it to ./242.log and
 * return a heap copy of it.
 *
 * data1   : the mail text (NUL-terminated); searched for a "\nSubject: "
 *           header and measured for the size field. NULL is treated as an
 *           empty mail with no subject.
 * biaozhi : verdict selector; 1 = header-ham, 2 = bayes-ham, 3 = rule-spam,
 *           4 = rule-ham. Any other value writes no verdict line.
 * from,to : envelope addresses, or NULL (logged as "null").
 *
 * Returns a malloc()ed, NUL-terminated copy of the record without the
 * "\n*****\n" separator that is written to the file, or NULL if memory
 * could not be allocated. The buffer is returned to the caller, who is
 * presumably responsible for freeing it.
 *
 * The buffer is sized from the actual field lengths, so long addresses or
 * subjects cannot overflow it. The record goes to the file in a single
 * write() on a descriptor local to this call.
 */
char* log_242(char *data1,int biaozhi,char *from,char *to)
{
	static const char logpath[] = "./242.log";
	static const char subject_tag[] = "\nSubject: ";
	static const char fixed_fields[] =
		"\ntype:null\nlanguage:null\nfrom_ip:null\nip_address:null\nmethod:prob\nsize:";
	static const char trailer[] = "\n*****\n";

	const char *sender = (from != NULL) ? from : "null";
	const char *rcpt = (to != NULL) ? to : "null";
	const char *subject = "null";
	const char *reason = NULL;
	const char *hdr = NULL;
	size_t sender_len = strlen(sender);
	size_t rcpt_len = strlen(rcpt);
	size_t subject_len = 4;
	size_t reason_len = 0;
	size_t data_len = 0;
	size_t stamp_len = 0;
	size_t size_len;
	size_t total;
	size_t pos = 0;
	size_t record_end;
	char stamp[100];
	char size[32];
	char *logbuf;
	struct tm *tm_now;
	time_t now;
	int fd;

	/*
	 * Timestamp. localtime() returns a pointer to storage owned by the C
	 * library, so it must never be passed to free().
	 */
	time(&now);
	tm_now = localtime(&now);
	if (tm_now != NULL)
	{
		stamp_len = strftime(stamp, sizeof stamp, "%Y-%m-%d %H:%M:%S", tm_now);
	}
	if (stamp_len == 0)
	{
		memcpy(stamp, "null", 5);
		stamp_len = 4;
	}

	/* Subject: text after the header tag up to the end of that line. */
	if (data1 != NULL)
	{
		data_len = strlen(data1);
		hdr = strstr(data1, subject_tag);
	}
	if (hdr == NULL)
	{
		printf("no subject\n");
	}
	else
	{
		subject = hdr + (sizeof subject_tag - 1);
		/* strcspn stops at the terminating NUL, so a missing CR cannot run off the buffer. */
		subject_len = strcspn(subject, "\r\n");
	}

	size_len = (size_t)snprintf(size, sizeof size, "%zu", data_len);

	switch (biaozhi)
	{
	case 1: reason = "\niftransfer:N\nreason:header-ham"; break;
	case 2: reason = "\niftransfer:N\nreason:bayes-ham";  break;
	case 3: reason = "\niftransfer:N\nreason:rule-spam";  break;
	case 4: reason = "\niftransfer:Y\nreason:rule-ham";   break;
	default: break;
	}
	if (reason != NULL)
	{
		reason_len = strlen(reason);
	}

	/* Exact size of everything appended below, plus the file trailer and a NUL. */
	total = strlen("time:") + stamp_len
	      + strlen("\nmailfrom:") + sender_len
	      + strlen("\nmailto:") + rcpt_len
	      + strlen("\nsubject:") + subject_len
	      + (sizeof fixed_fields - 1) + size_len
	      + reason_len + (sizeof trailer - 1) + 1;

	logbuf = malloc(total);
	if (logbuf == NULL)
	{
		return NULL;
	}

	log242_put(logbuf, &pos, "time:", strlen("time:"));
	log242_put(logbuf, &pos, stamp, stamp_len);
	log242_put(logbuf, &pos, "\nmailfrom:", strlen("\nmailfrom:"));
	log242_put(logbuf, &pos, sender, sender_len);
	log242_put(logbuf, &pos, "\nmailto:", strlen("\nmailto:"));
	log242_put(logbuf, &pos, rcpt, rcpt_len);
	log242_put(logbuf, &pos, "\nsubject:", strlen("\nsubject:"));
	log242_put(logbuf, &pos, subject, subject_len);
	log242_put(logbuf, &pos, fixed_fields, sizeof fixed_fields - 1);
	log242_put(logbuf, &pos, size, size_len);
	if (reason != NULL)
	{
		log242_put(logbuf, &pos, reason, reason_len);
	}

	/* The returned string ends here; the separator goes to the file only. */
	record_end = pos;
	if (reason != NULL)
	{
		log242_put(logbuf, &pos, trailer, sizeof trailer - 1);
	}

	/* File logging is best-effort: a failure is reported but the record is still returned. */
	fd = open(logpath, O_RDWR | O_CREAT | O_APPEND, FILE_MODE);
	if (fd < 0)
	{
		perror("log_242: open ./242.log");
	}
	else
	{
		if (write(fd, logbuf, pos) != (ssize_t)pos)
		{
			fprintf(stderr, "log_242: incomplete write to %s\n", logpath);
		}
		close(fd);
	}

	logbuf[record_end] = '\0';
	return logbuf;
}

/*
 * sendlog - send one NUL-terminated log record over TCP to 127.0.0.1:2501.
 *
 * logfile: the record text; NULL is ignored.
 *
 * The parameter is declared as a pointer. The previous old-style definition
 * left it as an implicit int, which truncates the pointer where int is
 * narrower than a pointer. Each call opens a new connection, writes the
 * whole record and closes the socket. Failures are reported on stderr and
 * the function returns without sending.
 */
void sendlog(char *logfile)
{
	struct sockaddr_in servaddr;
	const char *cursor;
	size_t remaining;
	int sock_buf_size = SOCKBUFFSIZE;
	int port = 2501;
	int clientsock;

	if (logfile == NULL)
	{
		return;
	}

	clientsock = socket(AF_INET, SOCK_STREAM, 0);
	if (clientsock < 0)
	{
		perror("sendlog: socket");
		return;
	}

	/* Buffer sizing is only a tuning hint, so a failure here is not fatal. */
	if (setsockopt(clientsock, SOL_SOCKET, SO_SNDBUF, &sock_buf_size, sizeof(sock_buf_size)) < 0)
	{
		perror("sendlog: setsockopt SO_SNDBUF");
	}
	if (setsockopt(clientsock, SOL_SOCKET, SO_RCVBUF, &sock_buf_size, sizeof(sock_buf_size)) < 0)
	{
		perror("sendlog: setsockopt SO_RCVBUF");
	}

	memset(&servaddr, 0, sizeof(servaddr));
	servaddr.sin_family = AF_INET;
	servaddr.sin_port = htons(port);
	if (inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr) <= 0)
	{
		printf("inet_pton error\n");
		close(clientsock);
		return;
	}

	if (connect(clientsock, (struct sockaddr *)&servaddr, sizeof(servaddr)) != 0)
	{
		/* Writing to an unconnected socket can only fail, so stop here. */
		perror("sendlog: connect");
		close(clientsock);
		return;
	}
	printf("connect success!\n");

	/* write() may accept fewer bytes than requested; loop until all are sent. */
	cursor = logfile;
	remaining = strlen(logfile);
	while (remaining > 0)
	{
		ssize_t sent = write(clientsock, cursor, remaining);
		if (sent <= 0)
		{
			perror("sendlog: write");
			break;
		}
		cursor += sent;
		remaining -= (size_t)sent;
	}
	printf("connect over!\n");
	if (remaining == 0)
	{
		printf("sendto ok!\n");
	}

	close(clientsock);
}


void daemonPro(int *daemon_num) 
{
	char * data2=NULL;
	char *logfile=NULL;
	int num = *daemon_num;
	int result = 0 ;
	unsigned long packet_id = 0;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -