📄 fprobe.c
字号:
#if ((DEBUG) & DEBUG_I) pkts_total++;#endif flow = pending_head; flow->size = pkthdr->len; if (!(link_layer)) flow->size -= off_nl;#if ((DEBUG) & DEBUG_I) size_total += flow->size;#endif flow->sip = nl->ip_src; flow->dip = nl->ip_dst; flow->tos = nl->ip_tos; flow->proto = nl->ip_p; flow->id = 0; flow->tcp_flags = 0; flow->pkts = 1; flow->sizeF = 0; flow->sizeP = 0; flow->ctime.sec = pkthdr->ts.tv_sec; flow->ctime.usec = pkthdr->ts.tv_usec; flow->mtime = flow->ctime; off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3; /* Offset (from network layer) to transport layer header/IP data IOW IP header size ;-) ?FIXME? Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks... */ off_tl = nl->ip_hl << 2; tl = (void *) nl + off_tl; /* THIS packet data size: data_size = total_size - ip_header_size*4 */ flow->sizeF = ntohs(nl->ip_len) - off_tl; psize -= off_tl; if ((signed) flow->sizeF < 0) flow->sizeF = 0; if (psize > (signed) flow->sizeF) psize = flow->sizeF; if (ntohs(nl->ip_off) & (IP_MF | IP_OFFMASK)) { /* Fragmented packet (IP_MF flag == 1 or fragment offset != 0) */#if ((DEBUG) & DEBUG_C) strcat(logbuf, " F");#endif#if ((DEBUG) & DEBUG_I) pkts_total_fragmented++;#endif flow->flags |= FLOW_FRAG; flow->id = nl->ip_id; if (!(ntohs(nl->ip_off) & IP_MF)) { /* Packet whith IP_MF contains information about whole datagram size */ flow->flags |= FLOW_LASTFRAG; /* size = frag_offset*8 + data_size */ flow->sizeP = off_frag + flow->sizeF; } }#if ((DEBUG) & DEBUG_C) sprintf(buf, " %s>", inet_ntoa(flow->sip)); strcat(logbuf, buf); sprintf(buf, "%s P:%x", inet_ntoa(flow->dip), flow->proto); strcat(logbuf, buf);#endif /* Fortunately most interesting transport layer information fit into first 8 bytes of IP data field (minimal nonzero size). Thus we don't need actual packet reassembling to build whole transport layer data. We only check the fragment offset for zero value to find packet with this information. */ if (!off_frag && psize >= 8) { switch (flow->proto) { case IPPROTO_TCP: case IPPROTO_UDP: flow->sp = ((struct udphdr *)tl)->uh_sport; flow->dp = ((struct udphdr *)tl)->uh_dport; goto tl_known;#ifdef ICMP_TRICK case IPPROTO_ICMP: flow->sp = htons(((struct icmp *)tl)->icmp_type); flow->dp = htons(((struct icmp *)tl)->icmp_code); goto tl_known;#endif default: /* Unknown transport layer */#if ((DEBUG) & DEBUG_C) strcat(logbuf, " U");#endif flow->sp = 0; flow->dp = 0; break; tl_known:#if ((DEBUG) & DEBUG_C) sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp)); strcat(logbuf, buf);#endif flow->flags |= FLOW_TL; } } /* Check for tcp flags presence. */ if (flow->proto == IPPROTO_TCP && off_frag < 16 && psize >= 16 - off_frag) { flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag)) & 0x3f;#if ((DEBUG) & DEBUG_C) sprintf(buf, " TCP:%x", flow->tcp_flags); strcat(logbuf, buf);#endif }#if ((DEBUG) & DEBUG_C) sprintf(buf, " => %x", (unsigned) flow); strcat(logbuf, buf); my_log(LOG_DEBUG, "%s", logbuf);#endif#if ((DEBUG) & DEBUG_I) pkts_pending++; pending_queue_trace_candidate = pkts_pending - pkts_pending_done; if (pending_queue_trace < pending_queue_trace_candidate) pending_queue_trace = pending_queue_trace_candidate;#endif /* Flow complete - inform unpending_thread() about it */ pending_head->flags |= FLOW_PENDING; pending_head = pending_head->next;done: pthread_cond_signal(&unpending_cond);#ifdef WALL return; useless = 0;#endif}void *pcap_thread(){ pcap_loop(pcap_handle, -1, pcap_callback, 0); my_log(LOG_INFO, "pcap_loop() terminated: %s", strerror(errno)); kill(pid, SIGTERM); /* Suicide */#ifdef WALL return 0;#endif}int main(int argc, char **argv){ char errbuf[PCAP_ERRBUF_SIZE]; struct bpf_program bpf_filter; char *dhost, *dport, *errpos; int c, i; int memory_limit = 0, link_type, link_type_idx; struct servent *serve; struct hostent *hoste; pthread_attr_t tattr; struct sigaction sigact; static void *threads[THREADS - 1] = {&emit_thread, &scan_thread, &unpending_thread, &pcap_thread}; struct timeval timeout;#ifdef WALL link_type_idx = 0;#endif sched_min = sched_get_priority_min(SCHED); sched_max = sched_get_priority_max(SCHED); /* Process command line options */ opterr = 0; while ((c = my_getopt(argc, argv, parms)) != -1) { switch (c) { case '?': usage(); case 'h': usage(); } } promisc = -(--parms[pflag].count); dev = parms[iflag].arg; if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg); if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg); if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg); if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg); if (parms[nflag].count) { switch (atoi(parms[nflag].arg)) { case 1: netflow = &NetFlow1; break; case 5: break; case 7: netflow = &NetFlow7; break; default: fprintf(stderr, "Illegal %s\n", "NetFlow version"); exit(1); } } if (parms[vflag].count) verbosity = atoi(parms[vflag].arg); if (parms[lflag].count) log_dest = atoi(parms[lflag].arg); if (parms[qflag].count) { pending_queue_length = atoi(parms[qflag].arg); if (pending_queue_length < 1) { fprintf(stderr, "Illegal %s\n", "pending queue length"); exit(1); } } if (parms[rflag].count) { schedp.sched_priority = atoi(parms[rflag].arg); if (schedp.sched_priority && (schedp.sched_priority < sched_min || schedp.sched_priority > sched_max)) { fprintf(stderr, "Illegal %s\n", "realtime priority"); exit(1); } } if (parms[bflag].count > 0) { bulk_quantity = atoi(parms[bflag].arg); if (bulk_quantity < 1 || bulk_quantity > BULK_QUANTITY_MAX) { fprintf(stderr, "Illegal %s\n", "bulk size"); exit(1); } } if (parms[mflag].count) memory_limit = atoi(parms[mflag].arg) << 10; if (parms[xflag].count) if ((sscanf(parms[xflag].arg, "%d:%d", &snmp_input_index, &snmp_output_index)) == 1) snmp_output_index = snmp_input_index; if (parms[tflag].count) sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay); if (parms[aflag].count) { if ((hoste = gethostbyname(parms[aflag].arg))) { client.sin_addr.s_addr = ((struct in_addr*)(*hoste->h_addr_list))->s_addr; } else { fprintf(stderr, "Illegal %s\n", "source address"); exit(1); } } else client.sin_addr.s_addr = htonl(INADDR_ANY); if (parms[Kflag].count) link_layer_size = atoi(parms[Kflag].arg); link_layer = parms[kflag].count; dhost = argv[argc-1]; if (!dhost || argc < 2) usage(); for (dport = dhost; *dport; dport++) if (*dport == ':') break; if (!*dport) goto bad_address; *dport++ = 0; if (!*dport) goto bad_address; serve = getservbyname(dport, "udp"); if (serve) server.sin_port = serve->s_port; else { i = strtol(dport, &errpos, 0); if ((errpos[0]) || (i < 1) || (i > 65535)) goto bad_address; server.sin_port = htons(i); } hoste = gethostbyname(dhost); *--dport = ':'; if (hoste) server.sin_addr.s_addr = ((struct in_addr*)(*hoste->h_addr_list))->s_addr; else { bad_address: fprintf(stderr, "Need correct collector address\n"); exit(1); } if ((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { fprintf(stderr, "socket(): %s\n", strerror(errno)); exit(1); } server.sin_family = AF_INET; client.sin_family = AF_INET; client.sin_port = 0; if (bind(sock, (struct sockaddr *) &client, sizeof(client)) < 0) { fprintf(stderr, "bind(): %s\n", strerror(errno)); exit(1); } /* Daemonize (if log destination stdout-free) */ my_log_open(ident, verbosity, log_dest); pid = getpid(); sprintf(ident, "fprobe[%d]", (int) pid); /* Foreground ident */ if (!(log_dest & 2)) { switch (fork()) { case -1: my_log(LOG_CRIT, "fork(): %s", strerror(errno)); exit(1); case 0: pid = getpid(); sprintf(ident, "fprobe[%d]", (int) pid); /* Background ident */ if (setsid() < 0) { my_log(LOG_CRIT, "setsid(): %s", strerror(errno)); exit(1); } freopen("/dev/null", "r", stdin); freopen("/dev/null", "w", stdout); freopen("/dev/null", "w", stderr); break; default: exit(0); } } else { setvbuf(stdout, (char *)0, _IONBF, 0); setvbuf(stderr, (char *)0, _IONBF, 0); } /* Initialization */ /* Initialize libpcap. ?FIXME? There are some strange things with FreeBSD's libpcap - it must be initialized from child (daemonized) process. */ if (!dev) if (!(dev = pcap_lookupdev(errbuf))) { my_log(LOG_CRIT, "pcap_lookupdev(): %s\n", errbuf); exit(1); } pcap_handle = pcap_open_live(dev, CAPTURE_SIZE, promisc, 1000, errbuf); if (!pcap_handle) { my_log(LOG_CRIT, "pcap_open_live(): %s\n",errbuf); exit(1); } link_type = pcap_datalink(pcap_handle); for (i = 0; dlt[i].linktype != -1; i++) if (dlt[i].linktype == link_type) { off_nl = dlt[i].offset_nl; link_type_idx = i; break; } if (off_nl == -1) { my_log(LOG_CRIT, "Unsupported data link type %d\n", link_type); exit(1); } if (link_layer_size >= 0) off_nl = link_layer_size; if (parms[fflag].arg) { filter = parms[fflag].arg; if (pcap_compile(pcap_handle, &bpf_filter, filter, 1, 0) == -1) { my_log(LOG_CRIT, "pcap_compile(): %s. Filter: %s\n", pcap_geterr(pcap_handle), filter); exit(1); } if (pcap_setfilter(pcap_handle, &bpf_filter) == -1) { my_log(LOG_CRIT, "pcap_setfilter(): %s\n", pcap_geterr(pcap_handle)); exit(1); } } else my_log(LOG_WARNING, "Filter expression is empty! Are you sure?"); hash_init(); /* Actually for crc16 only */ mem_init(sizeof(struct Flow), bulk_quantity, memory_limit); for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);#ifdef UPTIME_TRICK start_time_offset = active_lifetime + inactive_lifetime + scan_interval;#endif gettime(&start_time); /* Build static pending queue as circular buffer. We can't use dynamic flow allocation in pcap_callback() because memory routines shared between threads, including non-realtime. Collision (mem_mutex lock in mem_alloc()) of pcap_callback() with such (non-realtime) thread may cause intolerable slowdown and packets loss as effect. */ if (!(pending_head = mem_alloc())) goto err_mem_alloc; pending_tail = pending_head; for (i = pending_queue_length - 1; i--;) { if (!(pending_tail->next = mem_alloc())) { err_mem_alloc: my_log(LOG_CRIT, "mem_alloc(): %s\n", strerror(errno)); exit(1); } pending_tail = pending_tail->next; } pending_tail->next = pending_head; pending_tail = pending_head; sigemptyset(&sig_mask); sigact.sa_handler = &sighandler; sigact.sa_mask = sig_mask; sigact.sa_flags = 0; sigaddset(&sig_mask, SIGTERM); sigaction(SIGTERM, &sigact, 0);#if ((DEBUG) & DEBUG_I) sigaddset(&sig_mask, SIGUSR1); sigaction(SIGUSR1, &sigact, 0);#endif if (pthread_sigmask(SIG_BLOCK, &sig_mask, 0)) { my_log(LOG_CRIT, "pthread_sigmask(): %s\n", strerror(errno)); exit(1); }#ifdef OS_SOLARIS pthread_setconcurrency(THREADS);#endif schedp.sched_priority = schedp.sched_priority - THREADS + 2; pthread_attr_init(&tattr); for (i = 0; i < THREADS - 1; i++) { if (schedp.sched_priority > 0) { if ((pthread_attr_setschedpolicy(&tattr, SCHED)) || (pthread_attr_setschedparam(&tattr, &schedp))) { my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s\n", strerror(errno)); exit(1); } } if (pthread_create(&thid, &tattr, threads[i], 0)) { my_log(LOG_CRIT, "pthread_create(): %s\n", strerror(errno)); exit(1); } pthread_detach(thid); schedp.sched_priority++; } pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0); my_log(LOG_INFO, "Starting %s...", VERSION); my_log(LOG_INFO, "pid: %d", pid); my_log(LOG_INFO, "interface: %s, datalink: %s (%d)", dev, dlt[link_type_idx].descr, link_type, off_nl); my_log(LOG_INFO, "filter: \"%s\"", filter); my_log(LOG_INFO, "options: p=%d s=%u g=%u d=%u e=%u n=%u a=%s x=%u:%u " "b=%u m=%u q=%u r=%u t=%u:%u K=%d k=%d v=%u l=%u", promisc, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime, netflow->Version, inet_ntoa(client.sin_addr), snmp_input_index, snmp_output_index, bulk_quantity, memory_limit >> 10, pending_queue_length, schedp.sched_priority - 1, emit_rate_bytes, emit_rate_delay, off_nl, link_layer, verbosity, log_dest); my_log(LOG_INFO,"collector: %s:%u", inet_ntoa(server.sin_addr), ntohs(server.sin_port)); snmp_input_index = htons(snmp_input_index); snmp_output_index = htons(snmp_output_index); timeout.tv_usec = 0; while (!killed || (total_elements - free_elements - pending_queue_length) || emit_count || pending_tail->flags) { if (!sigs) { timeout.tv_sec = scan_interval; select(0, 0, 0, 0, &timeout); } if (sigs & SIGTERM_MASK && !killed) { sigs &= ~SIGTERM_MASK; my_log(LOG_INFO, "SIGTERM received. Emitting flows cache..."); scan_interval = 1; frag_lifetime = -1; active_lifetime = -1; inactive_lifetime = -1; emit_timeout = 1; unpending_timeout = 1; killed = 1; pthread_cond_signal(&scan_cond); pthread_cond_signal(&unpending_cond); }#if ((DEBUG) & DEBUG_I) if (sigs & SIGUSR1_MASK) { sigs &= ~SIGUSR1_MASK; info_debug(); }#endif }#if ((DEBUG) & DEBUG_I) info_debug();#endif my_log(LOG_INFO, "Done.");#ifdef WALL return 0;#endif}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -