📄 double_queue.c
字号:
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <ctype.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <pthread.h>
#include "buf.h"
#include "double_queue.h"
#include "pro_packet.h"
//#include "base64.h"
#include "main.h"
#include "../include/cclient_all.h"
#include "../include/junk_main.h"
#include "../include/headercheck.h"
#define FILE_MODE (S_IRUSR|S_IRGRP|S_IROTH|S_IWUSR)
#define SOCKBUFFSIZE 250000
#define BUFSIZE 65535
#define MAX_QUEUE_NUMBER 1040///number of the node of the mailqueue
#define DAEMON_THREAD_NUM 30 ///number of the thread
#define DO_FILTER 1
#define _DEBUG
#ifdef _DEBUG
#include <arpa/inet.h>
static int process_num[DAEMON_THREAD_NUM] ;
#endif
////filter config
pthread_mutex_t logmutex;
int fdlog;
/////the function start here`
void
Add_Mailqueue(char *data)
//Add_Mailqueue(char *data, const struct tuple4 *addr)
{
//struct MailQueue_Node *a_Mail_Node = (struct MailQueue_Node*)my_malloc(sizeof(struct MailQueue_Node)); //malloc for new node
pthread_mutex_lock(&pool_RQueue->QueueMutex) ;
if ( pool_RQueue->Count >= (MAX_QUEUE_NUMBER*0.9) ) /*the amount of the queue get it maxmun*/
{
/*处理不能缓存的队列*/
//drop_num++ ;
free(pool_RQueue->Porigin->data) ;
pool_RQueue->Porigin->data = NULL ;
pool_RQueue->Porigin = pool_RQueue->Porigin->next ;
pool_RQueue->Count-- ;
}
/*给mailnode赋值*/
//memcpy(&(a_Mail_Node->addr), addr, sizeof(struct tuple4)) ; //copy the tuple4
//printf("++++++++++++++++++%d,%d,%d,%d\n",a_Mail_Node->addr.source,a_Mail_Node->addr.dest,a_Mail_Node->addr.saddr,a_Mail_Node->addr.daddr) ;
/*将该node加入pool_MailQueue*/
pool_RQueue->Pcursor->data = data ;
pool_RQueue->Pcursor = pool_RQueue->Pcursor->next ;
pool_RQueue->Count++ ;
if (pool_RQueue->Count == 1) ///when the queue is empty ,before ++, it is 0
{
#ifdef _DEBUG
printf("now the queue is empty\n") ;
#endif
pthread_cond_signal(&pool_RQueue->QueueCond) ; /*wake up the comsumer*/
}
#ifdef _DEBUG
//printf("producer has put the mail\n") ;
#endif
pthread_mutex_unlock(&pool_RQueue->QueueMutex) ;
}
struct Queue_Head *
Get_Rqueue()
{
printf("I come to get the food\n") ;
if (pthread_mutex_lock(&pool_RQueue->QueueMutex)!=0)
{
printf("there must be some erro in main\n");
return NULL ;
}
if (pool_RQueue->Count== 0)
{
if ( pthread_cond_wait( &pool_RQueue->QueueCond, &pool_RQueue->QueueMutex)!=0 )
{
printf("In GlobalThread: condition wait error\n");
//continue;
}
}
/*交换两个队列*/
#ifdef _DEBUG
printf("exchange the queue!\n") ;
#endif
///pool_RQueue swap process_RQueue by swapQueue
process_RQueue->Count = 0 ;
memcpy(swapQueue, pool_RQueue, 16) ;
memcpy(pool_RQueue, process_RQueue, 16) ;
memcpy(process_RQueue, swapQueue, 16) ;
#ifdef _DEBUG
printf("process_RQueue->Count: %d\n", process_RQueue->Count) ;
#endif
pthread_mutex_unlock(&pool_RQueue->QueueMutex) ; ///unlock the pool queue
///wake up the daemon thread
pthread_mutex_lock(&process_RQueue->QueueMutex) ; ///unlock the process queue
if (process_RQueue->Count > 0) ///when the queue is empty
{
printf("================================broadcast the daemon thread\n") ;
pthread_cond_broadcast(&process_RQueue->QueueCond) ; ///use broadcast to wake up
}
process_RQueue->swapSignal = 1 ; ///allowed daemonpro to wake up
pthread_mutex_unlock(&process_RQueue->QueueMutex) ; ///unlock the process queue
///end wake up the daemon thread
return process_RQueue ;
}
int SwapQueue()
{
printf("SwapQueue Create\n") ;
process_RQueue->swapSignal = 1 ;
while(1)
{
//process_num_swap
process_RQueue = Get_Rqueue() ;
printf("Get the queue\n") ;
#ifdef _DEBUG
//queue_got_food += process_RQueue->Count ;
#endif
pthread_mutex_lock(&process_RQueue->QueueMutex) ; ///unlock the process queue
if (process_RQueue->havedone==1)
{
process_RQueue->havedone=0;
goto WAKE;
}
if ( pthread_cond_wait(&process_RQueue->SwapCond, &process_RQueue->QueueMutex)!=0 )
{
printf("In daemonPro: condition wait error\n");
exit(0) ;
}
//process_RQueue->swapSignal = 1 ;
WAKE:
pthread_mutex_unlock(&process_RQueue->QueueMutex) ; ///unlock the process queue
printf("wake up\n") ;
//swap_time++ ;
}
return -1;
}
char* log_242(char *data1,int biaozhi,char *from,char *to)
{
char *logbuf=NULL;
int logcount=0;
char tmp[15]={"./242.log"};
char *p=NULL;
char *q=NULL;
int count=0;
char time3[100];
char size[20];
time_t time1;
struct tm *time2=NULL;//(struct tm *)malloc(sizeof(struct tm ));;
logbuf=(char *)malloc(2000);
time(&time1);
time2=localtime(&time1);
strftime(time3,100,"%Y-%m-%d %H:%M:%S",time2);
// printf("the time is :\n%s\n",time3);
fdlog=open(tmp, O_RDWR | O_CREAT | O_APPEND,FILE_MODE);
write(fdlog,"time:",5);
memcpy(logbuf ,"time:",5);
logcount+=5;
write(fdlog,time3,19);
memcpy(logbuf+logcount ,time3,19);
logcount+=19;
//from
write(fdlog,"\nmailfrom:",10);
memcpy(logbuf+logcount ,"\nmailfrom:",10);
logcount+=10;
if (from!=NULL)
{
write(fdlog,from,strlen(from));
memcpy(logbuf+logcount ,from,strlen(from));
logcount+=strlen(from);
}
else
{ write(fdlog,"null",4);
memcpy(logbuf+logcount ,"null",4);
logcount+=4;
}
//to
to:
write(fdlog,"\nmailto:",8);
memcpy(logbuf+logcount ,"\nmailto:",8);
logcount+=8;
if (to!=NULL)
{
write(fdlog,to,strlen(to));
memcpy(logbuf+logcount ,to,strlen(to));
logcount+=strlen(to);
}
else
{ write(fdlog,"null",4);
memcpy(logbuf+logcount ,"null",4);
logcount+=4;
}
//subject
subject:
write(fdlog,"\nsubject:",9);
memcpy(logbuf+logcount ,"\nsubject:",9);
logcount+=9;
// printf("get 1111111111111111111111111111111\n");
if ((p=strstr(data1,"\nSubject: "))==NULL)
{
printf("no subject\n");
write(fdlog,"null",4);
memcpy(logbuf+logcount ,"null",4);
logcount+=4;
goto others;
}
// printf("get 2222222222222222222222222222\n");
p+=10;
q=p;
while (*p!='\r')
{
count++;
p++;
}
//printf("get 4444444444444444444444444444444\n");
write(fdlog,q,count);
memcpy(logbuf+logcount ,q,count);
logcount+=count;
//
others:
write(fdlog,"\ntype:null\nlanguage:null\nfrom_ip:null\nip_address:null\nmethod:prob\nsize:",71);
memcpy(logbuf+logcount ,"\ntype:null\nlanguage:null\nfrom_ip:null\nip_address:null\nmethod:prob\nsize:",71);
logcount+=71;
count=strlen(data1);
sprintf(size, "%d\0", count);
write(fdlog,size,strlen(size));
memcpy(logbuf+logcount ,size,strlen(size));
logcount+=strlen(size);
switch (biaozhi)
{
case 1: write(fdlog,"\niftransfer:N\nreason:header-ham\n*****\n",38);
memcpy(logbuf+logcount ,"\niftransfer:N\nreason:header-ham\0",32);
logcount+=32;
break;
case 2: write(fdlog,"\niftransfer:N\nreason:bayes-ham\n*****\n",37);
memcpy(logbuf+logcount ,"\niftransfer:N\nreason:bayes-ham\0",31);
logcount+=31;
break;
case 3: write(fdlog,"\niftransfer:N\nreason:rule-spam\n*****\n",37);
memcpy(logbuf+logcount ,"\niftransfer:N\nreason:rule-spam\0",31);
logcount+=31;
break;
case 4: write(fdlog,"\niftransfer:Y\nreason:rule-ham\n*****\n",36);
memcpy(logbuf+logcount ,"\niftransfer:Y\nreason:rule-ham\0",30);
logcount+=30;
break;
default: NULL;
}
//
close(fdlog);
free(time2);
time2=NULL;
return logbuf;
}
void sendlog(logfile)
{
struct sockaddr_in servaddr;
int addrlen;
char mesg;
int port;
int clientsock;
int ret=0;
int sock_buf_size=SOCKBUFFSIZE;
clientsock=socket(AF_INET,SOCK_STREAM,0);
ret = setsockopt( clientsock, SOL_SOCKET, SO_SNDBUF,(char *)&sock_buf_size, sizeof(sock_buf_size) );
ret = setsockopt( clientsock, SOL_SOCKET, SO_RCVBUF,(char *)&sock_buf_size, sizeof(sock_buf_size) );
memset( &servaddr, 0, sizeof(servaddr) );
servaddr.sin_family = AF_INET;
port=2501;
servaddr.sin_port = htons( port );
if (inet_pton( AF_INET, "127.0.0.1", &servaddr.sin_addr )<=0)
printf("inet_pton error\n");
addrlen = sizeof( servaddr );
///printf("now connect!\n");
if (connect(clientsock,(struct sockaddr *)&servaddr,sizeof(servaddr))==0)
{
printf("connect success!\n");
}
///printf("connect over!\n");
/// printf("the mail is \n%s\n",data1);
///sleep(1);
sendagain:
/// if ( 0 < sendto( clientsock, data1, strlen(data1), 0, (struct sockaddr*)&servaddr, addrlen ) )
write(clientsock,logfile,strlen(logfile));
printf("connect over!\n");
{
//if(0>= recvfrom(clientsock, &mesg, 1, 0, (struct sockaddr*)&servaddr, &addrlen))
//{
// printf("recv error\n");
// }
printf("sendto ok!\n");
}
//else
// {
// printf("send mail error\n");
// goto sendagain;
// }
close(clientsock);
}
void daemonPro(int *daemon_num)
{
char * data2=NULL;
char *logfile=NULL;
int num = *daemon_num;
int result = 0 ;
unsigned long packet_id = 0;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -