📄 flow-receive.c
字号:
fterr_err(1, "setsockopt(IP_RECVDSTADDR)");#else#ifdef IP_PKTINFO one = 1; /* return the destination IP address */ if (setsockopt(ftnet.fd, IPPROTO_IP, IP_PKTINFO, (char *)&one, sizeof(one)) < 0) fterr_err(1, "setsockopt(IP_PKTINFO)");#endif /* else */#endif /* IP_RECVDSTADDR */ /* if out_fname is not set, then use stdout */ if (out_fname) { if ((out_fd = open(out_fname, O_WRONLY|O_CREAT|O_TRUNC, 0644)) == -1) fterr_err(1, "open(%s)", out_fname); if (fstat(out_fd, &stat_buf) == -1) fterr_err(1, "fstat(%s)", out_fname); /* is this a plain file? */ if (!stat_buf.st_rdev) out_fd_plain = 1; } else out_fd = 1; /* output to out_fd */ if (ftio_init(&ftio, out_fd, FT_IO_FLAG_NO_SWAP | FT_IO_FLAG_WRITE | ((ftset.z_level) ? FT_IO_FLAG_ZINIT : 0) ) < 0) fterr_errx(1, "ftio_init(): failed"); time_start = (u_int32)time((time_t)0L); ftio_set_comment(&ftio, ftset.comments); ftio_set_cap_hostname(&ftio, ftset.hnbuf); ftio_set_byte_order(&ftio, ftset.byte_order); ftio_set_z_level(&ftio, ftset.z_level); if (out_fd_plain) ftio_set_cap_time(&ftio, time_start, 0); else ftio_set_cap_time_start(&ftio, time_start); ftio_set_debug(&ftio, debug); ftio_set_streaming(&ftio, 1);/* ftio_map_load(&ftio, FT_FILE_MAP, ftnet.rem_ip); */ /* header must be full size on initial write */ if (out_fd_plain) { ftio_set_flows_count(&ftio, nflows); ftio_set_corrupt(&ftio, flows_corrupt); ftio_set_lost(&ftio, flows_lost); ftio_set_reset(&ftio, flows_reset); ftio_set_flows_count(&ftio, nflows); } /* if version set on command line, write out header here */ if (ftv.set) { if (ftio_set_ver(&ftio, &ftv) < 0) fterr_errx(1, "ftio_set_ver(): failed"); /* need offsets for filter later */ fts3rec_compute_offsets(&fo, &ftv); /* header first */ if (ftio_write_header(&ftio) < 0) fterr_errx(1, "ftio_write_header()"); } /* init hash table for demuxing exporters */ if (!(ftch = ftchash_new(256, sizeof (struct ftchash_rec_exp), 12, 1))) fterr_errx(1, "ftchash_new(): failed"); /* init msg block */ ftnet.iov[0].iov_len = sizeof ftpdu.buf; ftnet.iov[0].iov_base = (char*)&ftpdu.buf; ftnet.msg.msg_iov = (struct iovec*)&ftnet.iov; ftnet.msg.msg_iovlen = 1; ftnet.msg.msg_name = &ftnet.rem_addr; ftnet.msg.msg_namelen = sizeof ftnet.rem_addr; ftnet.msg.msg_control = &ftnet.msgip; ftnet.msg.msg_controllen = sizeof ftnet.msgip; /* default timeout waiting for an active fd */ tv.tv_sec = SELECT_TIMEOUT; while (1) { FD_ZERO (&rfd); FD_SET (ftnet.fd, &rfd); if (select (ftnet.fd+1, &rfd, (fd_set *)0, (fd_set *)0, &tv) < 0) { if (errno == EINTR) { FD_ZERO (&rfd); } else { fterr_err(1, "select()"); } } now = time((time_t)0L); /* reset */ bzero (&tv, sizeof tv); tv.tv_sec = SELECT_TIMEOUT; if (stat_interval) { tm = localtime (&now); /* * note there is an obscure race condition here if this * code is not reached at least every stat_interval*60 seconds * where up to 1 hour of STAT lines would not show up. * This is highly unlikely and not handled. */ if ((tm->tm_min == stat_next) || (stat_next == -1)) { ftchash_first(ftch); while ((ftch_recexpp = ftchash_foreach(ftch))) { fmt_ipv4(fmt_src_ip, ftch_recexpp->src_ip, FMT_JUST_LEFT); fmt_ipv4(fmt_dst_ip, ftch_recexpp->dst_ip, FMT_JUST_LEFT); fterr_info( "STAT: now=%lu startup=%lu src_ip=%s dst_ip=%s d_ver=%d pkts=%lu flows=%lu lost=%lu reset=%lu", (unsigned long)now, (unsigned long)time_startup, fmt_src_ip, fmt_dst_ip, ftch_recexpp->d_version, (u_long)ftch_recexpp->packets, (u_long)ftch_recexpp->flows, (u_long)ftch_recexpp->lost, (u_long)ftch_recexpp->reset); } stat_next = (tm->tm_min + (stat_interval - tm->tm_min % stat_interval)) % 60; } } /* stat_inverval */ if (done) { fterr_info("Cleaning up"); break; } if (FD_ISSET(ftnet.fd, &rfd)) {restart_recvmsg: if ((ftpdu.bused = recvmsg(ftnet.fd, (struct msghdr*)&ftnet.msg, 0)) < 0) { if (errno == EAGAIN) goto restart_recvmsg; fterr_err(1, "recvmsg()"); }#ifdef IP_RECVDSTADDR /* got destination IP back? */ if ((ftnet.msgip.hdr.cmsg_level == IPPROTO_IP) && (ftnet.msgip.hdr.cmsg_type == IP_RECVDSTADDR)) { ftnet.loc_addr.sin_addr.s_addr = ftnet.msgip.ip.s_addr; } else { ftnet.loc_addr.sin_addr.s_addr = 0; }#else#ifdef IP_PKTINFO if ((ftnet.msgip.hdr.cmsg_level == IPPROTO_IP) && (ftnet.msgip.hdr.cmsg_type == IP_PKTINFO)) { ftnet.loc_addr.sin_addr.s_addr = ftnet.msgip.pktinfo.ipi_addr.s_addr; } else { ftnet.loc_addr.sin_addr.s_addr = 0; }#else ftnet.loc_addr.sin_addr.s_addr = 0;#endif#endif /* IP_RECVDSTADDR */ /* fill in hash key */ ftch_recexp.src_ip = htonl(ftnet.rem_addr.sin_addr.s_addr); ftch_recexp.dst_ip = htonl(ftnet.loc_addr.sin_addr.s_addr); ftch_recexp.dst_port = ftnet.dst_port; /* verify integrity, get version */ if (ftpdu_verify(&ftpdu) < 0) { fmt_ipv4(fmt_src_ip, ftch_recexp.src_ip, FMT_JUST_LEFT); fterr_warnx("ftpdu_verify(): src_ip=%s failed.", fmt_src_ip); flows_corrupt ++; goto skip1; } /* rest of hash key */ ftch_recexp.d_version = ftpdu.ftv.d_version; /* if exporter src IP has been configured then make sure it matches */ if (ftnet.rem_ip && (ftnet.rem_ip != ftch_recexp.src_ip)) { fmt_ipv4(fmt_src_ip, ftch_recexp.src_ip, FMT_JUST_LEFT); fterr_warnx("Unexpected PDU: src_ip=%s not configured", fmt_src_ip); flows_corrupt ++; goto skip1; } /* first flow or no configured destination version? */ if (!ftv.set) { /* set the version information in the io stream */ if (ftio_set_ver(&ftio, &ftpdu.ftv) < 0) fterr_errx(1, "ftio_set_ver(): failed"); /* copy to compare next time */ bcopy(&ftpdu.ftv, &ftv, sizeof ftv); /* flag struct as configured */ ftv.set = 1; /* need offsets for filter later */ fts3rec_compute_offsets(&fo, &ftv); /* header first */ if (ftio_write_header(&ftio) < 0) fterr_errx(1, "ftio_write_header(): failed"); } else { /* translation to/from v8 not possible */ if (((ftv.d_version == 8) && (ftpdu.ftv.d_version != 8)) || ((ftv.d_version != 8) && (ftpdu.ftv.d_version == 8))) { fmt_ipv4(fmt_src_ip, ftch_recexp.dst_ip, FMT_JUST_LEFT); fterr_warnx("Unexpected PDU: src_ip=%s no v8 translation", fmt_src_ip); ++flows_corrupt; goto skip1; } /* translation among v8 aggregation methods not possible */ if ((ftv.d_version == 8) && ((ftv.agg_method != ftpdu.ftv.agg_method) || (ftv.agg_version != ftpdu.ftv.agg_version))) { fmt_ipv4(fmt_src_ip, ftch_recexp.dst_ip, FMT_JUST_LEFT); fterr_warnx( "Unexpected PDU: src_ip=%s multi v8 oagg=%d agg=%d over=%d ver=%d", fmt_src_ip, (int)ftv.agg_method, (int)ftpdu.ftv.agg_method, (int)ftv.agg_version, (int)ftpdu.ftv.agg_version); ++flows_corrupt; goto skip1; } } /* version processing */ /* compute 8 bit hash */ hash = (ftch_recexp.src_ip & 0xFF); hash ^= (ftch_recexp.src_ip>>24); hash ^= (ftch_recexp.dst_ip & 0xFF); hash ^= (ftch_recexp.dst_ip>>24); hash ^= (ftch_recexp.d_version & 0xFF); if (!(ftch_recexpp = ftchash_update(ftch, &ftch_recexp, hash))) fterr_errx(1, "ftch_update(): failed"); /* if the packet count is 0, then this is a new entry */ if (ftch_recexpp->packets == 0) { fmt_ipv4(fmt_src_ip, ftch_recexp.src_ip, FMT_JUST_LEFT); fmt_ipv4(fmt_dst_ip, ftch_recexp.dst_ip, FMT_JUST_LEFT); fterr_info("New exporter: time=%lu src_ip=%s dst_ip=%s d_version=%d", (u_long)now, fmt_src_ip, fmt_dst_ip, (int)ftpdu.ftv.d_version); /* set translation function */ if (ftch_recexp.d_version != ftv.d_version) ftch_recexpp->xlate = ftrec_xlate_func(&ftpdu.ftv, &ftv); } /* verify sequence number */ if (ftpdu_check_seq(&ftpdu, &(ftch_recexpp->ftseq)) < 0) { fmt_ipv4(fmt_src_ip, ftch_recexp.src_ip, FMT_JUST_LEFT); fmt_ipv4(fmt_dst_ip, ftch_recexp.dst_ip, FMT_JUST_LEFT); fmt_uint16(fmt_dst_port, ftch_recexp.dst_port, FMT_JUST_LEFT); fterr_warnx( "ftpdu_seq_check(): src_ip=%s dst_ip=%s d_version=%d expecting=%lu received=%lu lost=%lu", fmt_src_ip, fmt_dst_ip, (int)ftpdu.ftv.d_version, (u_long)ftch_recexpp->ftseq.seq_exp, (u_long)ftch_recexpp->ftseq.seq_rcv, (u_long)ftch_recexpp->ftseq.seq_lost); /* only count these lost if "lost" is a reasonable number */ if (ftch_recexpp->ftseq.seq_lost < FT_SEQ_RESET) { flows_lost += ftch_recexpp->ftseq.seq_lost; ftch_recexpp->lost += ftch_recexpp->ftseq.seq_lost; } else { flows_reset ++; ftch_recexpp->reset ++; } } /* decode */ ftpdu.ftd.byte_order = ftset.byte_order; ftpdu.ftd.as_sub = ftset.as_sub; ftpdu.ftd.exporter_ip = ftch_recexp.src_ip; n = fts3rec_pdu_decode(&ftpdu); /* update the exporter stats */ ftch_recexpp->packets ++; ftch_recexpp->flows += n; /* write decoded flows */ for (i = 0, offset = 0; i < n; ++i, offset += ftpdu.ftd.rec_size) { /* translate version? */ if (ftch_recexpp->xlate) { ftch_recexpp->xlate(ftpdu.ftd.buf+offset, &xl_rec); out_rec = (char*)&xl_rec; /* tagging? */ if (tag_active) fttag_def_eval(ftd, (struct fts3rec_v1005*)out_rec); } else { out_rec = (char*)ftpdu.ftd.buf+offset; } /* filter? */ if (ftfd) if (ftfil_def_eval(ftfd, out_rec, &fo) == FT_FIL_MODE_DENY) { ++filtered_flows; continue; } ++nflows; /* simple data privacy */ if (privacy_mask != 0xFFFFFFFF) ftrec_mask_ip(out_rec, &ftv, &ftipmask); if (ftio_write(&ftio, out_rec) < 0) fterr_errx(1, "ftio_write(): failed"); } /* for */skip1: } /* if FD_ISSET */ } /* while 1 */ fterr_info("flows stored/dropped by filter %lu/%lu", (u_long)nflows, filtered_flows); /* rewrite header with updated info */ if (out_fd_plain) { time_end = (u_int32)time((time_t)0L); ftio_set_cap_time(&ftio, time_start, time_end); ftio_set_flows_count(&ftio, nflows); ftio_set_streaming(&ftio, 0); ftio_set_corrupt(&ftio, flows_corrupt); ftio_set_lost(&ftio, flows_lost); ftio_set_reset(&ftio, flows_reset); if (ftio_write_header(&ftio) < 0) fterr_errx(1, "ftio_write_header(): failed"); } /* close stream */ if (ftio_close(&ftio) < 0) fterr_errx(1, "ftio_close(): failed"); /* close input */ close (ftnet.fd); return 0;} /* main */void sig_quit(int sig){ done = 1;} /* sig_quit */void usage(void) { fprintf(stderr, "Usage: flow-receive [-h] [-A AS0_substitution] [-b big|little] [-C comment]\n"); fprintf(stderr, " [-d debug_level] [-m privacy_mask] [-o output_file] [-S stat_interval]\n"); fprintf(stderr, " [-t tag_fname] [-T tag_active] [-V pdu_version] [-z z_level]\n"); fprintf(stderr, " localip/remoteip/port\n"); fprintf(stderr, "\n%s version %s: built by %s\n", PACKAGE, VERSION, FT_PROG_BUILD);} /* usage */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -