⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 flow-fanout.c

📁 netflow,抓包
💻 C
📖 第 1 页 / 共 2 页
字号:
#endif /* IP_ADD_SOURCE_MEMBERSHIP */    mr.imr_multiaddr.s_addr = htonl(ftpi.rem_ip);    mr.imr_interface.s_addr = INADDR_ANY;        if (setsockopt(ftnet.fd, IPPROTO_IP, IP_ADD_MEMBERSHIP,      (char *)&mr, sizeof(mr)) < 0)      fterr_err(1, "setsockopt(IP_ADD_MEMBERSHIP)");  } else { /* is a multicast group */    /* unicast bind -- multicast support */    if (bind(ftnet.fd, (struct sockaddr*)&ftnet.loc_addr,      sizeof(ftnet.loc_addr)) < 0)      fterr_err(1, "bind()");  } /* not multicast group */#ifdef IP_ADD_SOURCE_MEMBERSHIPmcast_done:#endif /* IP_ADD_SOURCE_MEMBERSHIP */#else /* IP_ADD_MEMBERSHIP */  /* unicast bind -- no multicast support */  if (bind(ftnet.fd, (struct sockaddr*)&ftnet.loc_addr,    sizeof(ftnet.loc_addr)) < 0)    fterr_err(1, "bind()");#endif /* IP_ADD_MEMBERSHIP */#ifdef IP_RECVDSTADDR  one = 1;  /* return the destination IP address */  if (setsockopt(ftnet.fd, IPPROTO_IP, IP_RECVDSTADDR, (char *)&one,    sizeof(one)) < 0)    fterr_err(1, "setsockopt(IP_RECVDSTADDR)");#else#ifdef IP_PKTINFO  one = 1;  /* return the destination IP address */  if (setsockopt(ftnet.fd, IPPROTO_IP, IP_PKTINFO, (char *)&one,    sizeof(one)) < 0)    fterr_err(1, "setsockopt(IP_PKTINFO)");#endif /* else */#endif /* IP_RECVDSTADDR */  /* init hash table for demuxing exporters */  if (!(ftch = ftchash_new(256, sizeof (struct ftchash_rec_exp), 12, 1)))    fterr_errx(1, "ftchash_new(): failed");      /* init msg block */  ftnet.iov[0].iov_len = sizeof ftpdu.buf;  ftnet.iov[0].iov_base = (char*)&ftpdu.buf;  ftnet.msg.msg_iov = (struct iovec*)&ftnet.iov;  ftnet.msg.msg_iovlen = 1;  ftnet.msg.msg_name = &ftnet.rem_addr;  ftnet.msg.msg_namelen = sizeof ftnet.rem_addr;  ftnet.msg.msg_control = &ftnet.msgip;  ftnet.msg.msg_controllen = sizeof ftnet.msgip;        /* default timeout waiting for an active fd */  tv.tv_sec = SELECT_TIMEOUT;  /* setup for ftrec_mask_ip */  if (privacy_mask != 0xFFFFFFFF)    ftrec_compute_mask(&ftipmask, privacy_mask, privacy_mask,      ftset.byte_order);  while (1) {    FD_ZERO (&rfd);    FD_SET (ftnet.fd, &rfd);    if (select (ftnet.fd+1, &rfd, (fd_set *)0, (fd_set *)0, &tv) < 0) {      if (errno == EINTR) {        FD_ZERO (&rfd);      } else {        fterr_err(1, "select()");      }    }    now = time((time_t)0L);    /* reset */    bzero (&tv, sizeof tv);    tv.tv_sec = SELECT_TIMEOUT;    if (stat_interval) {      tm = localtime (&now);      /*       * note there is an obscure race condition here if this       * code is not reached at least every stat_interval*60 seconds       * where up to 1 hour of STAT lines would not show up.       * This is highly unlikely and not handled.       */      if ((tm->tm_min == stat_next) || (stat_next == -1)) {        ftchash_first(ftch);        while ((ftch_recexpp = ftchash_foreach(ftch))) {          fmt_ipv4(fmt_src_ip, ftch_recexpp->src_ip, FMT_JUST_LEFT);          fmt_ipv4(fmt_dst_ip, ftch_recexpp->dst_ip, FMT_JUST_LEFT);          fterr_info(            "STAT: now=%lu startup=%lu src_ip=%s dst_ip=%s d_ver=%d pkts=%lu flows=%lu lost=%lu reset=%lu filter_drops=%lu send_nobufs=%lu",            (unsigned long)now, (unsigned long)time_startup,            fmt_src_ip, fmt_dst_ip,            ftch_recexpp->d_version, (u_long)ftch_recexpp->packets,            (u_long)ftch_recexpp->flows, (u_long)ftch_recexpp->lost,            (u_long)ftch_recexpp->reset, (u_long)ftch_recexpp->filtered_flows,            (u_long)send_nobufs);        }        stat_next = (tm->tm_min + (stat_interval - tm->tm_min % stat_interval))          % 60;      }    } /* stat_inverval */    if (FD_ISSET(ftnet.fd, &rfd)) {restart_recvmsg:      if ((ftpdu.bused = recvmsg(ftnet.fd,        (struct msghdr*)&ftnet.msg, 0)) < 0) {        if (errno == EAGAIN)          goto restart_recvmsg;        fterr_err(1, "recvmsg()");      }#ifdef IP_RECVDSTADDR      /* got destination IP back? */      if ((ftnet.msgip.hdr.cmsg_level == IPPROTO_IP) &&          (ftnet.msgip.hdr.cmsg_type == IP_RECVDSTADDR)) {          ftnet.loc_addr.sin_addr.s_addr = ftnet.msgip.ip.s_addr;      } else {        ftnet.loc_addr.sin_addr.s_addr = 0;      }#else#ifdef IP_PKTINFO      if ((ftnet.msgip.hdr.cmsg_level == IPPROTO_IP) &&          (ftnet.msgip.hdr.cmsg_type == IP_PKTINFO)) {          ftnet.loc_addr.sin_addr.s_addr = ftnet.msgip.pktinfo.ipi_addr.s_addr;      } else {        ftnet.loc_addr.sin_addr.s_addr = 0;      }#else      ftnet.loc_addr.sin_addr.s_addr = 0;#endif#endif /* IP_RECVDSTADDR */      /* fill in hash key */      ftch_recexp.src_ip = htonl(ftnet.rem_addr.sin_addr.s_addr);      ftch_recexp.dst_ip = htonl(ftnet.loc_addr.sin_addr.s_addr);      ftch_recexp.dst_port = ftnet.dst_port;      /* verify integrity, get version */      if (ftpdu_verify(&ftpdu) < 0) {        fmt_ipv4(fmt_src_ip, ftch_recexp.src_ip, FMT_JUST_LEFT);        fterr_warnx("ftpdu_verify(): src_ip=%s failed.", fmt_src_ip);        flows_corrupt ++;        goto skip1;      }      /* rest of hash key */      ftch_recexp.d_version = ftpdu.ftv.d_version;      /* if exporter src IP has been configured then make sure it matches */      if (ftnet.rem_ip && (ftnet.rem_ip != ftch_recexp.src_ip)) {        fmt_ipv4(fmt_src_ip, ftch_recexp.src_ip, FMT_JUST_LEFT);        fterr_warnx("Unexpected PDU: src_ip=%s not configured", fmt_src_ip);        flows_corrupt ++;        goto skip1;      }      /* first flow or no configured destination version? */      if (!ftv.set) {        /* copy to compare next time */        bcopy(&ftpdu.ftv, &ftv, sizeof ftv);        /* flag struct as configured */        ftv.set = 1;        /* configure encoder version */        bcopy(&ftv, &fte.ver, sizeof ftv);        /* need offsets for filter later */        fts3rec_compute_offsets(&fo, &ftv);      } else {        /* translation to/from v8 not possible */        if (((ftv.d_version == 8) && (ftpdu.ftv.d_version != 8)) ||            ((ftv.d_version != 8) && (ftpdu.ftv.d_version == 8))) {          fmt_ipv4(fmt_src_ip, ftch_recexp.dst_ip, FMT_JUST_LEFT);          fterr_warnx("Unexpected PDU: src_ip=%s no v8 translation",            fmt_src_ip);          ++flows_corrupt;          goto skip1;        }        /* translation among v8 aggregation methods not possible */        if ((ftv.d_version == 8) && ((ftv.agg_method != ftpdu.ftv.agg_method)          || (ftv.agg_version != ftpdu.ftv.agg_version))) {          fmt_ipv4(fmt_src_ip, ftch_recexp.dst_ip, FMT_JUST_LEFT);          fterr_warnx(            "Unexpected PDU: src_ip=%s multi v8 oagg=%d agg=%d over=%d ver=%d",            fmt_src_ip, (int)ftv.agg_method, (int)ftpdu.ftv.agg_method,            (int)ftv.agg_version, (int)ftpdu.ftv.agg_version);          ++flows_corrupt;          goto skip1;        }      } /* version processing */      /* compute 8 bit hash */      hash = (ftch_recexp.src_ip & 0xFF);      hash ^= (ftch_recexp.src_ip>>24);      hash ^= (ftch_recexp.dst_ip & 0xFF);      hash ^= (ftch_recexp.dst_ip>>24);      hash ^= (ftch_recexp.d_version & 0xFF);      if (!(ftch_recexpp = ftchash_update(ftch, &ftch_recexp, hash)))        fterr_errx(1, "ftch_update(): failed");      /* if the packet count is 0, then this is a new entry */      if (ftch_recexpp->packets == 0) {        fmt_ipv4(fmt_src_ip, ftch_recexp.src_ip, FMT_JUST_LEFT);        fmt_ipv4(fmt_dst_ip, ftch_recexp.dst_ip, FMT_JUST_LEFT);        fterr_info("New exporter: time=%lu src_ip=%s dst_ip=%s d_version=%d",          (u_long)now, fmt_src_ip, fmt_dst_ip, (int)ftpdu.ftv.d_version);        /* set translation function */        if (ftch_recexp.d_version != ftv.d_version)          ftch_recexpp->xlate = ftrec_xlate_func(&ftpdu.ftv, &ftv);      }      /* verify sequence number */      if (ftpdu_check_seq(&ftpdu, &(ftch_recexpp->ftseq)) < 0) {        fmt_ipv4(fmt_src_ip, ftch_recexp.src_ip, FMT_JUST_LEFT);        fmt_ipv4(fmt_dst_ip, ftch_recexp.dst_ip, FMT_JUST_LEFT);        fmt_uint16(fmt_dst_port, ftch_recexp.dst_port, FMT_JUST_LEFT);        fterr_warnx(          "ftpdu_seq_check(): src_ip=%s dst_ip=%s d_version=%d expecting=%lu received=%lu lost=%lu",          fmt_src_ip, fmt_dst_ip, (int)ftpdu.ftv.d_version,          (u_long)ftch_recexpp->ftseq.seq_exp,          (u_long)ftch_recexpp->ftseq.seq_rcv,          (u_long)ftch_recexpp->ftseq.seq_lost);        /* only count these lost if "lost" is a reasonable number */        if (ftch_recexpp->ftseq.seq_lost < FT_SEQ_RESET) {          flows_lost += ftch_recexpp->ftseq.seq_lost;          ftch_recexpp->lost += ftch_recexpp->ftseq.seq_lost;        } else {          flows_reset ++;          ftch_recexpp->reset ++;        }      }      /* decode */      ftpdu.ftd.byte_order = ftset.byte_order;      ftpdu.ftd.as_sub = ftset.as_sub;      ftpdu.ftd.exporter_ip = ftch_recexp.src_ip;        n = fts3rec_pdu_decode(&ftpdu);      /* update the exporter stats */      ftch_recexpp->packets ++;        ftch_recexpp->flows += n;      /* write decoded flows */      for (i = 0, offset = 0; i < n; ++i, offset += ftpdu.ftd.rec_size) {        /* simple data privacy */         if (privacy_mask != 0xFFFFFFFF)          ftrec_mask_ip(ftpdu.ftd.buf+offset, &ftpdu.ftv, &ftipmask);        /* translate version? */        if (ftch_recexpp->xlate) {          ftch_recexpp->xlate(ftpdu.ftd.buf+offset, &xl_rec);          out_rec = (char*)&xl_rec;        } else {          out_rec = (char*)ftpdu.ftd.buf+offset;        }        /* filter? */        if (ftfd)          if (ftfil_def_eval(ftfd, out_rec, &fo) == FT_FIL_MODE_DENY) {            ++ftch_recexpp->filtered_flows;            continue;          }retry:        ret = fts3rec_pdu_encode(&fte, out_rec);              /*   ret == 0 then send and clear out buffer         *   ret > 0 then can encode another         *   ret < 0 then this encoding failed, send and clear out buffer         */        /* need to transmit? */        if (ret < 0) {          pdu_xmit(npeers, tx_delay, src_ip_spoof, hdr_len, &send_nobufs,            ip_hdr, udp_hdr, &fte, peers, &ftnet);        } /* ret < 0 */        /* if ret < 0 then the current record was not encoded */        if (ret < 0)          goto retry;      } /* for each flow */          /* any encoded flows that have not been transmitted */      if (fte.buf_size) {        pdu_xmit(npeers, tx_delay, src_ip_spoof, hdr_len, &send_nobufs,          ip_hdr, udp_hdr, &fte, peers, &ftnet);          } /* fte.buf_size */    skip1:    } /* if FD_ISSET */    if (sig_quit_flag) {      fterr_info("SIGQUIT");      break;    }    if (sig_hup_flag) {      fterr_info("SIGHUP");      sig_hup_flag = 0;    }  } /* while 1 */  if (pidfile)    unlink_pidfile(pid, pidfile, listen_port);  for (n = 0; n < npeers; ++n)    close(peers[0].fd);  return 0;} /* main */void sig_quit(int sig){  sig_quit_flag = 1;}void sig_hup(int sig){  sig_hup_flag = 1;}void fterr_exit_handler(int code){  if (pid)    if (pidfile)      unlink_pidfile(pid, pidfile, listen_port);  exit (code);} /* fterr_exit_handler */ void pdu_xmit(int npeers, int tx_delay, int src_ip_spoof, int hdr_len,  u_int32 *send_nobufs, struct ip *ip_hdr, struct udphdr *udp_hdr,  struct ftencode *fte, struct peer *peers, struct ftnet *ftnet){  int j, sum;  /* convert pdu to network byte order */#if BYTE_ORDER == LITTLE_ENDIAN  ftpdu_swap(fte->buf_enc, BYTE_ORDER);#endif /* BYTE_ORDER == LITTLE_ENDIAN */  /* do this once for all destinations */  ftencode_sum_data(fte);  for (j = 0; j < npeers; ++j) {    if (src_ip_spoof) {/* see Stevens Unix Network Programming Volume 1 2nd edition page 657 *//* conditional from <simon@limmat.switch.ch> rawsend.c */#if defined (__linux__) || (defined (__OpenBSD__) && (OpenBSD > 199702))      ip_hdr->ip_len = htons(FT_ENC_IPHDR_LEN+fte->buf_size);#else      ip_hdr->ip_len = FT_ENC_IPHDR_LEN+fte->buf_size;#endif      ip_hdr->ip_ttl = peers[j].ttl;      /* use transmit source if loc_addr is not specified */      if (!peers[j].loc_addr.sin_addr.s_addr)        ip_hdr->ip_src.s_addr = ftnet->rem_addr.sin_addr.s_addr;      else        ip_hdr->ip_src.s_addr = peers[j].loc_addr.sin_addr.s_addr;      ip_hdr->ip_dst.s_addr = peers[j].rem_addr.sin_addr.s_addr;      udp_hdr->uh_sport = htons(7999+j);      udp_hdr->uh_dport = peers[j].rem_addr.sin_port;      udp_hdr->uh_ulen = htons(fte->buf_size+8);      udp_hdr->uh_sum = 0;      sum = fte->d_sum;      sum += udp_cksum(ip_hdr, udp_hdr, fte->buf_size+8);      sum = (sum >> 16) + (sum & 0xffff);      sum += (sum >> 16);      udp_hdr->uh_sum = ~sum;    } again:     if (send(peers[j].fd, (char*)&fte->buf, fte->buf_size+hdr_len, 0) < 0) {      /* always complete a send, drop flows in the kernel on receive if         overloaded */      if (errno == ENOBUFS) {        ++ *send_nobufs;        usleep(1);        goto again;      }      if (errno != ECONNREFUSED)        fterr_warn("send(j=%d)", j);    }    if (tx_delay)      usleep((unsigned)tx_delay);  } /* foreach peer to send to */  /* reset encode buffer */  ftencode_reset(fte);} /* pdu_xmit */void usage(void){  fprintf(stderr, "usage: flow-fanout [-hDs] [-A AS0_substitution] [-d debug_level]\n");  fprintf(stderr, "       [-m privacy_mask] [-p pidfile] [-S stat_interval] [-V pdu_version]\n");  fprintf(stderr, "       [-x] xmit_delay] localip/remoteip/port localip/remoteip/port ...\n");  fprintf(stderr, "\n%s version %s: built by %s\n", PACKAGE, VERSION, FT_PROG_BUILD);} /* usage */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -