📄 softflowd.c
字号:
((flow->af == AF_INET6 && flow->protocol == IPPROTO_ICMPV6)))) { /* ICMP flows */ flow->expiry->expires_at = flow->flow_last.tv_sec + ft->icmp_timeout; flow->expiry->reason = R_ICMP; goto out; } /* Everything else */ flow->expiry->expires_at = flow->flow_last.tv_sec + ft->general_timeout; flow->expiry->reason = R_GENERAL; out: if (ft->maximum_lifetime != 0 && flow->expiry->expires_at != 0) { flow->expiry->expires_at = MIN(flow->expiry->expires_at, flow->flow_start.tv_sec + ft->maximum_lifetime); } EXPIRY_INSERT(EXPIRIES, &ft->expiries, flow->expiry);}/* Return values from process_packet */#define PP_OK 0#define PP_BAD_PACKET -2#define PP_MALLOC_FAIL -3/* * Main per-packet processing function. Take a packet (provided by * libpcap) and attempt to find a matching flow. If no such flow exists, * then create one. * * Also marks flows for fast expiry, based on flow or packet attributes * (the actual expiry is performed elsewhere) */static intprocess_packet(struct FLOWTRACK *ft, const u_int8_t *pkt, int af, const u_int32_t caplen, const u_int32_t len, const struct timeval *received_time){ struct FLOW tmp, *flow; int frag; ft->total_packets++; /* Convert the IP packet to a flow identity */ memset(&tmp, 0, sizeof(tmp)); switch (af) { case AF_INET: if (ipv4_to_flowrec(&tmp, pkt, caplen, len, &frag, af) == -1) goto bad; break; case AF_INET6: if (ipv6_to_flowrec(&tmp, pkt, caplen, len, &frag, af) == -1) goto bad; break; default: bad: ft->bad_packets++; return (PP_BAD_PACKET); } if (frag) ft->frag_packets++; /* Zero out bits of the flow that aren't relevant to tracking level */ switch (ft->track_level) { case TRACK_IP_ONLY: tmp.protocol = 0; /* FALLTHROUGH */ case TRACK_IP_PROTO: tmp.port[0] = tmp.port[1] = 0; tmp.tcp_flags[0] = tmp.tcp_flags[1] = 0; /* FALLTHROUGH */ case TRACK_FULL: break; } /* If a matching flow does not exist, create and insert one */ if ((flow = FLOW_FIND(FLOWS, &ft->flows, &tmp)) == NULL) { /* Allocate and fill in the flow */ if ((flow = malloc(sizeof(*flow))) == NULL) { logit(LOG_ERR, "process_packet: flow malloc(%u) fail", sizeof(*flow)); return (PP_MALLOC_FAIL); } memcpy(flow, &tmp, sizeof(*flow)); memcpy(&flow->flow_start, received_time, sizeof(flow->flow_start)); flow->flow_seq = ft->next_flow_seq++; FLOW_INSERT(FLOWS, &ft->flows, flow); /* Allocate and fill in the associated expiry event */ if ((flow->expiry = malloc(sizeof(*flow->expiry))) == NULL) { logit(LOG_ERR, "process_packet: expiry malloc(%u) fail", sizeof(*flow->expiry)); return (PP_MALLOC_FAIL); } flow->expiry->flow = flow; /* Must be non-zero (0 means expire immediately) */ flow->expiry->expires_at = 1; flow->expiry->reason = R_GENERAL; EXPIRY_INSERT(EXPIRIES, &ft->expiries, flow->expiry); ft->num_flows++; if (verbose_flag) logit(LOG_DEBUG, "ADD FLOW %s", format_flow_brief(flow)); } else { /* Update flow statistics */ flow->packets[0] += tmp.packets[0]; flow->octets[0] += tmp.octets[0]; flow->tcp_flags[0] |= tmp.tcp_flags[0]; flow->packets[1] += tmp.packets[1]; flow->octets[1] += tmp.octets[1]; flow->tcp_flags[1] |= tmp.tcp_flags[1]; } memcpy(&flow->flow_last, received_time, sizeof(flow->flow_last)); if (flow->expiry->expires_at != 0) flow_update_expiry(ft, flow); return (PP_OK);}/* * Subtract two timevals. Returns (t1 - t2) in milliseconds. */u_int32_ttimeval_sub_ms(const struct timeval *t1, const struct timeval *t2){ struct timeval res; res.tv_sec = t1->tv_sec - t2->tv_sec; res.tv_usec = t1->tv_usec - t2->tv_usec; if (res.tv_usec < 0) { res.tv_usec += 1000000L; res.tv_sec--; } return ((u_int32_t)res.tv_sec * 1000 + (u_int32_t)res.tv_usec / 1000);}static voidupdate_statistic(struct STATISTIC *s, double new, double n){ if (n == 1.0) { s->min = s->mean = s->max = new; return; } s->min = MIN(s->min, new); s->max = MAX(s->max, new); s->mean = s->mean + ((new - s->mean) / n);}/* Update global statistics */static voidupdate_statistics(struct FLOWTRACK *ft, struct FLOW *flow){ double tmp; static double n = 1.0; ft->flows_expired++; ft->flows_pp[flow->protocol % 256]++; tmp = (double)flow->flow_last.tv_sec + ((double)flow->flow_last.tv_usec / 1000000.0); tmp -= (double)flow->flow_start.tv_sec + ((double)flow->flow_start.tv_usec / 1000000.0); if (tmp < 0.0) tmp = 0.0; update_statistic(&ft->duration, tmp, n); update_statistic(&ft->duration_pp[flow->protocol], tmp, (double)ft->flows_pp[flow->protocol % 256]); tmp = flow->octets[0] + flow->octets[1]; update_statistic(&ft->octets, tmp, n); ft->octets_pp[flow->protocol % 256] += tmp; tmp = flow->packets[0] + flow->packets[1]; update_statistic(&ft->packets, tmp, n); ft->packets_pp[flow->protocol % 256] += tmp; n++;}static void update_expiry_stats(struct FLOWTRACK *ft, struct EXPIRY *e){ switch (e->reason) { case R_GENERAL: ft->expired_general++; break; case R_TCP: ft->expired_tcp++; break; case R_TCP_RST: ft->expired_tcp_rst++; break; case R_TCP_FIN: ft->expired_tcp_fin++; break; case R_UDP: ft->expired_udp++; break; case R_ICMP: ft->expired_icmp++; break; case R_MAXLIFE: ft->expired_maxlife++; break; case R_OVERBYTES: ft->expired_overbytes++; break; case R_OVERFLOWS: ft->expired_maxflows++; break; case R_FLUSH: ft->expired_flush++; break; } }/* How long before the next expiry event in millisecond */static intnext_expire(struct FLOWTRACK *ft){ struct EXPIRY *expiry; struct timeval now; u_int32_t expires_at, ret, fudge; gettimeofday(&now, NULL); if ((expiry = EXPIRY_MIN(EXPIRIES, &ft->expiries)) == NULL) return (-1); /* indefinite */ expires_at = expiry->expires_at; /* Don't cluster urgent expiries */ if (expires_at == 0 && (expiry->reason == R_OVERBYTES || expiry->reason == R_OVERFLOWS || expiry->reason == R_FLUSH)) return (0); /* Now */ /* Cluster expiries by expiry_interval */ if (ft->expiry_interval > 1) { if ((fudge = expires_at % ft->expiry_interval) > 0) expires_at += ft->expiry_interval - fudge; } if (expires_at < now.tv_sec) return (0); /* Now */ ret = 999 + (expires_at - now.tv_sec) * 1000; return (ret);}/* * Scan the tree of expiry events and process expired flows. If zap_all * is set, then forcibly expire all flows. */#define CE_EXPIRE_NORMAL 0 /* Normal expiry processing */#define CE_EXPIRE_ALL -1 /* Expire all flows immediately */#define CE_EXPIRE_FORCED 1 /* Only expire force-expired flows */static intcheck_expired(struct FLOWTRACK *ft, struct NETFLOW_TARGET *target, int ex){ struct FLOW **expired_flows, **oldexp; int num_expired, i, r; struct timeval now; struct EXPIRY *expiry, *nexpiry; gettimeofday(&now, NULL); r = 0; num_expired = 0; expired_flows = NULL; if (verbose_flag) logit(LOG_DEBUG, "Starting expiry scan: mode %d", ex); for(expiry = EXPIRY_MIN(EXPIRIES, &ft->expiries); expiry != NULL; expiry = nexpiry) { nexpiry = EXPIRY_NEXT(EXPIRIES, &ft->expiries, expiry); if ((expiry->expires_at == 0) || (ex == CE_EXPIRE_ALL) || (ex != CE_EXPIRE_FORCED && (expiry->expires_at < now.tv_sec))) { /* Flow has expired */ if (ft->maximum_lifetime != 0 && expiry->flow->flow_last.tv_sec - expiry->flow->flow_start.tv_sec >= ft->maximum_lifetime) expiry->reason = R_MAXLIFE; if (verbose_flag) logit(LOG_DEBUG, "Queuing flow seq:%llu (%p) for expiry " "reason %d", expiry->flow->flow_seq, expiry->flow, expiry->reason); /* Add to array of expired flows */ oldexp = expired_flows; expired_flows = realloc(expired_flows, sizeof(*expired_flows) * (num_expired + 1)); /* Don't fatal on realloc failures */ if (expired_flows == NULL) expired_flows = oldexp; else { expired_flows[num_expired] = expiry->flow; num_expired++; } if (ex == CE_EXPIRE_ALL) expiry->reason = R_FLUSH; update_expiry_stats(ft, expiry); /* Remove from flow tree, destroy expiry event */ FLOW_REMOVE(FLOWS, &ft->flows, expiry->flow); EXPIRY_REMOVE(EXPIRIES, &ft->expiries, expiry); expiry->flow->expiry = NULL; free(expiry); ft->num_flows--; } } if (verbose_flag) logit(LOG_DEBUG, "Finished scan %d flow(s) to be evicted", num_expired); /* Processing for expired flows */ if (num_expired > 0) { if (target != NULL && target->fd != -1) { r = target->dialect->func(expired_flows, num_expired, target->fd, &ft->flows_exported, &ft->system_boot_time, verbose_flag); if (verbose_flag) logit(LOG_DEBUG, "sent %d netflow packets", r); if (r > 0) { ft->packets_sent += r; /* XXX what if r < num_expired * 2 ? */ } else { ft->flows_dropped += num_expired * 2; } } for (i = 0; i < num_expired; i++) { if (verbose_flag) { logit(LOG_DEBUG, "EXPIRED: %s (%p)", format_flow(expired_flows[i]), expired_flows[i]); } update_statistics(ft, expired_flows[i]); free(expired_flows[i]); } free(expired_flows); } return (r == -1 ? -1 : num_expired);}/* * Force expiry of num_to_expire flows (e.g. when flow table overfull) */static voidforce_expire(struct FLOWTRACK *ft, u_int32_t num_to_expire){ struct EXPIRY *expiry, **expiryv; int i; /* XXX move all overflow processing here (maybe) */ if (verbose_flag) logit(LOG_INFO, "Forcing expiry of %d flows", num_to_expire); /* * Do this in two steps, as it is dangerous to change a key on * a tree entry without first removing it and then re-adding it. * It is even worse when this has to be done during a FOREACH :) * To get around this, we make a list of expired flows and _then_ * alter them */ if ((expiryv = calloc(num_to_expire, sizeof(*expiryv))) == NULL) { /* * On malloc failure, expire ALL flows. I assume that * setting all the keys in a tree to the same value is * safe. */ logit(LOG_ERR, "Out of memory while expiring flows - " "all flows expired"); EXPIRY_FOREACH(expiry, EXPIRIES, &ft->expiries) { expiry->expires_at = 0; expiry->reason = R_OVERFLOWS; ft->flows_force_expired++; } return; } /* Make the list of flows to expire */ i = 0; EXPIRY_FOREACH(expiry, EXPIRIES, &ft->expiries) { if (i >= num_to_expire) break; expiryv[i++] = expiry; } if (i < num_to_expire) { logit(LOG_ERR, "Needed to expire %d flows, " "but only %d active", num_to_expire, i); num_to_expire = i; } for(i = 0; i < num_to_expire; i++) { EXPIRY_REMOVE(EXPIRIES, &ft->expiries, expiryv[i]); expiryv[i]->expires_at = 0; expiryv[i]->reason = R_OVERFLOWS; EXPIRY_INSERT(EXPIRIES, &ft->expiries, expiryv[i]); } ft->flows_force_expired += num_to_expire; free(expiryv); /* XXX - this is overcomplicated, perhaps use a separate queue */}/* Delete all flows that we know about without processing */static intdelete_all_flows(struct FLOWTRACK *ft){ struct FLOW *flow, *nflow; int i; i = 0; for(flow = FLOW_MIN(FLOWS, &ft->flows); flow != NULL; flow = nflow) { nflow = FLOW_NEXT(FLOWS, &ft->flows, flow); FLOW_REMOVE(FLOWS, &ft->flows, flow); EXPIRY_REMOVE(EXPIRIES, &ft->expiries, flow->expiry); free(flow->expiry); ft->num_flows--; free(flow); i++; } return (i);}/* * Log our current status. * Includes summary counters and (in verbose mode) the list of current flows * and the tree of expiry events. */static intstatistics(struct FLOWTRACK *ft, FILE *out, pcap_t *pcap){ int i; struct protoent *pe; char proto[32]; struct pcap_stat ps; fprintf(out, "Number of active flows: %d\n", ft->num_flows); fprintf(out, "Packets processed: %llu\n", ft->total_packets); fprintf(out, "Fragments: %llu\n", ft->frag_packets); fprintf(out, "Ignored packets: %llu (%llu non-IP, %llu too short)\n", ft->non_ip_packets + ft->bad_packets, ft->non_ip_packets, ft->bad_packets); fprintf(out, "Flows expired: %llu (%llu forced)\n", ft->flows_expired, ft->flows_force_expired); fprintf(out, "Flows exported: %llu in %llu packets (%llu failures)\n", ft->flows_exported, ft->packets_sent, ft->flows_dropped); if (pcap_stats(pcap, &ps) == 0) { fprintf(out, "Packets received by libpcap: %lu\n", (unsigned long)ps.ps_recv); fprintf(out, "Packets dropped by libpcap: %lu\n", (unsigned long)ps.ps_drop); fprintf(out, "Packets dropped by interface: %lu\n", (unsigned long)ps.ps_ifdrop); } fprintf(out, "\n");
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -