📄 lib.py
字号:
self.addRRheader(name, Type.WKS, Class.IN, ttl) self.addaddr(address) self.addbyte(chr(protocol)) self.addbytes(bitmap) self.endRR() def addSRV(self): raise NotImplementedErrordef prettyTime(seconds): if seconds<60: return seconds,"%d seconds"%(seconds) if seconds<3600: return seconds,"%d minutes"%(seconds/60) if seconds<86400: return seconds,"%d hours"%(seconds/3600) if seconds<604800: return seconds,"%d days"%(seconds/86400) else: return seconds,"%d weeks"%(seconds/604800)class RRunpacker(Unpacker): def __init__(self, buf): Unpacker.__init__(self, buf) self.rdend = None def getRRheader(self): name = self.getname() rrtype = self.get16bit() klass = self.get16bit() ttl = self.get32bit() rdlength = self.get16bit() self.rdend = self.offset + rdlength return (name, rrtype, klass, ttl, rdlength) def endRR(self): if self.offset != self.rdend: raise UnpackError, 'end of RR not reached' def getCNAMEdata(self): return self.getname() def getHINFOdata(self): return self.getstring(), self.getstring() def getMXdata(self): return self.get16bit(), self.getname() def getNSdata(self): return self.getname() def getPTRdata(self): return self.getname() def getSOAdata(self): return self.getname(), \ self.getname(), \ ('serial',)+(self.get32bit(),), \ ('refresh ',)+prettyTime(self.get32bit()), \ ('retry',)+prettyTime(self.get32bit()), \ ('expire',)+prettyTime(self.get32bit()), \ ('minimum',)+prettyTime(self.get32bit()) def getTXTdata(self): list = [] while self.offset != self.rdend: list.append(self.getstring()) return list def getAdata(self): return self.getaddr() def getWKSdata(self): address = self.getaddr() protocol = ord(self.getbyte()) bitmap = self.getbytes(self.rdend - self.offset) return address, protocol, bitmap def getSRVdata(self): """ _Service._Proto.Name TTL Class SRV Priority Weight Port Target """ priority = self.get16bit() weight = self.get16bit() port = self.get16bit() target = self.getname() #print '***priority, weight, port, target', priority, weight, port, target return priority, weight, port, target# Pack/unpack Message Header (section 4.1)class Hpacker(Packer): def addHeader(self, id, qr, opcode, aa, tc, rd, ra, z, rcode, qdcount, ancount, nscount, arcount): self.add16bit(id) self.add16bit((qr&1)<<15 | (opcode&0xF)<<11 | (aa&1)<<10 | (tc&1)<<9 | (rd&1)<<8 | (ra&1)<<7 | (z&7)<<4 | (rcode&0xF)) self.add16bit(qdcount) self.add16bit(ancount) self.add16bit(nscount) self.add16bit(arcount)class Hunpacker(Unpacker): def getHeader(self): id = self.get16bit() flags = self.get16bit() qr, opcode, aa, tc, rd, ra, z, rcode = ( (flags>>15)&1, (flags>>11)&0xF, (flags>>10)&1, (flags>>9)&1, (flags>>8)&1, (flags>>7)&1, (flags>>4)&7, (flags>>0)&0xF) qdcount = self.get16bit() ancount = self.get16bit() nscount = self.get16bit() arcount = self.get16bit() return (id, qr, opcode, aa, tc, rd, ra, z, rcode, qdcount, ancount, nscount, arcount)# Pack/unpack Question (section 4.1.2)class Qpacker(Packer): def addQuestion(self, qname, qtype, qclass): self.addname(qname) self.add16bit(qtype) self.add16bit(qclass)class Qunpacker(Unpacker): def getQuestion(self): return self.getname(), self.get16bit(), self.get16bit()# Pack/unpack Message(section 4)# NB the order of the base classes is important for __init__()!class Mpacker(RRpacker, Qpacker, Hpacker): passclass Munpacker(RRunpacker, Qunpacker, Hunpacker): pass# Routines to print an unpacker to stdout, for debugging.# These affect the unpacker's current position!def dumpM(u): print 'HEADER:', (id, qr, opcode, aa, tc, rd, ra, z, rcode, qdcount, ancount, nscount, arcount) = u.getHeader() print 'id=%d,' % id, print 'qr=%d, opcode=%d, aa=%d, tc=%d, rd=%d, ra=%d, z=%d, rcode=%d,' \ % (qr, opcode, aa, tc, rd, ra, z, rcode) if tc: print '*** response truncated! ***' if rcode: print '*** nonzero error code! (%d) ***' % rcode print ' qdcount=%d, ancount=%d, nscount=%d, arcount=%d' \ % (qdcount, ancount, nscount, arcount) for i in range(qdcount): print 'QUESTION %d:' % i, dumpQ(u) for i in range(ancount): print 'ANSWER %d:' % i, dumpRR(u) for i in range(nscount): print 'AUTHORITY RECORD %d:' % i, dumpRR(u) for i in range(arcount): print 'ADDITIONAL RECORD %d:' % i, dumpRR(u)class DnsResult: def __init__(self,u,args): self.header={} self.questions=[] self.answers=[] self.authority=[] self.additional=[] self.args=args self.storeM(u) def show(self): import time print '; <<>> PDG.py 1.0 <<>> %s %s'%(self.args['name'], self.args['qtype']) opt="" if self.args['rd']: opt=opt+'recurs ' h=self.header print ';; options: '+opt print ';; got answer:' print ';; ->>HEADER<<- opcode %s, status %s, id %d'%( h['opcode'],h['status'],h['id']) flags=filter(lambda x,h=h:h[x],('qr','aa','rd','ra','tc')) print ';; flags: %s; Ques: %d, Ans: %d, Auth: %d, Addit: %d'%( string.join(flags),h['qdcount'],h['ancount'],h['nscount'], h['arcount']) print ';; QUESTIONS:' for q in self.questions: print ';; %s, type = %s, class = %s'%(q['qname'],q['qtypestr'], q['qclassstr']) print print ';; ANSWERS:' for a in self.answers: print '%-20s %-6s %-6s %s'%(a['name'],`a['ttl']`,a['typename'], a['data']) print print ';; AUTHORITY RECORDS:' for a in self.authority: print '%-20s %-6s %-6s %s'%(a['name'],`a['ttl']`,a['typename'], a['data']) print print ';; ADDITIONAL RECORDS:' for a in self.additional: print '%-20s %-6s %-6s %s'%(a['name'],`a['ttl']`,a['typename'], a['data']) print if self.args.has_key('elapsed'): print ';; Total query time: %d msec'%self.args['elapsed'] print ';; To SERVER: %s'%(self.args['server']) print ';; WHEN: %s'%time.ctime(time.time()) def storeM(self,u): (self.header['id'], self.header['qr'], self.header['opcode'], self.header['aa'], self.header['tc'], self.header['rd'], self.header['ra'], self.header['z'], self.header['rcode'], self.header['qdcount'], self.header['ancount'], self.header['nscount'], self.header['arcount']) = u.getHeader() self.header['opcodestr']=Opcode.opcodestr(self.header['opcode']) self.header['status']=Status.statusstr(self.header['rcode']) for i in range(self.header['qdcount']): #print 'QUESTION %d:' % i, self.questions.append(self.storeQ(u)) for i in range(self.header['ancount']): #print 'ANSWER %d:' % i, self.answers.append(self.storeRR(u)) for i in range(self.header['nscount']): #print 'AUTHORITY RECORD %d:' % i, self.authority.append(self.storeRR(u)) for i in range(self.header['arcount']): #print 'ADDITIONAL RECORD %d:' % i, self.additional.append(self.storeRR(u)) def storeQ(self,u): q={} q['qname'], q['qtype'], q['qclass'] = u.getQuestion() q['qtypestr']=Type.typestr(q['qtype']) q['qclassstr']=Class.classstr(q['qclass']) return q def storeRR(self,u): r={} r['name'],r['type'],r['class'],r['ttl'],r['rdlength'] = u.getRRheader() r['typename'] = Type.typestr(r['type']) r['classstr'] = Class.classstr(r['class']) #print 'name=%s, type=%d(%s), class=%d(%s), ttl=%d' \ # % (name, # type, typename, # klass, Class.classstr(class), # ttl) mname = 'get%sdata' % r['typename'] if hasattr(u, mname): r['data']=getattr(u, mname)() else: r['data']=u.getbytes(r['rdlength']) return rdef dumpQ(u): qname, qtype, qclass = u.getQuestion() print 'qname=%s, qtype=%d(%s), qclass=%d(%s)' \ % (qname, qtype, Type.typestr(qtype), qclass, Class.classstr(qclass))def dumpRR(u): name, type, klass, ttl, rdlength = u.getRRheader() typename = Type.typestr(type) print 'name=%s, type=%d(%s), class=%d(%s), ttl=%d' \ % (name, type, typename, klass, Class.classstr(klass), ttl) mname = 'get%sdata' % typename if hasattr(u, mname): print ' formatted rdata:', getattr(u, mname)() else: print ' binary rdata:', u.getbytes(rdlength)if __name__ == "__main__": testpacker()## $Log: Lib.py,v $# Revision 1.1 2003/03/26 16:03:58 magnun# *** empty log message ***## Revision 1.1 2002/06/17 17:30:19 magnun# *** empty log message ***## Revision 1.11 2002/03/19 13:05:02 anthonybaxter# converted to class based exceptions (there goes the python1.4 compatibility :)## removed a quite gross use of 'eval()'.## Revision 1.10 2002/03/19 12:41:33 anthonybaxter# tabnannied and reindented everything. 4 space indent, no tabs.# yay.## Revision 1.9 2002/03/19 10:30:33 anthonybaxter# first round of major bits and pieces. The major stuff here (summarised# from my local, off-net CVS server :/ this will cause some oddities with# the## tests/testPackers.py:# a large slab of unit tests for the packer and unpacker code in DNS.Lib## DNS/Lib.py:# placeholder for addSRV.# added 'klass' to addA, make it the same as the other A* records.# made addTXT check for being passed a string, turn it into a length 1 list.# explicitly check for adding a string of length > 255 (prohibited).# a bunch of cleanups from a first pass with pychecker# new code for pack/unpack. the bitwise stuff uses struct, for a smallish# (disappointly small, actually) improvement, while addr2bin is much# much faster now.## DNS/Base.py:# added DiscoverNameServers. This automatically does the right thing# on unix/ win32. No idea how MacOS handles this. *sigh*# Incompatible change: Don't use ParseResolvConf on non-unix, use this# function, instead!# a bunch of cleanups from a first pass with pychecker## Revision 1.8 2001/08/09 09:08:55 anthonybaxter# added identifying header to top of each file## Revision 1.7 2001/07/19 07:50:44 anthony# Added SRV (RFC 2782) support. Code from Michael Str鰀er.## Revision 1.6 2001/07/19 07:39:18 anthony# 'type' -> 'rrtype' in getRRheader(). Fix from Michael Str鰀er.## Revision 1.5 2001/07/19 07:34:19 anthony# oops. glitch in storeRR (fixed now).# Reported by Bastian Kleineidam and by greg lin.## Revision 1.4 2001/07/19 07:16:42 anthony# Changed (opcode&0xF)<<11 to (opcode*0xF)<<11.# Patch from Timothy J. Miller.## Revision 1.3 2001/07/19 06:57:07 anthony# cvs keywords added##
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -