📄 double_queue.c
字号:
int headerresult=0;
//for socket
struct sockaddr_in servaddr;
int addrlen;
char mesg;
int port;
int clientsock;
int ret=0;
int sock_buf_size=SOCKBUFFSIZE;
char *from=NULL;
char *to=NULL;
struct RQueue_Node * process_node = (struct RQueue_Node *)malloc(sizeof(struct RQueue_Node)) ;
if ( process_node == NULL)
{
printf("%d daemonPor start erro when malloc\n", num) ;
exit(0) ;
}
#ifdef _DEBUG
printf("%d daemonPor start\n", num) ;
#endif
while(1)
{
if (pthread_mutex_lock(&process_RQueue->QueueMutex)!=0) //lock QueueMutex
{
printf("there must be some erro in daemonPro\n");
return ;
}
if ( process_RQueue->Count==0 ) ///the process_queue's food is all eaten up
{
if ( process_RQueue->swapSignal==1 ) ///wake up the SwapThread to get food
{
#ifdef _DEBUG
printf("=============================wake up the swap thread\n") ;
#endif
pthread_cond_signal(&process_RQueue->SwapCond) ;
process_RQueue->swapSignal = 0 ;
process_RQueue->havedone=1;
}
if ( process_RQueue->Count==0 )
{
#ifdef _DEBUG
//printf("==============================daemon thread %d wait\n", num) ;
#endif
if ( pthread_cond_wait( &process_RQueue->QueueCond, &process_RQueue->QueueMutex)!=0 )
{
printf("In daemonPro: condition wait error\n");
//continue;
}
#ifdef _DEBUG
//printf("==============================daemon thread %d wake up\n", num) ;
#endif
}
pthread_mutex_unlock(&process_RQueue->QueueMutex) ; //unlock QueueMutex
}
else ///normally, process the mail filter
{
///start to do it's job
//free(process_RQueue->Porigin->data) ;
//process_RQueue->Porigin->data = NULL ;
printf("get the node\n") ;
//printf("1\n") ;
process_node->data = process_RQueue->Porigin->data ;
//printf("2\n") ;
process_RQueue->Porigin->data = NULL ;
//printf("3\n") ;
process_RQueue->Count-- ;
//printf("4\n") ;
if (process_RQueue->Count != 0)
{
process_RQueue->Porigin = process_RQueue->Porigin->next ;
}
else
{
process_RQueue->Porigin = process_RQueue->Pcursor ;
}
//printf("5\n") ;
///finish job
#ifdef _DEBUG
process_num[num]++ ;
#endif
///release the Mutex
pthread_mutex_unlock(&process_RQueue->QueueMutex) ; //unlock QueueMutex
printf("go to process\n") ;
///////////////////////
if ( process_node->data == NULL)
continue ;
///deal the mail
// printf("the mail is \n%s\n",process_node->data);
// printf("the mail length is \n%d\n",strlen(process_node->data));
if ( (headerresult=headercheck(process_node->data,&from,&to))==1)
{
printf("the mail is spam,decided by mail-header!============================\n");
goto RULE;
}
else
{
printf("the mail is ham,decided by mail-header!=============================\n");
// printf("out from: %s\n",from);
// printf("out to: %s\n",to);
pthread_mutex_lock(&logmutex);
logfile= log_242(process_node->data,1,from,to);
pthread_mutex_unlock(&logmutex);
printf("the logfile is \n%s\n",logfile);
// sendlog(logfile);
if (logfile!=NULL)
{
free(logfile);
logfile=NULL;
}
// free(process_node->data);
// process_node->data=NULL;
// continue;
// goto RULE;//ues for send
}
//printf("rulebased is %s\n",project_file->rulebased) ;
// printf("111the length is %d\n",strlen(process_node->data));
// printf("out from: %s\n",from);
// printf("out to: %s\n",to);
if (*project_file->bayes=='1')
{
if ((headerresult=classify(process_node->data))==1)
{
printf("the mail is spam,decided by bayes!================================\n");
}
else
{
if ((headerresult==0)||(headerresult==2))
{
printf("the mail is ham,decided by bayes!=================================\n");
pthread_mutex_lock(&logmutex);
logfile=log_242(process_node->data,2,from,to);
pthread_mutex_unlock(&logmutex);
printf("the logfile is \n%s\n",logfile);
// sendlog(logfile);
if (logfile!=NULL)
{
free(logfile);
logfile=NULL;
}
if (process_node->data!=NULL)
{
free(process_node->data);
process_node->data=NULL;
}
if (from!=NULL)
{
free(from);
from=NULL;
}
if (to!=NULL)
{
free(to);
to=NULL;
}
continue;
}
else
printf("the mail is spam(virus),decided by bayes!======================\n");
}
}
// printf("the mail is \n%s\n",process_node->data);
// printf("the mail length is %d\n",strlen(process_node->data));
RULE:
if (*project_file->rulebased=='1')
{
if ((data2=strstr(process_node->data,"Date:"))==NULL)
{
goto theend;
}
// printf("the mail in rule is \n%d\n",strlen(data2));
// if ((headerresult=rule_filter(data2,strlen(data2)))==1)
if (1==1)
{
printf("the mail is spam,decided by rulefilter!==============================\n");
pthread_mutex_lock(&logmutex);
logfile=log_242(process_node->data,3,from,to);
pthread_mutex_unlock(&logmutex);
// printf("the logfile is \n%s\n",logfile);
// sendlog(logfile);
free(logfile);
logfile=NULL;
// free(process_node->data);
// process_node->data=NULL;
// continue;
}
else
{
printf("the mail is ham,decided by rulefilter!==============================\n");
pthread_mutex_lock(&logmutex);
logfile= log_242(process_node->data,4,from,to);
pthread_mutex_unlock(&logmutex);
// printf("the logfile is \n%s\n",logfile);
// sendlog(logfile);
free(logfile);
logfile=NULL;
//////// exit(0);
////////send to middleware
//if (headerresult!=1)
//{
// printf("the mail is \n%s\n",process_node->data);
//// clientsock=socket(AF_INET,SOCK_STREAM,0);
//// ret = setsockopt( clientsock, SOL_SOCKET, SO_SNDBUF,(char *)&sock_buf_size, sizeof(sock_buf_size) );
//// ret = setsockopt( clientsock, SOL_SOCKET, SO_RCVBUF,(char *)&sock_buf_size, sizeof(sock_buf_size) );
memset( &servaddr, 0, sizeof(servaddr) );
servaddr.sin_family = AF_INET;
port=2500;
servaddr.sin_port = htons( port );
if (inet_pton( AF_INET, "127.0.0.1", &servaddr.sin_addr )<=0)
printf("inet_pton error\n");
addrlen = sizeof( servaddr );
///printf("now connect!\n");
//// if (connect(clientsock,(struct sockaddr *)&servaddr,sizeof(servaddr))==0)
{
printf("connect success!\n");
}
///printf("connect over!\n");
/// printf("the mail is \n%s\n",data1);
///sleep(1);
sendagain:
/// if ( 0 < sendto( clientsock, data1, strlen(data1), 0, (struct sockaddr*)&servaddr, addrlen ) )
//// write(clientsock,process_node->data,strlen(process_node->data));
printf("connect over!\n");
{
printf("the length is %d\n",strlen(process_node->data));
//if(0>= recvfrom(clientsock, &mesg, 1, 0, (struct sockaddr*)&servaddr, &addrlen))
//{
// printf("recv error\n");
// }
printf("sendto ok!\n");
}
//else
// {
// printf("send mail error\n");
// goto sendagain;
// }
//// close(clientsock);
//end send to middleware
//store the mail to the database!
}
//}
}
// i = process_smtp_client(smtp, ts->server.data+body,ts->server.count-body);
theend:
if (process_node->data!=NULL)
{
free(process_node->data);
process_node->data=NULL;
}
if (from!=NULL)
{
free(from);
from=NULL;
}
if (to!=NULL)
{
free(to);
to=NULL;
}
}
}
}
int init_mailqueue()
{
struct RQueue_Node* iniPoolNode ;
struct RQueue_Node* iniProcessNode ;
pthread_t tid_daemon ;
pthread_t tid_daemon1[DAEMON_THREAD_NUM] ;
int i ;
pool_RQueue = (struct Queue_Head *)my_malloc(sizeof(struct Queue_Head)) ;
process_RQueue = (struct Queue_Head *)my_malloc(sizeof(struct Queue_Head)) ;
swapQueue = (struct Queue_Head *)my_malloc(sizeof(struct Queue_Head)) ;
iniPoolNode = my_malloc(MAX_QUEUE_NUMBER * sizeof(struct RQueue_Node)) ;
iniProcessNode = my_malloc(MAX_QUEUE_NUMBER * sizeof(struct RQueue_Node)) ;
//串联各个节点
for ( i=0; i<MAX_QUEUE_NUMBER-1; i++ )
{
iniPoolNode[i].next = &(iniPoolNode[i+1]) ;
iniPoolNode[i].data = NULL ;
iniProcessNode[i].next = &(iniProcessNode[i+1]) ;
iniProcessNode[i].data = NULL ;
}
//环接
iniPoolNode[ MAX_QUEUE_NUMBER-1 ].next = &(iniPoolNode[0]) ;
iniProcessNode[ MAX_QUEUE_NUMBER-1 ].next = &(iniProcessNode[0]) ;
pool_RQueue->Porigin = &iniPoolNode[0] ;
pool_RQueue->Pcursor = &iniPoolNode[0] ;
pool_RQueue->Count = 0 ;
pool_RQueue->havedone = 0 ;
pool_RQueue->swapSignal = 0 ;
process_RQueue->Porigin = &iniProcessNode[0] ;
process_RQueue->Pcursor = &iniProcessNode[0] ;
process_RQueue->Count = 0 ;
process_RQueue->havedone = 0 ;
process_RQueue->swapSignal = 0 ;
memset(swapQueue, 0, sizeof(struct Queue_Head)) ;
///pool queue
if(pthread_mutex_init(&(pool_RQueue->QueueMutex),NULL)!=0) //init 锁
{
printf("pthread_mutex_init: pool_RQueue->QueueMutex error\n");
return 0;
}
if(pthread_cond_init(&(pool_RQueue->QueueCond),NULL)!=0) //init 条件变量
{
printf("pthread_cond_init: pool_RQueue->QueueCond error\n");
return 0;
}
///process queue
if(pthread_mutex_init(&(process_RQueue->QueueMutex),NULL)!=0) //init 锁
{
printf("pthread_mutex_init: process_RQueue->QueueMutex error\n");
return 0;
}
if(pthread_cond_init(&(process_RQueue->QueueCond),NULL)!=0) //init 条件变量
{
printf("pthread_cond_init: process_RQueue->QueueCond init error\n");
return 0;
}
if(pthread_cond_init(&(process_RQueue->SwapCond),NULL)!=0) //init 条件变量
{
printf("pthread_cond_init: process_RQueue->SwapCond init error\n");
return 0;
}
///create the daemon thread ,and the Swap thread
pthread_create(&tid_daemon, NULL, SwapQueue, NULL) ; ///create the swap thread
//&tid_daemon
sleep(1) ;
for ( i=0; i<DAEMON_THREAD_NUM; i++)
{
pthread_create(&tid_daemon1[i], NULL, daemonPro, &i) ;
#ifdef _DEBUG
process_num[i] = 0 ;
#endif
}
return 1;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -