⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 netflow2mysql.c

📁 c语言实现的将Netflow数据直接导入Mysql数据库的工具
💻 C
📖 第 1 页 / 共 2 页
字号:
unsigned long long insert_Header_for_NetFlow_Version_5(	struct netflow_v5_header *ph,	in_addr_t *p_s_addr){	char buffer[1024];	if(store_exporter_address){		sprintf(			buffer,			SQL_INSERT_HEADER_WITH_EXPORTER_ADDRESS,			(unsigned short)ntohs(ph->version),			(unsigned long)ntohl(ph->SysUptime),			(unsigned long)ntohl(ph->unix_secs),			(unsigned long)ntohl(ph->flow_sequence),			(unsigned long)			((((unsigned char)(ph->engine_type)) << 8) + (unsigned char)(ph->engine_id)),			(unsigned long)ntohl(*p_s_addr)		);	}else{		sprintf(			buffer,			SQL_INSERT_HEADER,			(unsigned short)ntohs(ph->version),			(unsigned long)ntohl(ph->SysUptime),			(unsigned long)ntohl(ph->unix_secs),			(unsigned long)ntohl(ph->flow_sequence),			(unsigned long)			((((unsigned char)(ph->engine_type)) << 8) + (unsigned char)(ph->engine_id))		);	}		if(debug){		printf("SQL: %s\n", buffer);	}	if(mysql_real_query(sql->handler, buffer, strlen(buffer))){		fprintf(stderr, "mysql_real_query() failed\n");		fprintf(stderr, "%s\n", mysql_error(sql->handler));	}	return mysql_insert_id(sql->handler);}unsigned long packet_sampling(	unsigned long this_packet_count,	unsigned long in_byte,	unsigned long *out_byte){	unsigned long long prev_packet_count;	unsigned long sampled_packet_count;		prev_packet_count = packet_count;	packet_count += this_packet_count;/*	printf("//%llu/%lu/%lu/%llu//",		packet_count,		this_packet_count, 		packet_sampling_rate,		(packet_count / packet_sampling_rate) - 				(prev_packet_count / packet_sampling_rate)								);				*/	if(packet_sampling_rate == 1){		sampled_packet_count = packet_count - prev_packet_count;		*out_byte =	in_byte;					}else{		sampled_packet_count = 			(packet_count / packet_sampling_rate) -				(prev_packet_count / packet_sampling_rate);		*out_byte =	(int)(		(double)in_byte * (double)sampled_packet_count / (double)this_packet_count);					}					return(sampled_packet_count);}unsigned long insert_Record_for_NetFlow_Version_5(	struct netflow_v5_record *r,	unsigned long long *p_hid){	char buffer[1024];	unsigned long sampled_packet_count;	unsigned long sampled_byte_count;		sampled_packet_count = packet_sampling(		ntohl(r->dPkts),		ntohl(r->dOctets),		&sampled_byte_count	);		if(sampled_packet_count == 0){		return(0);		}		sprintf(		buffer,		SQL_INSERT_IPV4_RECORD,		(unsigned long long)	*p_hid,		(unsigned char)			4,		(unsigned long)			ntohl(r->Last),		(unsigned long)			ntohl(r->First),		(unsigned long)			sampled_byte_count,		(unsigned long)			sampled_packet_count,		(unsigned short)		ntohs(r->input),		(unsigned short)		ntohs(r->output),		(unsigned long)			ntohl(r->srcaddr),		(unsigned long)			ntohl(r->dstaddr),		(unsigned char)			r->prot,		(unsigned char)			r->tos,		(unsigned short)		ntohs(r->srcport),		(unsigned short)		ntohs(r->dstport),		(unsigned long)			ntohl(r->nexthop),		(unsigned short)		ntohs(r->dst_as),		(unsigned short)		ntohs(r->src_as),		(unsigned char)			r->dst_mask,		(unsigned char)			r->src_mask,		(unsigned char)			r->tcp_flags	);	if(debug){		printf("SQL: %s\n", buffer);	}	if(mysql_real_query(sql->handler, buffer, strlen(buffer))){		fprintf(stderr, "mysql_real_query() failed\n");		fprintf(stderr, "%s\n", mysql_error(sql->handler));	}	return(1);}int _insert_Record_for_NetFlow_Version_9(	unsigned char *buf,	struct template *record,	int is_ipv6_record){	int i = 0;	char *bind_map;		if(sql->ipv4_bind_map[record->record.Type] == -1){		return(0);		}	if(record->record.Length == 0 || record->record.Length > 16){		return(0);	}		if(is_ipv6_record){		bind_map = sql->ipv6_bind_map;	}else{		bind_map = sql->ipv4_bind_map;	}	if(record->record.Length > 8){ // if the containing data does not fit to 		                           // unsigned char, unsigned short,		                           // unsigned long or unsigned long long.		for(i = 0; i < record->record.Length; i++){									// the data will be stored in network byte order			sql->record_buffer[(unsigned char)bind_map[record->record.Type]][i]				= buf[i];		}		return(1);	}		// when storeing data as unsigned char, unsigned short, unsigned long or 	// unsigned longlong, we store the data as a N bit integer and we have to 	// aware of the byte order.	for(i = 0; i < record->record.Length; i++){		if(is_big_endian()){			sql->record_buffer[(unsigned char)bind_map[record->record.Type]][i]				= buf[i];		}else{			sql->record_buffer[(unsigned char)bind_map[record->record.Type]][i]				= buf[record->record.Length - i - 1];		}	}	return(1);}int insert_Record_for_NetFlow_Version_9(	char *buf,	int *pos,	struct template *record,	int is_ipv6_record,	unsigned long long *p_hid){	int i = 0;	unsigned long sampled_packet_count;	unsigned long sampled_byte_count;		char buffer[1024];	char buffer_for_ipv6_address[3][16*2+1];	memset(sql->record_buffer, '\0', sizeof(sql->record_buffer[19][16]));		while(record != NULL){		i += _insert_Record_for_NetFlow_Version_9(&buf[*pos], record, is_ipv6_record);		*pos += record->record.Length;		record = record->next;	}	if(i == 0){		return(0);	}		sampled_packet_count = packet_sampling(		*(unsigned long *)sql->record_buffer[4],		*(unsigned long *)sql->record_buffer[3],		&sampled_byte_count	);		if(sampled_packet_count == 0){		return(0);		}		if(is_ipv6_record){		mysql_real_escape_string(			sql->handler, buffer_for_ipv6_address[0],			sql->record_buffer[7],			16		);		mysql_real_escape_string(			sql->handler,				buffer_for_ipv6_address[1],			sql->record_buffer[8],			16		);		mysql_real_escape_string(			sql->handler,			buffer_for_ipv6_address[2],			sql->record_buffer[13],			16		);			sprintf(			buffer,			SQL_INSERT_IPV6_RECORD,			(unsigned long long)	*p_hid,			(unsigned char)			6,			*(unsigned long *)		sql->record_buffer[1],			*(unsigned long *)		sql->record_buffer[2],			(unsigned long)			sampled_byte_count,			(unsigned long)			sampled_packet_count,			*(unsigned short *)		sql->record_buffer[5],			*(unsigned short *)		sql->record_buffer[6],			(char *)				buffer_for_ipv6_address[0],			(char *)				buffer_for_ipv6_address[1],			*(unsigned char *)		sql->record_buffer[9],			*(unsigned char *)		sql->record_buffer[10],			*(unsigned short *)		sql->record_buffer[11],			*(unsigned short *)		sql->record_buffer[12],			(char *)				buffer_for_ipv6_address[2],			*(unsigned short *)		sql->record_buffer[14],			*(unsigned short *)		sql->record_buffer[15],			*(unsigned char *)		sql->record_buffer[16],			*(unsigned char *)		sql->record_buffer[17],			*(unsigned char *)		sql->record_buffer[18]		);	}else{		sprintf(			buffer,			SQL_INSERT_IPV4_RECORD,			(unsigned long long)	*p_hid,			(unsigned char)			4,			*(unsigned long *)		sql->record_buffer[1],			*(unsigned long *)		sql->record_buffer[2],			(unsigned long)			sampled_byte_count,			(unsigned long)			sampled_packet_count,			*(unsigned short *)		sql->record_buffer[5],			*(unsigned short *)		sql->record_buffer[6],			*(unsigned long *)		sql->record_buffer[7],			*(unsigned long *)		sql->record_buffer[8],			*(unsigned char *)		sql->record_buffer[9],			*(unsigned char *)		sql->record_buffer[10],			*(unsigned short *)		sql->record_buffer[11],			*(unsigned short *)		sql->record_buffer[12],			*(unsigned long *)		sql->record_buffer[13],			*(unsigned short *)		sql->record_buffer[14],			*(unsigned short *)		sql->record_buffer[15],			*(unsigned char *)		sql->record_buffer[16],			*(unsigned char *)		sql->record_buffer[17],			*(unsigned char *)		sql->record_buffer[18]		);	}	if(debug){		printf("SQL: %s\n", buffer);	}	if(mysql_real_query(sql->handler, buffer, strlen(buffer))){		fprintf(stderr, "mysql_real_query() failed\n");		fprintf(stderr, "%s\n", mysql_error(sql->handler));	}	return(1);}int handle_Data_FlowSet(	unsigned char *buf,	u_int16_t Template_ID,	struct NetFlow_Version_9_Packet_Header *ph,	int *p_insert_Header,	unsigned long long *p_hid,	in_addr_t *p_s_addr){	struct template *record;	int pos;		pos = 0;		record = lookup_template(p_s_addr, Template_ID);		if(record->record.Type == 0){		fprintf(stderr, "WARN: \t\tUnknown Template: %hu\n", Template_ID);		return(0);	}	if(*p_insert_Header == 0){		*p_hid = insert_Header_for_NetFlow_Version_9(ph, p_s_addr);		*p_insert_Header = 1;	}	insert_Record_for_NetFlow_Version_9(buf, &pos, record,			is_ipv6_template(p_s_addr, Template_ID), p_hid);	return(pos);}int fill_padding(	int pos){	if(pos % 4 != 0){		return(4 - (pos % 4));	}	return(0);}void handle_NetFlow_v5_packet(	char *buf,	int length,	in_addr_t *p_s_addr){	int i;	int num_of_records;	struct netflow_v5_header *h;	struct netflow_v5_record *r;	unsigned long long hid; 			if((length - sizeof(struct netflow_v5_header)) % sizeof(struct netflow_v5_record) != 0){		perror("WARN: 1 Malformed NetFlow v5 packet.\n");		return;	}		num_of_records = (length - sizeof(struct netflow_v5_header)) / sizeof(struct netflow_v5_record);		h = (struct netflow_v5_header *)buf;	if(ntohs(h->count) != num_of_records){		perror("WARN: 2 Malformed NetFlow v5 packet.\n");		return;	}				hid = insert_Header_for_NetFlow_Version_5(h, p_s_addr);	for(i = 0; i < num_of_records; i++){		r = (struct netflow_v5_record *)&buf[sizeof(struct netflow_v5_header) + sizeof(struct netflow_v5_record) * i];		insert_Record_for_NetFlow_Version_5(r, &hid);	}}void handle_NetFlow_v9_packet(	char *buf,	int length,	in_addr_t *p_s_addr){	int pos;	int pos2;	int rv;	int insert_Header;	unsigned long long hid;	struct NetFlow_Version_9_Packet_Header *ph;	struct NetFlow_Version_9_FlowSet_Header *fh;		pos = 0;	insert_Header = 0;	hid = 0;		ph = (struct NetFlow_Version_9_Packet_Header *)buf;	if(debug){		printf("\tVersion:\t%hu\n", ntohs(ph->Version));		printf("\tPacket_Count:\t%hu\n", ntohs(ph->Count));		printf("\tSystem_Uptime:\t%lu\n", (unsigned long)ntohl(ph->System_Uptime));		printf("\tUNIX_Seconds:\t%lu\n", (unsigned long)ntohl(ph->UNIX_Seconds));		printf("\tPackage_Sequence:\t%lu\n", (unsigned long)ntohl(ph->Package_Sequence));		printf("\tSource_ID:\t%lu\n", (unsigned long)ntohl(ph->Source_ID));		printf("\tExporter_Address:\t%s(%lu)\n\n", 			inet_ntoa(*(struct in_addr *)(p_s_addr)),			(unsigned long)ntohl(*p_s_addr)		);	}		pos += sizeof(struct NetFlow_Version_9_Packet_Header);			while(pos < length){		fh = (struct NetFlow_Version_9_FlowSet_Header *)&buf[pos];		if(debug){			printf("\tFlowSet ID:%hu ", ntohs(fh->FlowSet_ID));		}		pos += sizeof(struct NetFlow_Version_9_FlowSet_Header);		pos2 = sizeof(struct NetFlow_Version_9_FlowSet_Header);		while(pos2 + fill_padding(pos2) < ntohs(fh->Length)){						if((ntohs(fh->FlowSet_ID) & 256) == 256){				if(debug){printf("\tData FlowSet\n");}				rv = handle_Data_FlowSet(					&buf[pos],					ntohs(fh->FlowSet_ID),					ph,					&insert_Header,					&hid,					p_s_addr				);			}else if(ntohs(fh->FlowSet_ID) == 0){				if(debug){printf("\tTemplate FlowSet\n");}				//Template FlowSet				rv = (handle_Template_FlowSet(					&buf[pos],					ph,					p_s_addr				));			}else if(ntohs(fh->FlowSet_ID) == 1){				if(debug){printf("\tOptions FlowSet\n");}				//Options FlowSet				rv = (handle_Options_FlowSet(					&buf[pos],					ph,					p_s_addr				));			}						if(rv == 0){				rv = ntohs(fh->Length) - sizeof(struct NetFlow_Version_9_FlowSet_Header);				if(rv == 0){					return;				}			}			pos += rv;			pos2 += rv;			if(debug){printf("\n");}		}		pos += fill_padding(pos);	}}void handle_NetFlow_packet(	char *buf,	int length,	struct in_addr *p_in_addr){	u_int16_t *v;	v = (u_int16_t *)buf;	switch (ntohs(*v)) {		case 9:			if(debug){				printf("NetFlow v%hu Packet from %s\n", ntohs(*v), inet_ntoa(*p_in_addr));			}			handle_NetFlow_v9_packet(buf, length, &(p_in_addr->s_addr));			break;		case 5:			if(debug){				printf("NetFlow v%hu Packet from %s\n", ntohs(*v), inet_ntoa(*p_in_addr));			}			handle_NetFlow_v5_packet(buf, length, &(p_in_addr->s_addr));			break;		default:			fprintf(stderr, "WARN: Unsuported NetFlow Packet from %s.\n", inet_ntoa(*p_in_addr));			break;	}}intmain(	int argc,	char *argv[]){	int s;	int i;	int n;	int mtu;	char *buf;	struct sockaddr_in from;	int from_len = 0;		memset(&template_lookup, '\0', sizeof(struct hash8));	s = create_socket(argc, argv, &mtu, &buf);	connect_mysql(argc, argv);		setvbuf(stdout, NULL, _IONBF, 0);		while(1){		n = recvfrom(s, buf, mtu, 0, (struct sockaddr *)&from, &from_len);		handle_NetFlow_packet(buf, n, &(from.sin_addr));	}		for(i = 1; i < argc; i++){		memset(argv[i], '\0', strlen(argv[i]));		argv[i] = "\0";	}		free(buf);	exit(0);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -