📄 netflow2mysql.c
字号:
/* * Copyright (C) 2003-2004 WIDE Project. * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * 3. Neither the name of the project nor the names of its contributors * may be used to endorse or promote products derived from this software * without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE PROJECT AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE PROJECT OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * */#include "NetFlow2MySQL.h"int is_big_endian(){ long one = 1; return !(*((char *)(&one)));}void connect_mysql ( int argc, char *argv[]){ char c; int index = 0; u_int16_t mysql_port; char mysql_host[256], mysql_user[256], mysql_password[256], mysql_database[256]; int disable_ipv6 = 0; static struct sql_gadget sql_gadget; static struct option long_options[] = { {"port", required_argument, 0, 'p'}, {"interface", required_argument, 0, 'i'}, {"receive-buffer-size", required_argument, 0, 'r'}, {"debug", no_argument, 0, 'd'}, {"mysql-host", required_argument, 0, 'h'}, {"mysql-user", required_argument, 0, 'u'}, {"mysql-password", required_argument, 0, 'P'}, {"mysql-database", required_argument, 0, '1'}, {"mysql-port", required_argument, 0, '2'}, {"packet-sampling-rate", required_argument, 0, '3'}, {"disable-ipv6", no_argument, 0, '9'}, {"store-exporter-address", no_argument, 0, '8'}, {"version", no_argument, 0, 'v'}, {0,0,0,0} }; strcpy(mysql_host, "localhost"); memset(mysql_user, '\0', sizeof(mysql_user)); memset(mysql_password, '\0', sizeof(mysql_password)); strcpy(mysql_database, "flow"); mysql_port = 0; memset(&sql_gadget, '\0', sizeof(struct sql_gadget)); optarg = NULL; optind = optopt = 0; while ((c = getopt_long( argc, argv, "h:u:P:1:2:89", long_options, &index )) != -1) switch (c) { case 'h': strncpy(mysql_host, optarg, 256); break; case 'u': strncpy(mysql_user, optarg, 256); break; case 'P': strncpy(mysql_password, optarg, 256); break; case '1': strncpy(mysql_database, optarg, 256); break; case '2': mysql_port = atoi(optarg); break; case '8': store_exporter_address = 1; break; case '9': disable_ipv6 = 1; break; default: break; } while(mysql_user[0] == '\0'){ printf("Enter username for MySQL: "); fgets(mysql_user, sizeof(mysql_user), stdin); mysql_user[strlen(mysql_user) - 1] = '\0'; } while(mysql_password[0] == '\0'){ printf("Enter password for MySQL: "); system("stty -echo"); fgets(mysql_password, sizeof(mysql_password), stdin); system("stty echo"); printf("\n"); mysql_password[strlen(mysql_password) - 1] = '\0'; } sql_gadget.handler = mysql_init(sql_gadget.handler); if(!mysql_real_connect( sql_gadget.handler, mysql_host, mysql_user, mysql_password, mysql_database, mysql_port, NULL, 0 )){ fprintf(stderr, "Failed to connect to database: Error: %s\n", mysql_error(sql_gadget.handler)); exit(1); } memset(sql_gadget.ipv4_bind_map, -1, sizeof(sql_gadget.ipv4_bind_map[65536])); sql_gadget.ipv4_bind_map[LAST_SWITCHED] = 1; sql_gadget.ipv4_bind_map[FIRST_SWITCHED] = 2; sql_gadget.ipv4_bind_map[BYTES_32] = 3; sql_gadget.ipv4_bind_map[PKTS_32] = 4; sql_gadget.ipv4_bind_map[INPUT_SNMP] = 5; sql_gadget.ipv4_bind_map[OUTPUT_SNMP] = 6; sql_gadget.ipv4_bind_map[IPV4_SRC_ADDR] = 7; sql_gadget.ipv4_bind_map[IPV4_DST_ADDR] = 8; sql_gadget.ipv4_bind_map[PROT] = 9; sql_gadget.ipv4_bind_map[SRC_TOS] = 10; sql_gadget.ipv4_bind_map[L4_SRC_PORT] = 11; sql_gadget.ipv4_bind_map[L4_DST_PORT] = 12; sql_gadget.ipv4_bind_map[IPV4_NEXT_HOP] = 13; sql_gadget.ipv4_bind_map[DST_AS] = 14; sql_gadget.ipv4_bind_map[SRC_AS] = 15; sql_gadget.ipv4_bind_map[DST_MASK] = 16; sql_gadget.ipv4_bind_map[SRC_MASK] = 17; sql_gadget.ipv4_bind_map[TCP_FLAGS] = 18; memset(sql_gadget.ipv6_bind_map, -1, sizeof(sql_gadget.ipv6_bind_map[65536])); sql_gadget.ipv6_bind_map[LAST_SWITCHED] = 1; sql_gadget.ipv6_bind_map[FIRST_SWITCHED] = 2; sql_gadget.ipv6_bind_map[BYTES_32] = 3; sql_gadget.ipv6_bind_map[PKTS_32] = 4; sql_gadget.ipv6_bind_map[INPUT_SNMP] = 5; sql_gadget.ipv6_bind_map[OUTPUT_SNMP] = 6; sql_gadget.ipv6_bind_map[IPV6_SRC_ADDR] = 7; sql_gadget.ipv6_bind_map[IPV6_DST_ADDR] = 8; sql_gadget.ipv6_bind_map[PROT] = 9; sql_gadget.ipv6_bind_map[SRC_TOS] = 10; sql_gadget.ipv6_bind_map[L4_SRC_PORT] = 11; sql_gadget.ipv6_bind_map[L4_DST_PORT] = 12; sql_gadget.ipv6_bind_map[IPV6_NEXT_HOP] = 13; sql_gadget.ipv6_bind_map[DST_AS] = 14; sql_gadget.ipv6_bind_map[SRC_AS] = 15; sql_gadget.ipv6_bind_map[DST_MASK] = 16; sql_gadget.ipv6_bind_map[SRC_MASK] = 17; sql_gadget.ipv6_bind_map[TCP_FLAGS] = 18; sql = &sql_gadget; return;}int create_socket( int argc, char *argv[], int *mtu, char **buf){ struct sockaddr_in receiver; u_int16_t port ; char c, interface[16]; struct ifreq ifreq; int s; int i; int index; int pref_so_rcvbuf; int so_rcvbuf; int so_rcvbuf_len; int try; static struct option long_options[] = { {"port", required_argument, 0, 'p'}, {"interface", required_argument, 0, 'i'}, {"receive-buffer-size", required_argument, 0, 'r'}, {"debug", no_argument, 0, 'd'}, {"mysql-host", required_argument, 0, 'h'}, {"mysql-user", required_argument, 0, 'u'}, {"mysql-password", required_argument, 0, 'P'}, {"mysql-database", required_argument, 0, '1'}, {"mysql-port", required_argument, 0, '2'}, {"packet-sampling-rate", required_argument, 0, '3'}, {"disable-ipv6", required_argument, 0, '9'}, {"store-exporter-address", no_argument, 0, '8'}, {"version", no_argument, 0, 'v'}, {0,0,0,0} }; i = 0; pref_so_rcvbuf = 10485760; optind = optopt = opterr = 0; while ((c = getopt_long( argc, argv, "i:p:vd", long_options, &index )) != -1) switch (c) { case 'i': strncpy(interface, optarg, 16); i++; break; case 'p': port = atoi(optarg); i++; break; case '3': packet_sampling_rate = atoi(optarg); break; case 'r': pref_so_rcvbuf = atoi(optarg); break; case 'd': debug = 1; break; case 'v': printf("%s version 0.24.\nsee NetFlow2MySQL.html for more detail.\n", argv[0]); exit(0); break; default: break; } if(packet_sampling_rate == 0){ packet_sampling_rate = 1; } if(i != 2){ perror(ARGUMENTS); exit(-1); } memset(&ifreq, '\0', sizeof(ifreq)); strncpy(ifreq.ifr_name, interface, sizeof(interface)); if((s = socket(PF_INET, SOCK_DGRAM, 0)) == -1){ perror("Cannot create socket.\n"); exit(-1); } if(ioctl(s, SIOCGIFMTU, &ifreq) == -1){ perror("Cannot get the interface MTU.\n"); exit(-1); } *mtu = ifreq.ifr_mtu; *buf = malloc(*mtu); if(ioctl(s, SIOCGIFADDR, &ifreq) == -1){ perror("Cannot get the interface address.\n"); exit(-1); } getsockopt(s, SOL_SOCKET, SO_RCVBUF, &so_rcvbuf, &so_rcvbuf_len); if(pref_so_rcvbuf != 0){ try = 0; printf("INFO: Trying to set SO_RCVBUF %d to %d.\n", so_rcvbuf, pref_so_rcvbuf); while(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &pref_so_rcvbuf, 4) != 0 && try < 30){ pref_so_rcvbuf /= 2; try ++; printf("INFO: Trying to set SO_RCVBUF %d to %d.\n", so_rcvbuf, pref_so_rcvbuf); } } getsockopt(s, SOL_SOCKET, SO_RCVBUF, &so_rcvbuf, &so_rcvbuf_len); printf("INFO: SO_RCVBUF: %d\n", so_rcvbuf); memset(&receiver, 0, sizeof(receiver)); receiver = *(struct sockaddr_in *)&ifreq.ifr_addr; fprintf(stderr, "INFO: Listening: %s:%hu\n", inet_ntoa(receiver.sin_addr), port); receiver.sin_port = htons(port); if( (bind(s, (struct sockaddr *)&receiver, sizeof(receiver))) == -1){ perror("ERROR: Cannot bind socket.\n"); exit(-1); } return(s);}struct template_base *_lookup_template( in_addr_t *p_s_addr, u_int16_t Template_ID){ struct hash8 **p; p = &template_lookup.next[(u_int8_t)(*p_s_addr >>24)]; if(*p == NULL){ *p = malloc(sizeof(struct hash8)); memset(*p, '\0', sizeof(struct hash8)); } p = &((*p)->next[(u_int8_t)(*p_s_addr >>16)]); if(*p == NULL){ *p = malloc(sizeof(struct hash8)); memset(*p, '\0', sizeof(struct hash8)); } p = &((*p)->next[(u_int8_t)(*p_s_addr >>8)]); if(*p == NULL){ *p = malloc(sizeof(struct hash8)); memset(*p, '\0', sizeof(struct hash8)); } p = &((*p)->next[(u_int8_t)(*p_s_addr)]); if(*p == NULL){ *p = malloc(sizeof(struct hash8)); memset(*p, '\0', sizeof(struct hash8)); } p = &((*p)->next[(u_int8_t)(Template_ID >> 8)]); if(*p == NULL){ *p = malloc(sizeof(struct hash8)); memset(*p, '\0', sizeof(struct hash8)); } p = &((*p)->next[(u_int8_t)(Template_ID)]); if(*p == NULL){ *p = malloc(sizeof(struct template_base)); memset(*p, '\0', sizeof(struct template_base)); } return((struct template_base *)*p);}struct template *lookup_template( in_addr_t *p_s_addr, u_int16_t Template_ID){ struct template_base *template_base; template_base = _lookup_template(p_s_addr, Template_ID); if(template_base->template == NULL){ template_base->template = malloc(sizeof(struct template)); memset(template_base->template, '\0', sizeof(struct template)); } return(template_base->template);}int handle_Options_FlowSet( char *buf, struct NetFlow_Version_9_Packet_Header *ph, in_addr_t *p_s_addr){ struct NetFlow_Version_9_Options_Template_Header *oth; struct NetFlow_Version_9_FlowSet_Record *fr; struct template *record; int pos; int i; oth = (struct NetFlow_Version_9_Options_Template_Header *)buf; pos = sizeof(struct NetFlow_Version_9_Options_Template_Header); record = lookup_template(p_s_addr, ntohs(oth->Template_ID)); if(debug){ printf("\t\tOptions Template ID %hu has %hu scope and %hu option records\n", ntohs(oth->Template_ID), ntohs(oth->Options_Scope_Length) / 4, ntohs(oth->Options_Length) / 4 ); } for(i = 0; i < ((ntohs(oth->Options_Scope_Length) + (ntohs(oth->Options_Length))) / 4); i++){ fr = (struct NetFlow_Version_9_FlowSet_Record *)&buf[pos]; if(i > 0){ record->next = malloc(sizeof(struct template)); record = record->next; } if(i < (ntohs(oth->Options_Scope_Length) / 4)){ record->is_scope_field = 1; }else{ record->is_scope_field = 0; } record->record.Type = ntohs(fr->Type); record->record.Length = ntohs(fr->Length); record->next = NULL; pos += sizeof(struct NetFlow_Version_9_FlowSet_Record); } record = lookup_template(p_s_addr, ntohs(oth->Template_ID)); return(pos);}void set_ipv6_template ( in_addr_t *p_s_addr, u_int16_t Template_ID){ struct template_base *template_base; template_base = _lookup_template(p_s_addr, Template_ID); template_base->is_ipv6 = 1;}unsigned char is_ipv6_template ( in_addr_t *p_s_addr, u_int16_t Template_ID){ struct template_base *template_base; template_base = _lookup_template(p_s_addr, Template_ID); return template_base->is_ipv6;}int handle_Template_FlowSet( char *buf, struct NetFlow_Version_9_Packet_Header *ph, in_addr_t *p_s_addr){ struct NetFlow_Version_9_Template_Header *th; struct NetFlow_Version_9_FlowSet_Record *fr; struct template *record; int pos; int i; th = (struct NetFlow_Version_9_Template_Header *)buf; pos = sizeof(struct NetFlow_Version_9_Template_Header); record = lookup_template(p_s_addr, ntohs(th->Template_ID)); if(debug){ printf("\t\tTemplate ID %hu has %hu records\n", ntohs(th->Template_ID), ntohs(th->Field_Count)); } for(i = 0; i < ntohs(th->Field_Count); i++){ fr = (struct NetFlow_Version_9_FlowSet_Record *)&buf[pos]; if(i > 0){ record->next = malloc(sizeof(struct template)); record = record->next; } record->is_scope_field = 0; record->record.Type = ntohs(fr->Type); record->record.Length = ntohs(fr->Length); if(record->record.Type == IPV6_SRC_ADDR){ set_ipv6_template(p_s_addr, ntohs(th->Template_ID)); } record->next = NULL; pos += sizeof(struct NetFlow_Version_9_FlowSet_Record); } record = lookup_template(p_s_addr, ntohs(th->Template_ID)); return(pos);}u_int64_t insert_Header_for_NetFlow_Version_9( struct NetFlow_Version_9_Packet_Header *ph, in_addr_t *p_s_addr){ char buffer[1024]; if(store_exporter_address){ sprintf( buffer, SQL_INSERT_HEADER_WITH_EXPORTER_ADDRESS, (unsigned short)ntohs(ph->Version), (unsigned long)ntohl(ph->System_Uptime), (unsigned long)ntohl(ph->UNIX_Seconds), (unsigned long)ntohl(ph->Package_Sequence), (unsigned long)ntohl(ph->Source_ID), (unsigned long)ntohl(*p_s_addr) ); }else{ sprintf( buffer, SQL_INSERT_HEADER, (unsigned short)ntohs(ph->Version), (unsigned long)ntohl(ph->System_Uptime), (unsigned long)ntohl(ph->UNIX_Seconds), (unsigned long)ntohl(ph->Package_Sequence), (unsigned long)ntohl(ph->Source_ID) ); } if(debug){ printf("SQL: %s\n", buffer); } if(mysql_real_query(sql->handler, buffer, strlen(buffer))){ fprintf(stderr, "mysql_real_query() failed\n"); fprintf(stderr, "%s\n", mysql_error(sql->handler)); } return mysql_insert_id(sql->handler);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -