📄 flow-capture.c
字号:
fterr_err(1, "signal(SIGCHLD)"); /* sandbox */ if (chdir(work_dir) == -1) fterr_err(1, "chdir(%s)", work_dir); /* * load directory entries into the file ager */ if (fte.expiring) if (ftfile_loaddir(&fte, ".", FT_FILE_SORT|FT_FILE_INIT|FT_FILE_CHECKNAMES)) fterr_errx(1, "ftfile_scandir(): failed"); /* debugging gets a dump of the ager */ if (debug) ftfile_dump(&fte); /* run the ager once now */ if (fte.expiring) if (ftfile_expire(&fte, enable_unlink, (u_int32)0)) fterr_errx(1, "ftfile_export(): failed"); /* get hostname */ if (gethostname((char*)&ftset.hnbuf, FT_HOSTNAME_LEN-1) == -1) fterr_err(1, "gethostname()"); /* ensure null terminated */ ftset.hnbuf[FT_HOSTNAME_LEN-1] = 0; /* socket to receive flow pdu exports */ if ((ftnet.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) fterr_err(1, "socket()"); if (bigsockbuf(ftnet.fd, SO_RCVBUF, FT_SO_RCV_BUFSIZE) < 0) fterr_err(1, "bigsockbuf()");/* multicast capable? */#ifdef IP_ADD_MEMBERSHIP if (IN_CLASSD(ftpi.rem_ip)) { /* source is the first arg now */ ftnet.rem_ip = ftpi.loc_ip; ftnet.loc_ip = ftpi.rem_ip; /* socket API usually requires INADDR_ANY * and s/g/port identifier does not have a source interface field * to use here */ bzero(&tmp_addr, sizeof tmp_addr); tmp_addr.sin_family = AF_INET; tmp_addr.sin_port = htons(ftnet.dst_port); one = 1; /* Multicast streams may have multiple receivers */ if (setsockopt(ftnet.fd, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one)) < 0) fterr_err(1, "setsockopt(SO_REUSEADDR)"); if (bind(ftnet.fd, (struct sockaddr*)&tmp_addr, sizeof(struct sockaddr)) < 0) fterr_err(1, "bind(%s)", inet_ntoa(tmp_addr.sin_addr));#ifdef IP_ADD_SOURCE_MEMBERSHIP /* ssm address? */ if (IN_CLASSD_SSM(ftpi.rem_ip)) { mrs.imr_sourceaddr.s_addr = htonl(ftpi.loc_ip); mrs.imr_multiaddr.s_addr = htonl(ftpi.rem_ip); mrs.imr_interface.s_addr = INADDR_ANY; if (setsockopt(ftnet.fd, IPPROTO_IP, IP_ADD_SOURCE_MEMBERSHIP, (char*)&mrs, sizeof(mrs)) < 0) fterr_err(1, "setsockopt(IP_ADD_SOURCE_MEMBERSHIP)"); } goto mcast_done;#endif /* IP_ADD_SOURCE_MEMBERSHIP */ mr.imr_multiaddr.s_addr = htonl(ftpi.rem_ip); mr.imr_interface.s_addr = INADDR_ANY; if (setsockopt(ftnet.fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char *)&mr, sizeof(mr)) < 0) fterr_err(1, "setsockopt(IP_ADD_MEMBERSHIP)"); } else { /* is a multicast group */ /* unicast bind -- multicast support */ if (bind(ftnet.fd, (struct sockaddr*)&ftnet.loc_addr, sizeof(ftnet.loc_addr)) < 0) fterr_err(1, "bind(%s)", inet_ntoa(ftnet.loc_addr.sin_addr)); } /* not multicast group */#ifdef IP_ADD_SOURCE_MEMBERSHIPmcast_done:#endif /* IP_ADD_SOURCE_MEMBERSHIP */#else /* IP_ADD_MEMBERSHIP */ /* unicast bind -- no multicast support */ if (bind(ftnet.fd, (struct sockaddr*)&ftnet.loc_addr, sizeof(ftnet.loc_addr)) < 0) fterr_err(1, "bind(%s)", inet_ntoa(ftnet.loc_addr.sin_addr)));#endif /* IP_ADD_MEMBERSHIP */#ifdef IP_RECVDSTADDR one = 1; /* return the destination IP address */ if (setsockopt(ftnet.fd, IPPROTO_IP, IP_RECVDSTADDR, (char *)&one, sizeof(one)) < 0) fterr_err(1, "setsockopt(IP_RECVDSTADDR)");#else#ifdef IP_PKTINFO one = 1; /* return the destination IP address */ if (setsockopt(ftnet.fd, IPPROTO_IP, IP_PKTINFO, (char *)&one, sizeof(one)) < 0) fterr_err(1, "setsockopt(IP_PKTINFO)");#endif /* else */#endif /* IP_RECVDSTADDR */ /* init hash table for demuxing exporters */ if (!(ftch = ftchash_new(256, sizeof (struct ftchash_rec_exp), 12, 1))) fterr_errx(1, "ftchash_new(): failed"); /* init msg block */ ftnet.iov[0].iov_len = sizeof ftpdu.buf; ftnet.iov[0].iov_base = (char*)&ftpdu.buf; ftnet.msg.msg_iov = (struct iovec*)&ftnet.iov; ftnet.msg.msg_iovlen = 1; ftnet.msg.msg_name = &ftnet.rem_addr; ftnet.msg.msg_namelen = sizeof ftnet.rem_addr; ftnet.msg.msg_control = &ftnet.msgip; ftnet.msg.msg_controllen = sizeof ftnet.msgip; while (1) { FD_ZERO (&rfd); FD_SET (ftnet.fd, &rfd); if (client.enabled && client.max) { FD_SET (client.fd, &rfd); max_fd = (client.fd > ftnet.fd) ? client.fd : ftnet.fd; } else { max_fd = ftnet.fd; } if (select (max_fd+1, &rfd, (fd_set *)0, (fd_set *)0, &tv) < 0) { if (errno == EINTR) { FD_ZERO (&rfd); } else { fterr_err(1, "select()"); } } bzero (&tv, sizeof tv); tv.tv_sec = SELECT_TIMEOUT; tt_now = now = doubletime(); /* new TCP client connection ? */ if ((client.max) && (FD_ISSET(client.fd, &rfd))) { /* too many clients? */ if (client.active >= client.max) { /* bye */ i = accept(client.fd, (struct sockaddr*)&tmp_addr, &tmp_len); close(i); fterr_warnx("Maximum clients exceeded, rejecting connection from %s", inet_ntoa(tmp_addr.sin_addr)); goto skip_client; } /* allocate a new client record, fail gracefully */ if (!(client_rec = (struct client_rec*)malloc(sizeof *client_rec))) { fterr_warn("malloc()"); /* bye */ tmp_len = sizeof (struct sockaddr_in); i = accept(client.fd, (struct sockaddr*)&client_rec->addr, &tmp_len); close(i); goto skip_client; } bzero(client_rec, sizeof *client_rec); /* link in the new record */ FT_LIST_INSERT_HEAD(&client.list, client_rec, chain); tmp_len = sizeof (struct sockaddr_in); /* accept() the connection */ if ((client_rec->fd = accept(client.fd, (struct sockaddr*)&client_rec->addr, &tmp_len)) < 0) { fterr_warn("accept()"); FT_LIST_REMOVE(client_rec, chain); free(client_rec); goto skip_client; }#ifdef HAVE_LIBWRAP request_init(&client.tcpd, RQ_DAEMON, "flow-capture-client", RQ_FILE, client_rec->fd, NULL); fromhost(&client.tcpd); if (!hosts_access(&client.tcpd)) { fterr_warnx("client %s refused by libwrap", inet_ntoa(client_rec->addr.sin_addr)); close(client_rec->fd); FT_LIST_REMOVE(client_rec, chain); free(client_rec); goto skip_client; }#endif /* HAVE_LIBWRAP */ if (bigsockbuf(client_rec->fd, SO_SNDBUF, FT_SO_SND_BUFSIZE) < 0) fterr_warn("bigsockbuf()"); /* log it */ client_rec->conn_time = tt_now; fterr_info("client connect: ip=%s time=%lu", inet_ntoa(client_rec->addr.sin_addr), (unsigned long)client_rec->conn_time); /* ftio_init the stream */ if (ftio_init(&client_rec->ftio, client_rec->fd, FT_IO_FLAG_NO_SWAP | FT_IO_FLAG_WRITE) < 0) { fterr_warnx("ftio_init(): failed for client"); close(client_rec->fd); FT_LIST_REMOVE(client_rec, chain); free(client_rec); goto skip_client; } /* set the version information in the io stream */ if (ftio_set_ver(&client_rec->ftio, &ftv) < 0) fterr_errx(1, "ftio_set_ver(): failed"); ftio_set_comment(&client_rec->ftio, ftset.comments); ftio_set_cap_hostname(&client_rec->ftio, ftset.hnbuf); ftio_set_byte_order(&client_rec->ftio, ftset.byte_order); ftio_set_cap_time(&client_rec->ftio, cap_file.time, 0); ftio_set_debug(&client_rec->ftio, debug); /* header first */ if ((n = ftio_write_header(&client_rec->ftio)) < 0) { fterr_warnx("ftio_write_header(): failed for client"); ftio_close(&client_rec->ftio); FT_LIST_REMOVE(client_rec, chain); free(client_rec); goto skip_client; } ++client.active; } /* new TCP client */skip_client: /* stake the zombies */ if (sig_chld_flag) { /* not again */ sig_chld_flag = 0; while (1) { child_pid = wait3(&child_status, WNOHANG, 0); /* no more dead children? */ if (!child_pid) break; /* no more dead children? */ if ((child_pid == -1) && (errno == ECHILD)) break; if (WIFEXITED(child_status)) { if (WEXITSTATUS(child_status)) fterr_warnx("Child %d exit_status=%d", (int)child_pid, WEXITSTATUS(child_status)); } else if (WIFSIGNALED(child_status)) { fterr_warnx("Child %d signal=%d", (int)child_pid, WTERMSIG(child_status)); } else { fterr_warnx("PID %d exited status=%d", (int)child_pid, child_status); } } /* buffy */ } /* sig_chld_flag */ if (stat_interval) { tm = localtime (&tt_now); /* * note there is an obscure race condition here if this * code is not reached at least every stat_interval*60 seconds * where up to 1 hour of STAT lines would not show up. * This is highly unlikely and not handled. */ if ((tm->tm_min == stat_next) || (stat_next == -1)) { ftchash_first(ftch); while ((ftch_recexpp = ftchash_foreach(ftch))) { fmt_ipv4(fmt_src_ip, ftch_recexpp->src_ip, FMT_JUST_LEFT); fmt_ipv4(fmt_dst_ip, ftch_recexpp->dst_ip, FMT_JUST_LEFT); fterr_info( "STAT: now=%lu startup=%lu src_ip=%s dst_ip=%s d_ver=%d pkts=%lu flows=%lu lost=%lu reset=%lu filter_drops=%lu", (unsigned long)tt_now, (unsigned long)time_startup, fmt_src_ip, fmt_dst_ip, ftch_recexpp->d_version, (u_long)ftch_recexpp->packets, (u_long)ftch_recexpp->flows, (u_long)ftch_recexpp->lost, (u_long)ftch_recexpp->reset, (u_long)ftch_recexpp->filtered_flows); } stat_next = (tm->tm_min + (stat_interval - tm->tm_min % stat_interval)) % 60; } } /* stat_inverval */ /* flag for work later on */ ftpdu.ftd.count = 0; /* PDU ready */ if (FD_ISSET(ftnet.fd, &rfd)) {restart_recvmsg: if ((ftpdu.bused = recvmsg(ftnet.fd, (struct msghdr*)&ftnet.msg, 0)) < 0) { if (errno == EAGAIN) goto restart_recvmsg; fterr_err(1, "recvmsg()"); }#ifdef IP_RECVDSTADDR /* got destination IP back? */ if ((ftnet.msgip.hdr.cmsg_level == IPPROTO_IP) && (ftnet.msgip.hdr.cmsg_type == IP_RECVDSTADDR)) { ftnet.loc_addr.sin_addr.s_addr = ftnet.msgip.ip.s_addr; } else { ftnet.loc_addr.sin_addr.s_addr = 0; }#else#ifdef IP_PKTINFO if ((ftnet.msgip.hdr.cmsg_level == IPPROTO_IP) && (ftnet.msgip.hdr.cmsg_type == IP_PKTINFO)) { ftnet.loc_addr.sin_addr.s_addr = ftnet.msgip.pktinfo.ipi_addr.s_addr; } else { ftnet.loc_addr.sin_addr.s_addr = 0; }#else ftnet.loc_addr.sin_addr.s_addr = 0;#endif#endif /* IP_RECVDSTADDR */ /* fill in hash key */ ftch_recexp.src_ip = htonl(ftnet.rem_addr.sin_addr.s_addr); ftch_recexp.dst_ip = htonl(ftnet.loc_addr.sin_addr.s_addr); ftch_recexp.dst_port = ftnet.dst_port; /* verify integrity, get version */ if (ftpdu_verify(&ftpdu) < 0) { fmt_ipv4(fmt_src_ip, ftch_recexp.src_ip, FMT_JUST_LEFT); fterr_warnx("ftpdu_verify(): src_ip=%s failed.", fmt_src_ip); ++cap_file.hdr_flows_corrupt; goto skip_pdu_decode; } /* rest of hash key */ ftch_recexp.d_version = ftpdu.ftv.d_version; /* if exporter src IP has been configured then make sure it matches */ if (ftnet.rem_ip && (ftnet.rem_ip != ftch_recexp.src_ip)) { fmt_ipv4(fmt_src_ip, ftch_recexp.src_ip, FMT_JUST_LEFT); fterr_warnx("Unexpected PDU: src_ip=%s not configured", fmt_src_ip); ++cap_file.hdr_flows_corrupt; goto skip_pdu_decode; } /* first flow or no configured destination version? */ if (!ftv.set) { /* copy to compare next time */ bcopy(&ftpdu.ftv, &ftv, sizeof ftv); /* flag struct as configured */ ftv.set = 1; } else { /* translation to/from v8 not possible */ if (((ftv.d_version == 8) && (ftpdu.ftv.d_version != 8)) || ((ftv.d_version != 8) && (ftpdu.ftv.d_version == 8))) { fmt_ipv4(fmt_src_ip, ftch_recexp.dst_ip, FMT_JUST_LEFT); fterr_warnx("Unexpected PDU: src_ip=%s no v8 translation", fmt_src_ip); cap_file.hdr_flows_corrupt ++; goto skip_pdu_decode; } /* translation among v8 aggregation methods not possible */ if ((ftv.d_version == 8) && ((ftv.agg_method != ftpdu.ftv.agg_method) || (ftv.agg_version != ftpdu.ftv.agg_version))) { fmt_ipv4(fmt_src_ip, ftch_recexp.dst_ip, FMT_JUST_LEFT); fterr_warnx( "Unexpected PDU: src_ip=%s multi v8 oagg=%d agg=%d over=%d ver=%d", fmt_src_ip, (int)ftv.agg_method, (int)ftpdu.ftv.agg_method, (int)ftv.agg_version, (int)ftpdu.ftv.agg_version); cap_file.hdr_flows_corrupt ++; goto skip_pdu_decode; } } /* version processing */ /* compute 8 bit hash */ hash = (ftch_recexp.src_ip & 0xFF); hash ^= (ftch_recexp.src_ip>>24); hash ^= (ftch_recexp.dst_ip & 0xFF); hash ^= (ftch_recexp.dst_ip>>24); hash ^= (ftch_recexp.d_version & 0xFF); /* get/create hash table entry */ if (!(ftch_recexpp = ftchash_update(ftch, &ftch_recexp, hash))) fterr_errx(1, "ftch_update(): failed"); /* if the packet count is 0, then this is a new entry */ if (ftch_recexpp->packets == 0) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -