📄 flow-capture.c
字号:
fmt_ipv4(fmt_src_ip, ftch_recexp.src_ip, FMT_JUST_LEFT); fmt_ipv4(fmt_dst_ip, ftch_recexp.dst_ip, FMT_JUST_LEFT); fterr_info("New exporter: time=%lu src_ip=%s dst_ip=%s d_version=%d", (u_long)tt_now, fmt_src_ip, fmt_dst_ip, (int)ftpdu.ftv.d_version); /* set translation function */ if (ftch_recexp.d_version != ftv.d_version) ftch_recexpp->xlate = ftrec_xlate_func(&ftpdu.ftv, &ftv); } /* verify sequence number */ if (ftpdu_check_seq(&ftpdu, &(ftch_recexpp->ftseq)) < 0) { fmt_ipv4(fmt_src_ip, ftch_recexp.src_ip, FMT_JUST_LEFT); fmt_ipv4(fmt_dst_ip, ftch_recexp.dst_ip, FMT_JUST_LEFT); fmt_uint16(fmt_dst_port, ftch_recexp.dst_port, FMT_JUST_LEFT); fterr_warnx( "ftpdu_seq_check(): src_ip=%s dst_ip=%s d_version=%d expecting=%lu received=%lu lost=%lu", fmt_src_ip, fmt_dst_ip, (int)ftpdu.ftv.d_version, (u_long)ftch_recexpp->ftseq.seq_exp, (u_long)ftch_recexpp->ftseq.seq_rcv, (u_long)ftch_recexpp->ftseq.seq_lost); /* only count these lost if "lost" is a reasonable number */ if (ftch_recexpp->ftseq.seq_lost < FT_SEQ_RESET) { cap_file.hdr_flows_lost += ftch_recexpp->ftseq.seq_lost; ftch_recexpp->lost += ftch_recexpp->ftseq.seq_lost; } else { cap_file.hdr_flows_reset ++; ftch_recexpp->reset ++; } } /* decode the pdu */ ftpdu.ftd.byte_order = ftset.byte_order; ftpdu.ftd.as_sub = ftset.as_sub; ftpdu.ftd.exporter_ip = ftch_recexp.src_ip; n = fts3rec_pdu_decode(&ftpdu); /* update the exporter stats */ ftch_recexpp->packets ++; ftch_recexpp->flows += n; } /* PDU on receive buffer */skip_pdu_decode: /* no current file and pdu version has been set -> create file */ if ((cap_file.fd == -1) && (ftv.d_version)) { /* calculate the current rotation and next rotate time */ if (calc_rotate(rot.n, &rot.next, &rot.cur) == -1) fterr_errx(1, "calc_rotate(): failed"); /* remember when file was created */ cap_file.time = (u_int32)tt_now; /* remember the version encoded in the filename */ bcopy(&ftv, &cap_file.ftv, sizeof cap_file.ftv); /* construct the capture file name */ ftfile_pathname(cap_file.name, MAXPATHLEN, nest, cap_file.ftv, 0, cap_file.time); /* create directory path for file */ if (ftfile_mkpath(cap_file.time, nest) < 0) fterr_err(1, "ftfile_mkpath(%s)", cap_file.name); /* create/open the capture file */ if ((cap_file.fd = open(cap_file.name, O_WRONLY|O_CREAT|O_TRUNC, 0644)) == -1) fterr_err(1, "open(%s)", cap_file.name); /* initialize the IO stream */ if (ftio_init(&ftio, cap_file.fd, FT_IO_FLAG_NO_SWAP | FT_IO_FLAG_WRITE | ((ftset.z_level) ? FT_IO_FLAG_ZINIT : 0) ) < 0) fterr_errx(1, "ftio_init(): failed"); /* set the version information in the io stream */ if (ftio_set_ver(&ftio, &ftv) < 0) fterr_errx(1, "ftio_set_ver(): failed"); /* need offsets for filter later */ fts3rec_compute_offsets(&fo, &ftv); ftio_set_comment(&ftio, ftset.comments); ftio_set_cap_hostname(&ftio, ftset.hnbuf); ftio_set_byte_order(&ftio, ftset.byte_order); ftio_set_z_level(&ftio, ftset.z_level); ftio_set_cap_time(&ftio, cap_file.time, 0); ftio_set_debug(&ftio, debug); ftio_set_corrupt(&ftio, cap_file.hdr_flows_corrupt); ftio_set_lost(&ftio, cap_file.hdr_flows_lost); ftio_set_reset(&ftio, cap_file.hdr_flows_reset); ftio_set_flows_count(&ftio, cap_file.hdr_nflows);/* ftio_map_load(&ftio, FT_MAP_FILE, ftnet.rem_ip); */ /* header first */ if ((n = ftio_write_header(&ftio)) < 0) fterr_errx(1, "ftio_write_header(): failed"); else cap_file.nbytes = n; } /* create capture file and init new io stream */ /* load filters and tags? */ if (reload_flag && ftv.set) { /* load tags */ if (tag_active) { /* not first time through, then free previous tags */ if (ftd) { fttag_free(&fttag); fterr_info("Reloading tags."); } if (fttag_load(&fttag, tag_fname) < 0) fterr_errx(1, "fttag_load(): failed"); if (!(ftd = fttag_def_find(&fttag, tag_active))) fterr_errx(1, "fttag_load(): failed"); } /* tag_active */ /* load filters */ if (filter_active) { /* not first time through, then free previous filters */ if (ftfd) { ftfil_free(&ftfil); fterr_info("Reloading filters."); } if (ftfil_load(&ftfil, filter_fname)) fterr_errx(1, "ftfil_load(%s): failed", filter_fname); if (!(ftfd = ftfil_def_find(&ftfil, filter_active))) fterr_errx(1, "ftfil_def_find(%s): failed", filter_active); if (ftfil_def_test_xfields(ftfd, ftrec_xfield(&ftv))) fterr_errx(1, "Filter references a field not in flow."); } /* filter_active */ reload_flag = 0; } /* reload_flag */ /* if the decode buffer has entries write them out */ for (i = 0, offset = 0; i < ftpdu.ftd.count; ++i, offset += ftpdu.ftd.rec_size) { /* translate version? */ if (ftch_recexpp->xlate) { ftch_recexpp->xlate(ftpdu.ftd.buf+offset, &xl_rec); out_rec = (char*)&xl_rec; /* tagging? */ if (tag_active) fttag_def_eval(ftd, (struct fts3rec_v1005*)out_rec); } else { out_rec = (char*)ftpdu.ftd.buf+offset; } /* filter? */ if (ftfd) if (ftfil_def_eval(ftfd, out_rec, &fo) == FT_FIL_MODE_DENY) { ++ftch_recexpp->filtered_flows; continue; } /* update # of flows stored in capture file */ cap_file.hdr_nflows ++; /* simple data privacy */ if (privacy_mask != 0xFFFFFFFF) ftrec_mask_ip(out_rec, &ftv, &ftipmask); if ((n = ftio_write(&ftio, out_rec)) < 0) fterr_errx(1, "ftio_write(): failed"); /* write to clients */ FT_LIST_FOREACH(client_rec, &client.list, chain) { if ((n = ftio_write(&client_rec->ftio, out_rec)) < 0) { fterr_info("Killed client: ip=%s, dtime=%lu", inet_ntoa(client_rec->addr.sin_addr), (unsigned long)tt_now - client_rec->conn_time); ftio_close(&client_rec->ftio); client_rec2 = client_rec; client_rec = client_rec->chain.le_next; FT_LIST_REMOVE(client_rec2, chain); free(client_rec2); --client.active; if (!client_rec) break; } /* ftio_write */ } /* foreach client */ /* update # of bytes stored in capture file */ cap_file.nbytes += n; } /* foreach entry in decode buffer */ /* * time for a new file ? */ if ((now > rot.next) || sig_quit_flag || sig_hup_flag) { if (sig_hup_flag) fterr_info("SIGHUP"); sig_hup_flag = 0; /* re-arm */ if (cap_file.fd != -1) { ftio_set_cap_time(&ftio, cap_file.time, (u_int32)tt_now); ftio_set_corrupt(&ftio, cap_file.hdr_flows_corrupt); ftio_set_lost(&ftio, cap_file.hdr_flows_lost); ftio_set_reset(&ftio, cap_file.hdr_flows_reset); ftio_set_flows_count(&ftio, cap_file.hdr_nflows); /* re-write header first */ if (ftio_write_header(&ftio) < 0) fterr_errx(1, "ftio_write_header(): failed"); if ((n = ftio_close(&ftio)) < 0) fterr_errx(1, "ftio_close(): failed"); cap_file.nbytes += n; /* construct final version of capture filename */ ftfile_pathname(cap_file.nname, MAXPATHLEN, nest, cap_file.ftv, 1, cap_file.time); /* rename working to final */ if (rename(cap_file.name, cap_file.nname) == -1) fterr_err(1, "rename(%s,%s)", cap_file.name, cap_file.nname); /* add it to the ager */ if (fte.expiring) if (ftfile_add_tail(&fte, cap_file.nname, cap_file.nbytes, cap_file.time)) fterr_errx(1, "ftfile_add_tail(%s): failed", cap_file.name); /* debugging gets a dump of the ager */ if (debug) ftfile_dump(&fte); /* Do the post rotate exec */ if (post_rotate_exec[0]) { if ((n = fork()) == -1) { fterr_err(1, "fork()"); } else if (!n) { /* child */ n = execl(post_rotate_exec, post_rotate_exec, cap_file.nname, NULL); if (n == -1) fterr_err(1, "exec(%s)", post_rotate_exec); _exit(0); } /* child */ } /* post rotate exec */ /* reset */ bzero(&cap_file, sizeof cap_file); /* invalidate file descriptor */ cap_file.fd = -1; /* had enough */ if (sig_quit_flag) goto main_exit; } /* file open */ } /* time for new file */ /* also need to check sig_quit if no file has been processed yet */ if (sig_quit_flag) goto main_exit; /* * If client attachments are enabled, and the flow version has been * determined setup a listener, but only once. */ if (ftv.set && client.max && !client.enabled) { /* socket for listen */ if ((client.fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) fterr_err(1, "socket()"); one = 1; if (setsockopt(client.fd, SOL_SOCKET, SO_REUSEADDR, (char*)&one, sizeof (one))) fterr_err(1, "setsockopt(SO_REUSEADDR)"); if (bind(client.fd, (struct sockaddr*)&client.addr, sizeof (client.addr)) < 0) fterr_err(1, "bind(%s)", inet_ntoa(client.addr.sin_addr)); /* listen for new TCP connections */ if (listen(client.fd, 5) < 0) fterr_err(1, "listen()"); /* non blocking */ if (fcntl(client.fd, F_SETFL, O_NONBLOCK) < 0) fterr_err(1, "fcntl()"); client.enabled = 1; } /* clients enabled */ if (!(cap_file.hdr_nflows % 1001)) { if (fte.expiring) if (ftfile_expire(&fte, enable_unlink, cap_file.nbytes)) fterr_errx(1, "ftfile_expire(): failed"); } /* ager run? */ } /* while 1 */main_exit: if (pidfile) unlink_pidfile(pid, pidfile, ftnet.dst_port); if (sig_quit_flag) fterr_info("SIGQUIT"); /* free storage allocated to file entries */ if (fte.expiring) ftfile_free(&fte); return 0;} /* main */void sig_pipe(int signo){ sig_pipe_flag = 1;} void sig_hup(int signo){ sig_hup_flag = 1; reload_flag = 1;} void sig_quit(int signo){ sig_quit_flag = 1;}void sig_chld(int signo){ sig_chld_flag = 1;}/* doubletime - like time(2), but returns a double (with fractional seconds) * This was inspired by the Time::HiRes perl module. E.g.: * $ perl -MTime::HiRes -le 'print scalar Time::HiRes::time' * See "perl/CPAN/authors/id/D/DE/DEWEG/Time-HiRes-01.20.tar.gz". * - Dave Plonka <plonka@doit.wisc.edu> */double doubletime(void) { double now; struct timeval tv_now; if (-1 == gettimeofday(&tv_now, (struct timezone *)0)) return -1; now = tv_now.tv_sec + (tv_now.tv_usec / 1000000.0); return now;}int calc_rotate (int next, double *trotate, int *cur){ double now; /* current time */ time_t tt_now; /* current time */ double irotate; /* interval of next in seconds */ time_t morning; /* start of the day */ struct tm *tm1; irotate = 86400. / (next+1); *cur = 0; if (-1 == (tt_now = now = doubletime())) return -1; /* calculate start of the day */ if (!(tm1 = localtime (&tt_now))) return -1; tm1->tm_sec = 0; tm1->tm_min = 0; tm1->tm_hour = 0; if ((morning = mktime(tm1)) == -1) return -1; if (next) *trotate = morning + irotate; else *trotate = morning + 86400.; while (now > *trotate) { ++ *cur; *trotate += irotate; } return 0;} /* calc_rotate */void fterr_exit_handler(int code){ if (pid && pidfile) unlink_pidfile(pid, pidfile, ftnet.dst_port); exit (code);} /* fterr_exit_handler */void usage(void) { fprintf(stderr, "Usage: flow-capture [-h] [-A AS0_substitution] [-b big|little]\n"); fprintf(stderr, " [-C comment] [-c flow_clients] [-d debug_level] [-D daemonize]\n"); fprintf(stderr, " [-e expire_count] [-E expire_size[bKMG]] [-m privacy_mask]\n"); fprintf(stderr, " [-n rotations] [-N nesting_level] [-p pidfile ] [-R rotate_program]\n"); fprintf(stderr, " [-S stat_interval] [-t tag_fname] [-T tag_active] [-V pdu_version]\n"); fprintf(stderr, " [-z z_level] -w workdir localip/remoteip/port\n"); fprintf(stderr, "Signals:\n"); fprintf(stderr, " SIGHUP - close and rotate current file\n"); fprintf(stderr, " SIGQUIT - close current file and exit\n");fprintf(stderr, "\n%s version %s: built by %s\n", PACKAGE, VERSION, FT_PROG_BUILD);} /* usage */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -