📄 lib.py
字号:
""" $Id: Lib.py,v 1.1 2003/03/26 16:03:58 magnun Exp $ This file is part of the pydns project. Homepage: http://pydns.sourceforge.net This code is covered by the standard Python License. Library code. Largely this is packers and unpackers for various types."""### See RFC 1035:# ------------------------------------------------------------------------# Network Working Group P. Mockapetris# Request for Comments: 1035 ISI# November 1987# Obsoletes: RFCs 882, 883, 973## DOMAIN NAMES - IMPLEMENTATION AND SPECIFICATION# ------------------------------------------------------------------------import string, typesimport Typeimport Classimport Opcodeimport Statusfrom Base import DNSErrorclass UnpackError(DNSError): passclass PackError(DNSError): pass# Low-level 16 and 32 bit integer packing and unpackingfrom struct import pack as struct_packfrom struct import unpack as struct_unpackdef pack16bit(n): return struct_pack('!H', n)def pack32bit(n): return struct_pack('!L', n)def unpack16bit(s): return struct_unpack('!H', s)[0]def unpack32bit(s): return struct_unpack('!L', s)[0]def addr2bin(addr): if type(addr) == type(0): return addr bytes = addr.split('.') if len(bytes) != 4: raise ValueError, 'bad IP address' n = 0 for byte in bytes: n = n<<8 | int(byte) return ndef bin2addr(n): return '%d.%d.%d.%d' % ((n>>24)&0xFF, (n>>16)&0xFF, (n>>8)&0xFF, n&0xFF)# Packing classclass Packer: " packer base class. supports basic byte/16bit/32bit/addr/string/name " def __init__(self): self.buf = '' self.index = {} def getbuf(self): return self.buf def addbyte(self, c): if len(c) != 1: raise TypeError, 'one character expected' self.buf = self.buf + c def addbytes(self, bytes): self.buf = self.buf + bytes def add16bit(self, n): self.buf = self.buf + pack16bit(n) def add32bit(self, n): self.buf = self.buf + pack32bit(n) def addaddr(self, addr): n = addr2bin(addr) self.buf = self.buf + pack32bit(n) def addstring(self, s): if len(s) > 255: raise ValueError, "Can't encode string of length "+ \ "%s (> 255)"%(len(s)) self.addbyte(chr(len(s))) self.addbytes(s) def addname(self, name): # Domain name packing (section 4.1.4) # Add a domain name to the buffer, possibly using pointers. # The case of the first occurrence of a name is preserved. # Redundant dots are ignored. list = [] for label in string.splitfields(name, '.'): if label: if len(label) > 63: raise PackError, 'label too long' list.append(label) keys = [] for i in range(len(list)): key = string.upper(string.joinfields(list[i:], '.')) keys.append(key) if self.index.has_key(key): pointer = self.index[key] break else: i = len(list) pointer = None # Do it into temporaries first so exceptions don't # mess up self.index and self.buf buf = '' offset = len(self.buf) index = [] for j in range(i): label = list[j] n = len(label) if offset + len(buf) < 0x3FFF: index.append((keys[j], offset + len(buf))) else: print 'DNS.Lib.Packer.addname:', print 'warning: pointer too big' buf = buf + (chr(n) + label) if pointer: buf = buf + pack16bit(pointer | 0xC000) else: buf = buf + '\0' self.buf = self.buf + buf for key, value in index: self.index[key] = value def dump(self): keys = self.index.keys() keys.sort() print '-'*40 for key in keys: print '%20s %3d' % (key, self.index[key]) print '-'*40 space = 1 for i in range(0, len(self.buf)+1, 2): if self.buf[i:i+2] == '**': if not space: print space = 1 continue space = 0 print '%4d' % i, for c in self.buf[i:i+2]: if ' ' < c < '\177': print ' %c' % c, else: print '%2d' % ord(c), print print '-'*40# Unpacking classclass Unpacker: def __init__(self, buf): self.buf = buf self.offset = 0 def getbyte(self): if self.offset > len(self.buf): raise UnpackError, "Ran off end of data" c = self.buf[self.offset] self.offset = self.offset + 1 return c def getbytes(self, n): s = self.buf[self.offset : self.offset + n] if len(s) != n: raise UnpackError, 'not enough data left' self.offset = self.offset + n return s def get16bit(self): return unpack16bit(self.getbytes(2)) def get32bit(self): return unpack32bit(self.getbytes(4)) def getaddr(self): return bin2addr(self.get32bit()) def getstring(self): return self.getbytes(ord(self.getbyte())) def getname(self): # Domain name unpacking (section 4.1.4) c = self.getbyte() i = ord(c) if i & 0xC0 == 0xC0: d = self.getbyte() j = ord(d) pointer = ((i<<8) | j) & ~0xC000 save_offset = self.offset try: self.offset = pointer domain = self.getname() finally: self.offset = save_offset return domain if i == 0: return '' domain = self.getbytes(i) remains = self.getname() if not remains: return domain else: return domain + '.' + remains# Test program for packin/unpacking (section 4.1.4)def testpacker(): N = 2500 R = range(N) import timing # See section 4.1.4 of RFC 1035 timing.start() for i in R: p = Packer() p.addaddr('192.168.0.1') p.addbytes('*' * 20) p.addname('f.ISI.ARPA') p.addbytes('*' * 8) p.addname('Foo.F.isi.arpa') p.addbytes('*' * 18) p.addname('arpa') p.addbytes('*' * 26) p.addname('') timing.finish() print timing.milli(), "ms total for packing" print round(timing.milli() / i, 4), 'ms per packing' #p.dump() u = Unpacker(p.buf) u.getaddr() u.getbytes(20) u.getname() u.getbytes(8) u.getname() u.getbytes(18) u.getname() u.getbytes(26) u.getname() timing.start() for i in R: u = Unpacker(p.buf) res = (u.getaddr(), u.getbytes(20), u.getname(), u.getbytes(8), u.getname(), u.getbytes(18), u.getname(), u.getbytes(26), u.getname()) timing.finish() print timing.milli(), "ms total for unpacking" print round(timing.milli() / i, 4), 'ms per unpacking' #for item in res: print item# Pack/unpack RR toplevel format (section 3.2.1)class RRpacker(Packer): def __init__(self): Packer.__init__(self) self.rdstart = None def addRRheader(self, name, type, klass, ttl, *rest): self.addname(name) self.add16bit(type) self.add16bit(klass) self.add32bit(ttl) if rest: if rest[1:]: raise TypeError, 'too many args' rdlength = rest[0] else: rdlength = 0 self.add16bit(rdlength) self.rdstart = len(self.buf) def patchrdlength(self): rdlength = unpack16bit(self.buf[self.rdstart-2:self.rdstart]) if rdlength == len(self.buf) - self.rdstart: return rdata = self.buf[self.rdstart:] save_buf = self.buf ok = 0 try: self.buf = self.buf[:self.rdstart-2] self.add16bit(len(rdata)) self.buf = self.buf + rdata ok = 1 finally: if not ok: self.buf = save_buf def endRR(self): if self.rdstart is not None: self.patchrdlength() self.rdstart = None def getbuf(self): if self.rdstart is not None: self.patchrdlength() return Packer.getbuf(self) # Standard RRs (section 3.3) def addCNAME(self, name, klass, ttl, cname): self.addRRheader(name, Type.CNAME, klass, ttl) self.addname(cname) self.endRR() def addHINFO(self, name, klass, ttl, cpu, os): self.addRRheader(name, Type.HINFO, klass, ttl) self.addstring(cpu) self.addstring(os) self.endRR() def addMX(self, name, klass, ttl, preference, exchange): self.addRRheader(name, Type.MX, klass, ttl) self.add16bit(preference) self.addname(exchange) self.endRR() def addNS(self, name, klass, ttl, nsdname): self.addRRheader(name, Type.NS, klass, ttl) self.addname(nsdname) self.endRR() def addPTR(self, name, klass, ttl, ptrdname): self.addRRheader(name, Type.PTR, klass, ttl) self.addname(ptrdname) self.endRR() def addSOA(self, name, klass, ttl, mname, rname, serial, refresh, retry, expire, minimum): self.addRRheader(name, Type.SOA, klass, ttl) self.addname(mname) self.addname(rname) self.add32bit(serial) self.add32bit(refresh) self.add32bit(retry) self.add32bit(expire) self.add32bit(minimum) self.endRR() def addTXT(self, name, klass, ttl, list): self.addRRheader(name, Type.TXT, klass, ttl) if type(list) is types.StringType: list = [list] for txtdata in list: self.addstring(txtdata) self.endRR() # Internet specific RRs (section 3.4) -- class = IN def addA(self, name, klass, ttl, address): self.addRRheader(name, Type.A, klass, ttl) self.addaddr(address) self.endRR() def addWKS(self, name, ttl, address, protocol, bitmap):
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -