📄 flowdmuxerdmux.py
字号:
# NOTE(review): this section was recovered from a copy of the file in which all
# newlines and indentation had been collapsed.  Statement nesting below was
# rebuilt from the syntax (if/else and try/except pairing); wherever the syntax
# alone does not force the nesting, the spot is marked NOTE(review) and should
# be checked against a pristine copy.  The defs take `self`, so they belong
# inside a class whose header is not part of this section.  Python 2 code
# (os.tmpnam, getstatusoutput, itervalues, ord() on str); the `except ... as`
# form used here needs Python >= 2.6.

def subst_flow(self, toflow, flow):
    """Let `toflow` take the place of `flow` and return `toflow`.

    Closes `flow`'s dump file if it is open, points `self.flowdir[flow.atuple]`
    at `toflow`, removes `flow` from `self.ttimeo`, moves `toflow` to the end
    of `self.ttimeo` and copies `flow`'s alternative keys onto `toflow.alts`.

    When `toflow` is keyed as a fragment flow (its atuple starts with
    ``(ETHERTYPE_IP, 'FRAGS')``), `toflow` additionally keeps its old atuple
    as an alternative key and adopts `flow.atuple` as its own.
    """
    topath = toflow.path
    fpath = flow.path
    newpath = fpath
    subst_frags = toflow.atuple[0] == (ETHERTYPE_IP, 'FRAGS')

    if flow.file:
        flow.file.close()
        self.nopen -= 1
        flow.file = None

    if subst_frags:
        # NOTE(review): the extent of this inner `if` (rename only, or rename +
        # remove + path update) cannot be recovered from the collapsed source.
        if toflow.written:
            newpath = self.timesubst(toflow, flow)
            os.rename(toflow.path, newpath)
            os.remove(flow.path)
            toflow.path = newpath
        toflow.alts.append(toflow.atuple)
        for a in flow.alts:
            self.flowdir[a] = toflow
        toflow.atuple = flow.atuple

    self.flowdir[flow.atuple] = toflow
    self.ttimeo.rem(flow)
    self.ttimeo.toend(toflow)
    for alt in flow.alts:
        if alt != toflow.atuple:
            toflow.alts.append(alt)

    self.log('Flow %s subst for %s as %s' % (topath, fpath, newpath))
    return toflow

############################################################################

def pre_frags(self, fragflow, flow, ts):
    """Combine `fragflow` with `flow` and return the surviving flow object.

    If `flow` has neither written nor held packets, `fragflow` is simply
    substituted for it; otherwise the two are merged with `frags_merge`.
    """
    if flow.written or flow.held:
        return self.frags_merge(fragflow, flow, ts)
    return self.subst_flow(fragflow, flow)

############################################################################

def frags_merge(self, fragflow, flow, ts):
    """Merge the packets of `fragflow` and `flow`; return the surviving flow.

    The strategy depends on the two flows' `start`/`last` times:
      * `fragflow` ends before `flow` starts -> `flow` is appended to `fragflow`;
      * `flow` ends before `fragflow` starts -> `fragflow` is appended to `flow`;
      * otherwise the packets are interleaved into a new file (`full_merge`).

    `ts` is accepted for interface compatibility; the visible code does not
    read it.
    """

    def new_pf(infnm):
        """Open `infnm` as an offline pcap, retrying on recoverable errors."""
        pf = py_pcap()
        while 1:
            try:
                pf.open_offline(infnm)
                self.nopen += 1
                break
            except TypeError as s:
                msg = str(s)
                # Was a Python 2 `print 'ex', str(s),` debugging statement.
                sys.stdout.write('ex ' + msg + ' ')
                if msg.find('Too many open files') >= 0:
                    # Free a descriptor, then retry.
                    self.close_a_flow(force=1)
                elif msg.find('empty') >= 0:
                    # presumably the file has not been flushed to disk yet;
                    # wait and retry -- TODO confirm
                    sleep(30)
                    continue
                else:
                    raise
        return pf

    def copy(fr, to):
        """Append every packet of pcap file `fr` to dump file `to`; return the count."""
        pf = new_pf(fr)
        read_next = pf.next_tts
        dump = pf.alt_dump
        n = 0
        while 1:
            try:
                read_next()
                dump(to)
                n += 1
            except EOFError:
                break
            except TypeError as s:
                self.exception('frags_merge() copy loop: ' + str(s))
        del pf
        self.nopen -= 1
        return n

    def append_merge(toflow, aflow):
        """Append `aflow`'s packets after `toflow`'s, then substitute `toflow` for `aflow`."""
        if aflow.written:
            if aflow.file:
                aflow.file.close()
                aflow.file = None
                self.nopen -= 1
            if not toflow.file:
                toflow.file = self.reopen_flow(toflow)
            toflow.pn += copy(aflow.path, toflow.file)
        # NOTE(review): whether the held-packet handling below sits inside the
        # `if aflow.written` above cannot be recovered from the collapsed source.
        toflow.pn += len(aflow.held)
        if not self.stdin:
            if toflow.file:
                self.dump_held(aflow, toflow.file)
            else:
                toflow.held.extend(aflow.held)
        else:
            assert not aflow.held
        self.subst_flow(toflow, aflow)
        return toflow

    def full_merge(flow, fragflow):
        """Interleave both flows' packets by timestamp into one new file."""

        def next_pkt(pc):
            """Return the next packet record from `pc`, or None when exhausted."""
            if pc == pc0 or pc == pc1:
                # Held packets: read each one at its recorded offset.
                held = held0 if pc == pc0 else held1
                try:
                    off = held.pop(0)
                except IndexError:
                    return None
                try:
                    pc.fseek(off)
                    return pc.next_fts()
                except (TypeError, IOError) as s:
                    self.exception('full_merge() pcap offset-read ' + str(s))
            else:
                try:
                    return pc.next_fts()
                except (TypeError, IOError) as s:
                    self.exception('full_merge() pcap read ' + str(s))
                except EOFError:
                    return None

        def pop_first(plist):
            """Remove and return the (timestamp, pcap) entry with the smallest timestamp.

            The original compared every entry against the FIRST entry's
            timestamp only, so with three or more sources it could return an
            entry that was not the earliest.  The running minimum is now
            tracked; ties still resolve to the later list entry, as before.
            """
            first = plist[0]
            for cand in plist:
                if cand[0] <= first[0]:
                    first = cand
            plist.remove(first)
            return first

        pcaps = []
        held0 = list(flow.held)
        held1 = list(fragflow.held)

        if held0:
            pc0 = new_pf(self.infile)
            pcaps.append(pc0)
            flow.held = []
        else:
            pc0 = None

        if held1:
            pc1 = new_pf(self.infile)
            pcaps.append(pc1)
        else:
            pc1 = None

        if flow.written:
            if flow.file:
                flow.file.close()
                flow.file = None
                self.nopen -= 1
            pcaps.append(new_pf(flow.path))

        if fragflow.written:
            if fragflow.file:
                os.fsync(fragflow.file.fileno())
                fragflow.file.close()
                fragflow.file = None
                self.nopen -= 1
            pcaps.append(new_pf(fragflow.path))

        newfnm = self.timesubst(flow, fragflow)
        # NOTE(review): os.tmpnam() is race-prone and Python-2-only; left in
        # place because the replacement depends on what alt_dump_open() accepts.
        tmpfnm = os.tmpnam()
        try:
            f = self.pcap.alt_dump_open(tmpfnm)
            self.nopen += 1
        except (TypeError, IOError) as s:
            self.exception('full_merge() ' + str(s))

        # Prime the merge with the first packet of every source.
        pkts = []
        npcaps = len(pcaps)
        for p in pcaps:
            stuff = next_pkt(p)
            if stuff:
                pkts.append((stuff[2], p))
            else:
                npcaps -= 1

        # Repeatedly write the earliest pending packet and refill from its source.
        pn = 0
        while npcaps:
            ts, p = pop_first(pkts)
            p.alt_dump(f)
            pn += 1
            stuff = next_pkt(p)
            if stuff:
                pkts.append((stuff[2], p))
            else:
                npcaps -= 1

        # One open-file count per pcap source opened above.
        self.nopen -= len(pcaps)

        os.remove(fragflow.path)
        if flow.path != newfnm:
            os.remove(flow.path)
        f.close()
        self.nopen -= 1

        try:
            os.rename(tmpfnm, newfnm)
        except OSError as s:
            if str(s).find('Invalid cross-device link') >= 0:
                # NOTE(review): shell command built by string formatting; safe
                # only while both paths are generated internally.
                status, output = getstatusoutput('mv %s %s' % (tmpfnm, newfnm))
                if status:
                    raise FlowDmuxerError('full_merge() mv failure (\'%s\')' % (output))
            else:
                # Previously swallowed: the inputs are already deleted at this
                # point, so carrying on would silently lose the merged data.
                raise

        self.log('merged %s and %s as %s' % (flow.path, fragflow.path, newfnm))
        flow.path = newfnm
        self.reopen_flow(flow)
        flow.written = 1
        flow.pn = pn
        self.flowdir[fragflow.atuple] = flow
        flow.alts.append(fragflow.atuple)
        self.ttimeo.rem(fragflow)
        self.ttimeo.toend(flow)
        return flow

    if fragflow.last < flow.start:
        return append_merge(fragflow, flow)
    elif flow.last < fragflow.start:
        return append_merge(flow, fragflow)
    else:
        return full_merge(flow, fragflow)

############################################################################

def new_flow(self, t, ts):
    """Create a FlowOb for key `t`, open its dump file and register it.

    If the open fails with 'Too many open files', `close_a_flow()` is asked to
    free a descriptor and the open is retried; when nothing could be closed
    the flow is registered without an open file.  Other IOErrors propagate.
    Returns the new flow.
    """
    flow = FlowOb(t, ts)
    self.nconns += 1
    p = self.make_path(t, ts)
    while 1:
        try:
            flow.file = self.pcap.alt_dump_open(p)
            flow.written += 1
            self.nopen += 1
            break
        except IOError as s:
            if str(s).find('Too many open files') >= 0:
                if not self.close_a_flow():
                    break
            else:
                raise
    flow.path = p
    self.flowdir[t] = flow
    self.ttimeo.append(flow)
    return flow

############################################################################

def get_flow(self, ts, atuple, SYN, tcp):
    """Look up (or create) the flow for `atuple`; return ``(flow, way)``.

    The reversed tuple (addresses and ports swapped) is tried first, giving
    way == 1, then `atuple` itself, giving way == 0.  A flow idle for longer
    than `self.timeout` is ended and replaced by a new one.  An unknown tuple
    starts a new flow only if `SYN` is set or `self.accept_nosyns` is true;
    otherwise `flow` is None and `self.nosyn` is incremented.
    """
    flow = None
    way = 2
    reverse = (atuple[0], atuple[2], atuple[1], atuple[4], atuple[3])
    for t in [reverse, atuple]:
        way -= 1
        try:
            flow = self.flowdir[t]
            break
        except KeyError:
            pass

    if flow:
        if flow.last < ts - self.timeout:
            self.to += 1
            if tcp and not SYN:
                self.resumed += 1
                self.nconns -= 1
            self.end_flow(flow)
            flow = self.new_flow(atuple, ts)
        else:
            self.ttimeo.toend(flow)
    elif SYN or self.accept_nosyns:
        # `t` is `atuple` here: the lookup loop ran to completion.
        flow = self.new_flow(t, ts)
        way = CLIENT
    else:
        self.nosyn += 1
    return flow, way

############################################################################

def close_a_flow(self, force=0):
    """Try to release an open file descriptor; return a true value on success.

    First runs the normal timeout pass (`self.timeo`).  If that closed nothing
    and `force` is set (it is always set when reading from stdin), the first
    flow with an open file is closed, looking in `self.ttimeo` and then in
    `self.flowdir`.
    """
    def close_first_open(flows):
        # Close the first flow that still holds an open file; 1 if one was found.
        for flow in flows:
            if flow.file:
                flow.file.close()
                flow.file = None
                self.nopen -= 1
                return 1
        return 0

    if self.stdin:
        force = 1
    closed = self.timeo(self.tm)
    if (not closed) and force:
        closed = close_first_open(self.ttimeo.items())
    if (not closed) and force:
        closed = close_first_open(self.flowdir.itervalues())
    return closed

############################################################################

def end_flow(self, flow):
    """Retire `flow`: drop it from the timeout list and from every flowdir key.

    An open dump file is closed; a flow without an open file that still has
    held packets is queued on `self.pending_dumps`.
    """
    self.ttimeo.rem(flow)
    del self.flowdir[flow.atuple]
    for ref in flow.alts:
        del self.flowdir[ref]
    if flow.file:
        flow.file.close()
        flow.file = None
        self.nopen -= 1
    elif flow.held:
        self.pending_dumps.append(flow)

############################################################################

def reopen_flow(self, flow):
    """Reopen `flow.path` for appending, dump any held packets, return the file.

    On 'Too many open files' another flow is force-closed and the open is
    retried; any other IOError propagates.
    """
    while 1:
        try:
            flow.file = open(flow.path, 'a')
            self.nopen += 1
            break
        except IOError as s:
            if str(s).find('Too many open files') >= 0:
                self.close_a_flow(force=1)
            else:
                raise
    if flow.held:
        self.dump_held(flow, flow.file)
    return flow.file

############################################################################

def get_eth_atuple(self, pkt):
    """Classify an Ethernet frame; return ``(atuple, is_tcp, start, fragt)``.

    `pkt[1]` is used as the packet length (capped at `self.snaplen`) and
    `pkt[2]` as the packet bytes.

    Returns ``(None, None, None, None)`` and increments `self.truncated` when
    the capture is too short for the header being read.  Otherwise:
      * non-first IP fragment (offset != 0): atuple is
        ``((ETHERTYPE_IP, 'FRAGS'), sa, ipid, None, None)``, start is None and
        fragt is ``(da, ipid, offset, payload_len, more_fragments_bit)``;
      * other IP packets: atuple is ``(prot, sa, da, sp, dp)``; `sp`/`dp` are
        None unless the protocol is TCP or UDP; `start` is the transport
        header offset for TCP/UDP and the IP header offset otherwise; fragt is
        set only when the more-fragments bit is on;
      * non-IP frames: atuple is ``(prot, src_mac, dst_mac, None, None)`` with
        is_tcp == 0 and start == 0.
    """
    caplen = min(pkt[1], self.snaplen)
    if caplen < ETHER_HDR_LEN:
        self.truncated += 1
        return (None, None, None, None)
    d = pkt[2]
    p0 = up('!H', d[ETH_PROT_OFF:ETH_PROT_END])[0]
    if p0 < ETHERMTU:
        # Value below ETHERMTU: the real protocol is read from the SNAP header.
        if caplen < ETH_LLC_HDR_LEN:
            self.truncated += 1
            return (None, None, None, None)
        p0 = up('!H', d[SNAP_PROT_OFF:SNAP_PROT_END])[0]
        start = ETH_LLC_HDR_LEN
    else:
        start = ETHER_HDR_LEN

    if p0 == ETHERTYPE_IP:
        if caplen < start + IP_HDR_LEN:
            self.truncated += 1
            return (None, None, None, None)
        sa = up('i', d[start+IP_SA_OFF:start+IP_SA_END])[0]
        da = up('i', d[start+IP_DA_OFF:start+IP_DA_END])[0]
        iphl = (ord(d[start+IP_HL_OFF]) & 0x0f) << 2
        ipo = up('!H', d[start+IP_IPOFF_OFF:start+IP_IPOFF_END])[0]
        ipoff = (ipo & 0x1fff) << 3
        if ipoff != 0:
            ipid = up('!H', d[start+IP_IPID_OFF:start+IP_IPID_END])[0]
            iplen = up('!H', d[start+IP_IPLEN_OFF:start+IP_IPLEN_END])[0]
            return (((ETHERTYPE_IP, 'FRAGS'), sa, ipid, None, None), 0, None,
                    (da, ipid, ipoff, iplen-iphl, ipo & IP_MF))
        elif ipo & IP_MF:
            ipid = up('!H', d[start+IP_IPID_OFF:start+IP_IPID_END])[0]
            iplen = up('!H', d[start+IP_IPLEN_OFF:start+IP_IPLEN_END])[0]
            fragt = (da, ipid, ipoff, iplen-iphl, 1)
        else:
            fragt = None

        p1 = ord(d[start+IP_PROT_OFF])
        if p1 == IPPROTO_TCP or p1 == IPPROTO_UDP:
            prot = (ETHERTYPE_IP, p1)
            start += iphl
            if caplen < start + TCPUDP_HDR_LEN:
                self.truncated += 1
                return (None, None, None, None)
            sp = up('!H', d[start+TCPUDP_SP_OFF: start+TCPUDP_SP_END])[0]
            dp = up('!H', d[start+TCPUDP_DP_OFF: start+TCPUDP_DP_END])[0]
        elif (ETHERTYPE_IP, p1) in self.protd:
            prot = (ETHERTYPE_IP, p1)
            sp = dp = None
        else:
            prot = (ETHERTYPE_IP, 'OTHER')
            sp = dp = None
        return ((prot, sa, da, sp, dp), p1 == IPPROTO_TCP, start, fragt)
    elif (p0, None) in self.protd:
        prot = (p0, None)
    else:
        prot = ('OTHER', None)

    dh = d[ETH_DH_OFF:ETH_DH_END]
    sh = d[ETH_SH_OFF:ETH_SH_END]
    return ((prot, sh, dh, None, None), 0, 0, None)

############################################################################

def get_err_atuple(self, pkt):
    # NOTE(review): only the opening of this method was present in the source
    # under review; the text stops in the middle of the TCP/UDP branch.  The
    # visible statements are kept as they were (including the local named
    # `len`, so that a restored remainder which refers to it keeps working).
    len = min(pkt[1], self.snaplen)
    if len < ETHER_HDR_LEN:
        self.truncated += 1
        return (None, None, None, None)
    d = pkt[2]
    p0 = up('!H', d[ETH_PROT_OFF:ETH_PROT_END])[0]
    start = ETHER_HDR_LEN
    err = -ord(d[ERRCODE_OFF])
    if p0 == ETHERTYPE_IP:
        if len < start + IP_HDR_LEN:
            self.truncated += 1
            return (None, None, None, None)
        sa = up('i', d[start+IP_SA_OFF:start+IP_SA_END])[0]
        iphl = (ord(d[start+IP_HL_OFF]) & 0x0f) << 2
        p1 = ord(d[start+IP_PROT_OFF])
        if p1 == IPPROTO_TCP or p1 == IPPROTO_UDP:
            # Placeholder so the module stays importable; restore the rest of
            # the method from a complete copy of the file.
            raise NotImplementedError('get_err_atuple(): remainder of method missing from source')
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -