📄 netflow2mysql.c
字号:
unsigned long long insert_Header_for_NetFlow_Version_5( struct netflow_v5_header *ph, in_addr_t *p_s_addr){ char buffer[1024]; if(store_exporter_address){ sprintf( buffer, SQL_INSERT_HEADER_WITH_EXPORTER_ADDRESS, (unsigned short)ntohs(ph->version), (unsigned long)ntohl(ph->SysUptime), (unsigned long)ntohl(ph->unix_secs), (unsigned long)ntohl(ph->flow_sequence), (unsigned long) ((((unsigned char)(ph->engine_type)) << 8) + (unsigned char)(ph->engine_id)), (unsigned long)ntohl(*p_s_addr) ); }else{ sprintf( buffer, SQL_INSERT_HEADER, (unsigned short)ntohs(ph->version), (unsigned long)ntohl(ph->SysUptime), (unsigned long)ntohl(ph->unix_secs), (unsigned long)ntohl(ph->flow_sequence), (unsigned long) ((((unsigned char)(ph->engine_type)) << 8) + (unsigned char)(ph->engine_id)) ); } if(debug){ printf("SQL: %s\n", buffer); } if(mysql_real_query(sql->handler, buffer, strlen(buffer))){ fprintf(stderr, "mysql_real_query() failed\n"); fprintf(stderr, "%s\n", mysql_error(sql->handler)); } return mysql_insert_id(sql->handler);}unsigned long packet_sampling( unsigned long this_packet_count, unsigned long in_byte, unsigned long *out_byte){ unsigned long long prev_packet_count; unsigned long sampled_packet_count; prev_packet_count = packet_count; packet_count += this_packet_count;/* printf("//%llu/%lu/%lu/%llu//", packet_count, this_packet_count, packet_sampling_rate, (packet_count / packet_sampling_rate) - (prev_packet_count / packet_sampling_rate) ); */ if(packet_sampling_rate == 1){ sampled_packet_count = packet_count - prev_packet_count; *out_byte = in_byte; }else{ sampled_packet_count = (packet_count / packet_sampling_rate) - (prev_packet_count / packet_sampling_rate); *out_byte = (int)( (double)in_byte * (double)sampled_packet_count / (double)this_packet_count); } return(sampled_packet_count);}unsigned long insert_Record_for_NetFlow_Version_5( struct netflow_v5_record *r, unsigned long long *p_hid){ char buffer[1024]; unsigned long sampled_packet_count; unsigned long sampled_byte_count; sampled_packet_count = packet_sampling( ntohl(r->dPkts), ntohl(r->dOctets), &sampled_byte_count ); if(sampled_packet_count == 0){ return(0); } sprintf( buffer, SQL_INSERT_IPV4_RECORD, (unsigned long long) *p_hid, (unsigned char) 4, (unsigned long) ntohl(r->Last), (unsigned long) ntohl(r->First), (unsigned long) sampled_byte_count, (unsigned long) sampled_packet_count, (unsigned short) ntohs(r->input), (unsigned short) ntohs(r->output), (unsigned long) ntohl(r->srcaddr), (unsigned long) ntohl(r->dstaddr), (unsigned char) r->prot, (unsigned char) r->tos, (unsigned short) ntohs(r->srcport), (unsigned short) ntohs(r->dstport), (unsigned long) ntohl(r->nexthop), (unsigned short) ntohs(r->dst_as), (unsigned short) ntohs(r->src_as), (unsigned char) r->dst_mask, (unsigned char) r->src_mask, (unsigned char) r->tcp_flags ); if(debug){ printf("SQL: %s\n", buffer); } if(mysql_real_query(sql->handler, buffer, strlen(buffer))){ fprintf(stderr, "mysql_real_query() failed\n"); fprintf(stderr, "%s\n", mysql_error(sql->handler)); } return(1);}int _insert_Record_for_NetFlow_Version_9( unsigned char *buf, struct template *record, int is_ipv6_record){ int i = 0; char *bind_map; if(sql->ipv4_bind_map[record->record.Type] == -1){ return(0); } if(record->record.Length == 0 || record->record.Length > 16){ return(0); } if(is_ipv6_record){ bind_map = sql->ipv6_bind_map; }else{ bind_map = sql->ipv4_bind_map; } if(record->record.Length > 8){ // if the containing data does not fit to // unsigned char, unsigned short, // unsigned long or unsigned long long. for(i = 0; i < record->record.Length; i++){ // the data will be stored in network byte order sql->record_buffer[(unsigned char)bind_map[record->record.Type]][i] = buf[i]; } return(1); } // when storeing data as unsigned char, unsigned short, unsigned long or // unsigned longlong, we store the data as a N bit integer and we have to // aware of the byte order. for(i = 0; i < record->record.Length; i++){ if(is_big_endian()){ sql->record_buffer[(unsigned char)bind_map[record->record.Type]][i] = buf[i]; }else{ sql->record_buffer[(unsigned char)bind_map[record->record.Type]][i] = buf[record->record.Length - i - 1]; } } return(1);}int insert_Record_for_NetFlow_Version_9( char *buf, int *pos, struct template *record, int is_ipv6_record, unsigned long long *p_hid){ int i = 0; unsigned long sampled_packet_count; unsigned long sampled_byte_count; char buffer[1024]; char buffer_for_ipv6_address[3][16*2+1]; memset(sql->record_buffer, '\0', sizeof(sql->record_buffer[19][16])); while(record != NULL){ i += _insert_Record_for_NetFlow_Version_9(&buf[*pos], record, is_ipv6_record); *pos += record->record.Length; record = record->next; } if(i == 0){ return(0); } sampled_packet_count = packet_sampling( *(unsigned long *)sql->record_buffer[4], *(unsigned long *)sql->record_buffer[3], &sampled_byte_count ); if(sampled_packet_count == 0){ return(0); } if(is_ipv6_record){ mysql_real_escape_string( sql->handler, buffer_for_ipv6_address[0], sql->record_buffer[7], 16 ); mysql_real_escape_string( sql->handler, buffer_for_ipv6_address[1], sql->record_buffer[8], 16 ); mysql_real_escape_string( sql->handler, buffer_for_ipv6_address[2], sql->record_buffer[13], 16 ); sprintf( buffer, SQL_INSERT_IPV6_RECORD, (unsigned long long) *p_hid, (unsigned char) 6, *(unsigned long *) sql->record_buffer[1], *(unsigned long *) sql->record_buffer[2], (unsigned long) sampled_byte_count, (unsigned long) sampled_packet_count, *(unsigned short *) sql->record_buffer[5], *(unsigned short *) sql->record_buffer[6], (char *) buffer_for_ipv6_address[0], (char *) buffer_for_ipv6_address[1], *(unsigned char *) sql->record_buffer[9], *(unsigned char *) sql->record_buffer[10], *(unsigned short *) sql->record_buffer[11], *(unsigned short *) sql->record_buffer[12], (char *) buffer_for_ipv6_address[2], *(unsigned short *) sql->record_buffer[14], *(unsigned short *) sql->record_buffer[15], *(unsigned char *) sql->record_buffer[16], *(unsigned char *) sql->record_buffer[17], *(unsigned char *) sql->record_buffer[18] ); }else{ sprintf( buffer, SQL_INSERT_IPV4_RECORD, (unsigned long long) *p_hid, (unsigned char) 4, *(unsigned long *) sql->record_buffer[1], *(unsigned long *) sql->record_buffer[2], (unsigned long) sampled_byte_count, (unsigned long) sampled_packet_count, *(unsigned short *) sql->record_buffer[5], *(unsigned short *) sql->record_buffer[6], *(unsigned long *) sql->record_buffer[7], *(unsigned long *) sql->record_buffer[8], *(unsigned char *) sql->record_buffer[9], *(unsigned char *) sql->record_buffer[10], *(unsigned short *) sql->record_buffer[11], *(unsigned short *) sql->record_buffer[12], *(unsigned long *) sql->record_buffer[13], *(unsigned short *) sql->record_buffer[14], *(unsigned short *) sql->record_buffer[15], *(unsigned char *) sql->record_buffer[16], *(unsigned char *) sql->record_buffer[17], *(unsigned char *) sql->record_buffer[18] ); } if(debug){ printf("SQL: %s\n", buffer); } if(mysql_real_query(sql->handler, buffer, strlen(buffer))){ fprintf(stderr, "mysql_real_query() failed\n"); fprintf(stderr, "%s\n", mysql_error(sql->handler)); } return(1);}int handle_Data_FlowSet( unsigned char *buf, u_int16_t Template_ID, struct NetFlow_Version_9_Packet_Header *ph, int *p_insert_Header, unsigned long long *p_hid, in_addr_t *p_s_addr){ struct template *record; int pos; pos = 0; record = lookup_template(p_s_addr, Template_ID); if(record->record.Type == 0){ fprintf(stderr, "WARN: \t\tUnknown Template: %hu\n", Template_ID); return(0); } if(*p_insert_Header == 0){ *p_hid = insert_Header_for_NetFlow_Version_9(ph, p_s_addr); *p_insert_Header = 1; } insert_Record_for_NetFlow_Version_9(buf, &pos, record, is_ipv6_template(p_s_addr, Template_ID), p_hid); return(pos);}int fill_padding( int pos){ if(pos % 4 != 0){ return(4 - (pos % 4)); } return(0);}void handle_NetFlow_v5_packet( char *buf, int length, in_addr_t *p_s_addr){ int i; int num_of_records; struct netflow_v5_header *h; struct netflow_v5_record *r; unsigned long long hid; if((length - sizeof(struct netflow_v5_header)) % sizeof(struct netflow_v5_record) != 0){ perror("WARN: 1 Malformed NetFlow v5 packet.\n"); return; } num_of_records = (length - sizeof(struct netflow_v5_header)) / sizeof(struct netflow_v5_record); h = (struct netflow_v5_header *)buf; if(ntohs(h->count) != num_of_records){ perror("WARN: 2 Malformed NetFlow v5 packet.\n"); return; } hid = insert_Header_for_NetFlow_Version_5(h, p_s_addr); for(i = 0; i < num_of_records; i++){ r = (struct netflow_v5_record *)&buf[sizeof(struct netflow_v5_header) + sizeof(struct netflow_v5_record) * i]; insert_Record_for_NetFlow_Version_5(r, &hid); }}void handle_NetFlow_v9_packet( char *buf, int length, in_addr_t *p_s_addr){ int pos; int pos2; int rv; int insert_Header; unsigned long long hid; struct NetFlow_Version_9_Packet_Header *ph; struct NetFlow_Version_9_FlowSet_Header *fh; pos = 0; insert_Header = 0; hid = 0; ph = (struct NetFlow_Version_9_Packet_Header *)buf; if(debug){ printf("\tVersion:\t%hu\n", ntohs(ph->Version)); printf("\tPacket_Count:\t%hu\n", ntohs(ph->Count)); printf("\tSystem_Uptime:\t%lu\n", (unsigned long)ntohl(ph->System_Uptime)); printf("\tUNIX_Seconds:\t%lu\n", (unsigned long)ntohl(ph->UNIX_Seconds)); printf("\tPackage_Sequence:\t%lu\n", (unsigned long)ntohl(ph->Package_Sequence)); printf("\tSource_ID:\t%lu\n", (unsigned long)ntohl(ph->Source_ID)); printf("\tExporter_Address:\t%s(%lu)\n\n", inet_ntoa(*(struct in_addr *)(p_s_addr)), (unsigned long)ntohl(*p_s_addr) ); } pos += sizeof(struct NetFlow_Version_9_Packet_Header); while(pos < length){ fh = (struct NetFlow_Version_9_FlowSet_Header *)&buf[pos]; if(debug){ printf("\tFlowSet ID:%hu ", ntohs(fh->FlowSet_ID)); } pos += sizeof(struct NetFlow_Version_9_FlowSet_Header); pos2 = sizeof(struct NetFlow_Version_9_FlowSet_Header); while(pos2 + fill_padding(pos2) < ntohs(fh->Length)){ if((ntohs(fh->FlowSet_ID) & 256) == 256){ if(debug){printf("\tData FlowSet\n");} rv = handle_Data_FlowSet( &buf[pos], ntohs(fh->FlowSet_ID), ph, &insert_Header, &hid, p_s_addr ); }else if(ntohs(fh->FlowSet_ID) == 0){ if(debug){printf("\tTemplate FlowSet\n");} //Template FlowSet rv = (handle_Template_FlowSet( &buf[pos], ph, p_s_addr )); }else if(ntohs(fh->FlowSet_ID) == 1){ if(debug){printf("\tOptions FlowSet\n");} //Options FlowSet rv = (handle_Options_FlowSet( &buf[pos], ph, p_s_addr )); } if(rv == 0){ rv = ntohs(fh->Length) - sizeof(struct NetFlow_Version_9_FlowSet_Header); if(rv == 0){ return; } } pos += rv; pos2 += rv; if(debug){printf("\n");} } pos += fill_padding(pos); }}void handle_NetFlow_packet( char *buf, int length, struct in_addr *p_in_addr){ u_int16_t *v; v = (u_int16_t *)buf; switch (ntohs(*v)) { case 9: if(debug){ printf("NetFlow v%hu Packet from %s\n", ntohs(*v), inet_ntoa(*p_in_addr)); } handle_NetFlow_v9_packet(buf, length, &(p_in_addr->s_addr)); break; case 5: if(debug){ printf("NetFlow v%hu Packet from %s\n", ntohs(*v), inet_ntoa(*p_in_addr)); } handle_NetFlow_v5_packet(buf, length, &(p_in_addr->s_addr)); break; default: fprintf(stderr, "WARN: Unsuported NetFlow Packet from %s.\n", inet_ntoa(*p_in_addr)); break; }}intmain( int argc, char *argv[]){ int s; int i; int n; int mtu; char *buf; struct sockaddr_in from; int from_len = 0; memset(&template_lookup, '\0', sizeof(struct hash8)); s = create_socket(argc, argv, &mtu, &buf); connect_mysql(argc, argv); setvbuf(stdout, NULL, _IONBF, 0); while(1){ n = recvfrom(s, buf, mtu, 0, (struct sockaddr *)&from, &from_len); handle_NetFlow_packet(buf, n, &(from.sin_addr)); } for(i = 1; i < argc; i++){ memset(argv[i], '\0', strlen(argv[i])); argv[i] = "\0"; } free(buf); exit(0);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -