⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 netflow2mysql.c

📁 c语言实现的将Netflow数据直接导入Mysql数据库的工具
💻 C
📖 第 1 页 / 共 2 页
字号:
/* * Copyright (C) 2003-2004 WIDE Project. * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright *    notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright *    notice, this list of conditions and the following disclaimer in the *    documentation and/or other materials provided with the distribution. * 3. Neither the name of the project nor the names of its contributors *    may be used to endorse or promote products derived from this software *    without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE PROJECT AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED.  IN NO EVENT SHALL THE PROJECT OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * */#include "NetFlow2MySQL.h"int is_big_endian(){	long one = 1;	return !(*((char *)(&one)));}void connect_mysql (	int argc,	char *argv[]){	char c;	int index = 0;	u_int16_t mysql_port;	char mysql_host[256], mysql_user[256], mysql_password[256], mysql_database[256];	int disable_ipv6 = 0;	static struct sql_gadget sql_gadget;				static struct option long_options[] = {		{"port", required_argument, 0, 'p'},		{"interface", required_argument, 0, 'i'},		{"receive-buffer-size", required_argument, 0, 'r'},		{"debug", no_argument, 0, 'd'},		{"mysql-host", required_argument, 0, 'h'},		{"mysql-user", required_argument, 0, 'u'},		{"mysql-password", required_argument, 0, 'P'},		{"mysql-database", required_argument, 0, '1'},		{"mysql-port", required_argument, 0, '2'},		{"packet-sampling-rate", required_argument, 0, '3'},		{"disable-ipv6", no_argument, 0, '9'},		{"store-exporter-address", no_argument, 0, '8'},		{"version", no_argument, 0, 'v'},		{0,0,0,0}	};		strcpy(mysql_host, "localhost");	memset(mysql_user, '\0', sizeof(mysql_user));	memset(mysql_password, '\0', sizeof(mysql_password));	strcpy(mysql_database, "flow");	mysql_port = 0;		memset(&sql_gadget, '\0', sizeof(struct sql_gadget));		optarg = NULL;	optind = optopt = 0;	while ((c = getopt_long(		argc,		argv,		"h:u:P:1:2:89",		long_options,		&index		)) != -1)		switch (c)	{		case 'h':			strncpy(mysql_host, optarg, 256);			break;		case 'u':			strncpy(mysql_user, optarg, 256);			break;		case 'P':			strncpy(mysql_password, optarg, 256);			break;		case '1':			strncpy(mysql_database, optarg, 256);			break;		case '2':			mysql_port = atoi(optarg);			break;		case '8':			store_exporter_address = 1;			break;		case '9':			disable_ipv6 = 1;			break;		default:			break;	}	while(mysql_user[0] == '\0'){		printf("Enter username for MySQL: ");		fgets(mysql_user, sizeof(mysql_user), stdin);		mysql_user[strlen(mysql_user) - 1] = '\0';	}	while(mysql_password[0] == '\0'){		printf("Enter password for MySQL: ");		system("stty -echo");		fgets(mysql_password, sizeof(mysql_password), stdin);		system("stty echo");		printf("\n");		mysql_password[strlen(mysql_password) - 1] = '\0';	}		sql_gadget.handler = mysql_init(sql_gadget.handler);	if(!mysql_real_connect(		sql_gadget.handler,		mysql_host,		mysql_user,		mysql_password,		mysql_database,		mysql_port,		NULL,		0	)){		fprintf(stderr, "Failed to connect to database: Error: %s\n",			mysql_error(sql_gadget.handler));		exit(1);	}		memset(sql_gadget.ipv4_bind_map, -1, sizeof(sql_gadget.ipv4_bind_map[65536]));	sql_gadget.ipv4_bind_map[LAST_SWITCHED] = 1;	sql_gadget.ipv4_bind_map[FIRST_SWITCHED] = 2;	sql_gadget.ipv4_bind_map[BYTES_32] = 3;	sql_gadget.ipv4_bind_map[PKTS_32] = 4;	sql_gadget.ipv4_bind_map[INPUT_SNMP] = 5;	sql_gadget.ipv4_bind_map[OUTPUT_SNMP] = 6;	sql_gadget.ipv4_bind_map[IPV4_SRC_ADDR] = 7;	sql_gadget.ipv4_bind_map[IPV4_DST_ADDR] = 8;	sql_gadget.ipv4_bind_map[PROT] = 9;	sql_gadget.ipv4_bind_map[SRC_TOS] = 10;	sql_gadget.ipv4_bind_map[L4_SRC_PORT] = 11;	sql_gadget.ipv4_bind_map[L4_DST_PORT] = 12;	sql_gadget.ipv4_bind_map[IPV4_NEXT_HOP] = 13;	sql_gadget.ipv4_bind_map[DST_AS] = 14;	sql_gadget.ipv4_bind_map[SRC_AS] = 15;	sql_gadget.ipv4_bind_map[DST_MASK] = 16;	sql_gadget.ipv4_bind_map[SRC_MASK] = 17;	sql_gadget.ipv4_bind_map[TCP_FLAGS] = 18;	memset(sql_gadget.ipv6_bind_map, -1, sizeof(sql_gadget.ipv6_bind_map[65536]));	sql_gadget.ipv6_bind_map[LAST_SWITCHED] = 1;	sql_gadget.ipv6_bind_map[FIRST_SWITCHED] = 2;	sql_gadget.ipv6_bind_map[BYTES_32] = 3;	sql_gadget.ipv6_bind_map[PKTS_32] = 4;	sql_gadget.ipv6_bind_map[INPUT_SNMP] = 5;	sql_gadget.ipv6_bind_map[OUTPUT_SNMP] = 6;	sql_gadget.ipv6_bind_map[IPV6_SRC_ADDR] = 7;	sql_gadget.ipv6_bind_map[IPV6_DST_ADDR] = 8;	sql_gadget.ipv6_bind_map[PROT] = 9;	sql_gadget.ipv6_bind_map[SRC_TOS] = 10;	sql_gadget.ipv6_bind_map[L4_SRC_PORT] = 11;	sql_gadget.ipv6_bind_map[L4_DST_PORT] = 12;	sql_gadget.ipv6_bind_map[IPV6_NEXT_HOP] = 13;	sql_gadget.ipv6_bind_map[DST_AS] = 14;	sql_gadget.ipv6_bind_map[SRC_AS] = 15;	sql_gadget.ipv6_bind_map[DST_MASK] = 16;	sql_gadget.ipv6_bind_map[SRC_MASK] = 17;	sql_gadget.ipv6_bind_map[TCP_FLAGS] = 18;	sql = &sql_gadget;		return;}int create_socket(	int argc,	char *argv[],	int *mtu,	char **buf){	struct sockaddr_in receiver;	u_int16_t port ;	char c, interface[16];	struct ifreq ifreq;	int s;	int i;	int index;	int pref_so_rcvbuf;	int so_rcvbuf;	int so_rcvbuf_len;	int try;	static struct option long_options[] = {		{"port", required_argument, 0, 'p'},		{"interface", required_argument, 0, 'i'},		{"receive-buffer-size", required_argument, 0, 'r'},		{"debug", no_argument, 0, 'd'},		{"mysql-host", required_argument, 0, 'h'},		{"mysql-user", required_argument, 0, 'u'},		{"mysql-password", required_argument, 0, 'P'},		{"mysql-database", required_argument, 0, '1'},		{"mysql-port", required_argument, 0, '2'},		{"packet-sampling-rate", required_argument, 0, '3'},		{"disable-ipv6", required_argument, 0, '9'},		{"store-exporter-address", no_argument, 0, '8'},		{"version", no_argument, 0, 'v'},		{0,0,0,0}	};	i = 0;	pref_so_rcvbuf = 10485760;	optind = optopt = opterr = 0;	while ((c = getopt_long(		argc,		argv,		"i:p:vd",		long_options,		&index		)) != -1)		switch (c)	{		case 'i':			strncpy(interface, optarg, 16);			i++;		    break;	  	case 'p':			port = atoi(optarg);			i++;		    break;	  	case '3':			packet_sampling_rate = atoi(optarg);		    break;	  	case 'r':			pref_so_rcvbuf = atoi(optarg);		    break;	  	case 'd':			debug = 1;		    break;		case 'v':			printf("%s version 0.24.\nsee NetFlow2MySQL.html for more detail.\n", argv[0]);			exit(0);			break;		default:			break;	}	if(packet_sampling_rate == 0){		packet_sampling_rate = 1;	}	if(i != 2){		perror(ARGUMENTS);		exit(-1);	}	memset(&ifreq, '\0', sizeof(ifreq));	strncpy(ifreq.ifr_name, interface, sizeof(interface));	if((s = socket(PF_INET, SOCK_DGRAM, 0)) == -1){		perror("Cannot create socket.\n");		exit(-1);	}	if(ioctl(s, SIOCGIFMTU, &ifreq) == -1){		perror("Cannot get the interface MTU.\n");		exit(-1);	}	*mtu = ifreq.ifr_mtu;	*buf = malloc(*mtu);		if(ioctl(s, SIOCGIFADDR, &ifreq) == -1){		perror("Cannot get the interface address.\n");		exit(-1);	}	getsockopt(s, SOL_SOCKET, SO_RCVBUF, &so_rcvbuf, &so_rcvbuf_len);	if(pref_so_rcvbuf != 0){		try = 0;		printf("INFO: Trying to set SO_RCVBUF %d to %d.\n", so_rcvbuf, pref_so_rcvbuf);		while(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &pref_so_rcvbuf, 4) != 0 && try < 30){			pref_so_rcvbuf /= 2;			try ++;			printf("INFO: Trying to set SO_RCVBUF %d to %d.\n", so_rcvbuf, pref_so_rcvbuf);		}	}	getsockopt(s, SOL_SOCKET, SO_RCVBUF, &so_rcvbuf, &so_rcvbuf_len);	printf("INFO: SO_RCVBUF: %d\n", so_rcvbuf);	memset(&receiver, 0, sizeof(receiver));		receiver = *(struct sockaddr_in *)&ifreq.ifr_addr;	fprintf(stderr, "INFO: Listening: %s:%hu\n", inet_ntoa(receiver.sin_addr), port);	receiver.sin_port = htons(port);		if( (bind(s, (struct sockaddr *)&receiver, sizeof(receiver))) == -1){		perror("ERROR: Cannot bind socket.\n");		exit(-1);	}			return(s);}struct template_base *_lookup_template(	in_addr_t *p_s_addr,	u_int16_t Template_ID){	struct hash8 **p;	p = &template_lookup.next[(u_int8_t)(*p_s_addr >>24)];	if(*p == NULL){		*p = malloc(sizeof(struct hash8));		memset(*p, '\0', sizeof(struct hash8));	}	p = &((*p)->next[(u_int8_t)(*p_s_addr >>16)]);	if(*p == NULL){		*p = malloc(sizeof(struct hash8));		memset(*p, '\0', sizeof(struct hash8));	}	p = &((*p)->next[(u_int8_t)(*p_s_addr >>8)]);	if(*p == NULL){		*p = malloc(sizeof(struct hash8));		memset(*p, '\0', sizeof(struct hash8));	}	p = &((*p)->next[(u_int8_t)(*p_s_addr)]);	if(*p == NULL){		*p = malloc(sizeof(struct hash8));		memset(*p, '\0', sizeof(struct hash8));	}	p = &((*p)->next[(u_int8_t)(Template_ID >> 8)]);	if(*p == NULL){		*p = malloc(sizeof(struct hash8));		memset(*p, '\0', sizeof(struct hash8));	}	p = &((*p)->next[(u_int8_t)(Template_ID)]);	if(*p == NULL){		*p = malloc(sizeof(struct template_base));		memset(*p, '\0', sizeof(struct template_base));	}		return((struct template_base *)*p);}struct template *lookup_template(	in_addr_t *p_s_addr,	u_int16_t Template_ID){	struct template_base *template_base;	template_base = _lookup_template(p_s_addr, Template_ID);	if(template_base->template == NULL){		template_base->template = malloc(sizeof(struct template));		memset(template_base->template, '\0', sizeof(struct template));		}	return(template_base->template);}int handle_Options_FlowSet(	char *buf,	struct NetFlow_Version_9_Packet_Header *ph,	in_addr_t *p_s_addr){	struct NetFlow_Version_9_Options_Template_Header *oth;	struct NetFlow_Version_9_FlowSet_Record *fr;	struct template *record;	int pos;	int i;		oth = (struct NetFlow_Version_9_Options_Template_Header *)buf;	pos = sizeof(struct NetFlow_Version_9_Options_Template_Header);	record = lookup_template(p_s_addr, ntohs(oth->Template_ID));	if(debug){		printf("\t\tOptions Template ID %hu has %hu scope and %hu option records\n",			ntohs(oth->Template_ID),			ntohs(oth->Options_Scope_Length) / 4,			ntohs(oth->Options_Length) / 4		);	}	for(i = 0;		i < ((ntohs(oth->Options_Scope_Length) + (ntohs(oth->Options_Length))) / 4);		i++){		fr = (struct NetFlow_Version_9_FlowSet_Record *)&buf[pos];		if(i > 0){			record->next = malloc(sizeof(struct template));			record = record->next;		}		if(i < (ntohs(oth->Options_Scope_Length) / 4)){			record->is_scope_field = 1;		}else{			record->is_scope_field = 0;		}		record->record.Type = ntohs(fr->Type);		record->record.Length = ntohs(fr->Length);		record->next = NULL;		pos += sizeof(struct NetFlow_Version_9_FlowSet_Record);	}	record = lookup_template(p_s_addr, ntohs(oth->Template_ID));	return(pos);}void set_ipv6_template (	in_addr_t *p_s_addr,	u_int16_t Template_ID){	struct template_base *template_base;	template_base = _lookup_template(p_s_addr, Template_ID);	template_base->is_ipv6 = 1;}unsigned char is_ipv6_template (	in_addr_t *p_s_addr,	u_int16_t Template_ID){	struct template_base *template_base;	template_base = _lookup_template(p_s_addr, Template_ID);	return template_base->is_ipv6;}int handle_Template_FlowSet(	char *buf,	struct NetFlow_Version_9_Packet_Header *ph,	in_addr_t *p_s_addr){	struct NetFlow_Version_9_Template_Header *th;	struct NetFlow_Version_9_FlowSet_Record *fr;	struct template *record;	int pos;	int i;		th = (struct NetFlow_Version_9_Template_Header *)buf;	pos = sizeof(struct NetFlow_Version_9_Template_Header);	record = lookup_template(p_s_addr, ntohs(th->Template_ID));	if(debug){		printf("\t\tTemplate ID %hu has %hu records\n", ntohs(th->Template_ID), ntohs(th->Field_Count));	}	for(i = 0; i < ntohs(th->Field_Count); i++){		fr = (struct NetFlow_Version_9_FlowSet_Record *)&buf[pos];		if(i > 0){			record->next = malloc(sizeof(struct template));			record = record->next;		}		record->is_scope_field = 0;				record->record.Type = ntohs(fr->Type);		record->record.Length = ntohs(fr->Length);		if(record->record.Type == IPV6_SRC_ADDR){			set_ipv6_template(p_s_addr, ntohs(th->Template_ID));		}						record->next = NULL;		pos += sizeof(struct NetFlow_Version_9_FlowSet_Record);	}	record = lookup_template(p_s_addr, ntohs(th->Template_ID));	return(pos);}u_int64_t insert_Header_for_NetFlow_Version_9(	struct NetFlow_Version_9_Packet_Header *ph,	in_addr_t *p_s_addr){	char buffer[1024];	if(store_exporter_address){		sprintf(			buffer,			SQL_INSERT_HEADER_WITH_EXPORTER_ADDRESS,			(unsigned short)ntohs(ph->Version),			(unsigned long)ntohl(ph->System_Uptime),			(unsigned long)ntohl(ph->UNIX_Seconds),			(unsigned long)ntohl(ph->Package_Sequence),			(unsigned long)ntohl(ph->Source_ID),			(unsigned long)ntohl(*p_s_addr)		);			}else{		sprintf(			buffer,			SQL_INSERT_HEADER,			(unsigned short)ntohs(ph->Version),			(unsigned long)ntohl(ph->System_Uptime),			(unsigned long)ntohl(ph->UNIX_Seconds),			(unsigned long)ntohl(ph->Package_Sequence),			(unsigned long)ntohl(ph->Source_ID)		);	}	if(debug){		printf("SQL: %s\n", buffer);	}	if(mysql_real_query(sql->handler, buffer, strlen(buffer))){		fprintf(stderr, "mysql_real_query() failed\n");		fprintf(stderr, "%s\n", mysql_error(sql->handler));	}	return mysql_insert_id(sql->handler);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -