⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 lib.py

📁 Network Administration Visualized 网络管理可视化源码
💻 PY
📖 第 1 页 / 共 2 页
字号:
""" $Id: Lib.py,v 1.1 2003/03/26 16:03:58 magnun Exp $ This file is part of the pydns project. Homepage: http://pydns.sourceforge.net This code is covered by the standard Python License. Library code. Largely this is packers and unpackers for various types."""### See RFC 1035:# ------------------------------------------------------------------------# Network Working Group                                     P. Mockapetris# Request for Comments: 1035                                           ISI#                                                            November 1987# Obsoletes: RFCs 882, 883, 973##             DOMAIN NAMES - IMPLEMENTATION AND SPECIFICATION# ------------------------------------------------------------------------import string, typesimport Typeimport Classimport Opcodeimport Statusfrom Base import DNSErrorclass UnpackError(DNSError): passclass PackError(DNSError): pass# Low-level 16 and 32 bit integer packing and unpackingfrom struct import pack as struct_packfrom struct import unpack as struct_unpackdef pack16bit(n):    return struct_pack('!H', n)def pack32bit(n):    return struct_pack('!L', n)def unpack16bit(s):    return struct_unpack('!H', s)[0]def unpack32bit(s):    return struct_unpack('!L', s)[0]def addr2bin(addr):    if type(addr) == type(0): return addr    bytes = addr.split('.')    if len(bytes) != 4: raise ValueError, 'bad IP address'    n = 0    for byte in bytes: n = n<<8 | int(byte)    return ndef bin2addr(n):    return '%d.%d.%d.%d' % ((n>>24)&0xFF, (n>>16)&0xFF,                  (n>>8)&0xFF, n&0xFF)# Packing classclass Packer:    " packer base class. supports basic byte/16bit/32bit/addr/string/name "    def __init__(self):        self.buf = ''        self.index = {}    def getbuf(self):        return self.buf    def addbyte(self, c):        if len(c) != 1: raise TypeError, 'one character expected'        self.buf = self.buf + c    def addbytes(self, bytes):        self.buf = self.buf + bytes    def add16bit(self, n):        self.buf = self.buf + pack16bit(n)    def add32bit(self, n):        self.buf = self.buf + pack32bit(n)    def addaddr(self, addr):        n = addr2bin(addr)        self.buf = self.buf + pack32bit(n)    def addstring(self, s):        if len(s) > 255:            raise ValueError, "Can't encode string of length "+ \                            "%s (> 255)"%(len(s))        self.addbyte(chr(len(s)))        self.addbytes(s)    def addname(self, name):        # Domain name packing (section 4.1.4)        # Add a domain name to the buffer, possibly using pointers.        # The case of the first occurrence of a name is preserved.        # Redundant dots are ignored.        list = []        for label in string.splitfields(name, '.'):            if label:                if len(label) > 63:                    raise PackError, 'label too long'                list.append(label)        keys = []        for i in range(len(list)):            key = string.upper(string.joinfields(list[i:], '.'))            keys.append(key)            if self.index.has_key(key):                pointer = self.index[key]                break        else:            i = len(list)            pointer = None        # Do it into temporaries first so exceptions don't        # mess up self.index and self.buf        buf = ''        offset = len(self.buf)        index = []        for j in range(i):            label = list[j]            n = len(label)            if offset + len(buf) < 0x3FFF:                index.append((keys[j], offset + len(buf)))            else:                print 'DNS.Lib.Packer.addname:',                print 'warning: pointer too big'            buf = buf + (chr(n) + label)        if pointer:            buf = buf + pack16bit(pointer | 0xC000)        else:            buf = buf + '\0'        self.buf = self.buf + buf        for key, value in index:            self.index[key] = value    def dump(self):        keys = self.index.keys()        keys.sort()        print '-'*40        for key in keys:            print '%20s %3d' % (key, self.index[key])        print '-'*40        space = 1        for i in range(0, len(self.buf)+1, 2):            if self.buf[i:i+2] == '**':                if not space: print                space = 1                continue            space = 0            print '%4d' % i,            for c in self.buf[i:i+2]:                if ' ' < c < '\177':                    print ' %c' % c,                else:                    print '%2d' % ord(c),            print        print '-'*40# Unpacking classclass Unpacker:    def __init__(self, buf):        self.buf = buf        self.offset = 0    def getbyte(self):        if self.offset > len(self.buf):            raise UnpackError, "Ran off end of data"        c = self.buf[self.offset]        self.offset = self.offset + 1        return c    def getbytes(self, n):        s = self.buf[self.offset : self.offset + n]        if len(s) != n: raise UnpackError, 'not enough data left'        self.offset = self.offset + n        return s    def get16bit(self):        return unpack16bit(self.getbytes(2))    def get32bit(self):        return unpack32bit(self.getbytes(4))    def getaddr(self):        return bin2addr(self.get32bit())    def getstring(self):        return self.getbytes(ord(self.getbyte()))    def getname(self):        # Domain name unpacking (section 4.1.4)        c = self.getbyte()        i = ord(c)        if i & 0xC0 == 0xC0:            d = self.getbyte()            j = ord(d)            pointer = ((i<<8) | j) & ~0xC000            save_offset = self.offset            try:                self.offset = pointer                domain = self.getname()            finally:                self.offset = save_offset            return domain        if i == 0:            return ''        domain = self.getbytes(i)        remains = self.getname()        if not remains:            return domain        else:            return domain + '.' + remains# Test program for packin/unpacking (section 4.1.4)def testpacker():    N = 2500    R = range(N)    import timing    # See section 4.1.4 of RFC 1035    timing.start()    for i in R:        p = Packer()        p.addaddr('192.168.0.1')        p.addbytes('*' * 20)        p.addname('f.ISI.ARPA')        p.addbytes('*' * 8)        p.addname('Foo.F.isi.arpa')        p.addbytes('*' * 18)        p.addname('arpa')        p.addbytes('*' * 26)        p.addname('')    timing.finish()    print timing.milli(), "ms total for packing"    print round(timing.milli()  / i, 4), 'ms per packing'    #p.dump()    u = Unpacker(p.buf)    u.getaddr()    u.getbytes(20)    u.getname()    u.getbytes(8)    u.getname()    u.getbytes(18)    u.getname()    u.getbytes(26)    u.getname()    timing.start()    for i in R:        u = Unpacker(p.buf)        res = (u.getaddr(),               u.getbytes(20),               u.getname(),               u.getbytes(8),               u.getname(),               u.getbytes(18),               u.getname(),               u.getbytes(26),               u.getname())    timing.finish()    print timing.milli(), "ms total for unpacking"    print round(timing.milli() / i, 4), 'ms per unpacking'    #for item in res: print item# Pack/unpack RR toplevel format (section 3.2.1)class RRpacker(Packer):    def __init__(self):        Packer.__init__(self)        self.rdstart = None    def addRRheader(self, name, type, klass, ttl, *rest):        self.addname(name)        self.add16bit(type)        self.add16bit(klass)        self.add32bit(ttl)        if rest:            if rest[1:]: raise TypeError, 'too many args'            rdlength = rest[0]        else:            rdlength = 0        self.add16bit(rdlength)        self.rdstart = len(self.buf)    def patchrdlength(self):        rdlength = unpack16bit(self.buf[self.rdstart-2:self.rdstart])        if rdlength == len(self.buf) - self.rdstart:            return        rdata = self.buf[self.rdstart:]        save_buf = self.buf        ok = 0        try:            self.buf = self.buf[:self.rdstart-2]            self.add16bit(len(rdata))            self.buf = self.buf + rdata            ok = 1        finally:            if not ok: self.buf = save_buf    def endRR(self):        if self.rdstart is not None:            self.patchrdlength()        self.rdstart = None    def getbuf(self):        if self.rdstart is not None: self.patchrdlength()        return Packer.getbuf(self)    # Standard RRs (section 3.3)    def addCNAME(self, name, klass, ttl, cname):        self.addRRheader(name, Type.CNAME, klass, ttl)        self.addname(cname)        self.endRR()    def addHINFO(self, name, klass, ttl, cpu, os):        self.addRRheader(name, Type.HINFO, klass, ttl)        self.addstring(cpu)        self.addstring(os)        self.endRR()    def addMX(self, name, klass, ttl, preference, exchange):        self.addRRheader(name, Type.MX, klass, ttl)        self.add16bit(preference)        self.addname(exchange)        self.endRR()    def addNS(self, name, klass, ttl, nsdname):        self.addRRheader(name, Type.NS, klass, ttl)        self.addname(nsdname)        self.endRR()    def addPTR(self, name, klass, ttl, ptrdname):        self.addRRheader(name, Type.PTR, klass, ttl)        self.addname(ptrdname)        self.endRR()    def addSOA(self, name, klass, ttl,              mname, rname, serial, refresh, retry, expire, minimum):        self.addRRheader(name, Type.SOA, klass, ttl)        self.addname(mname)        self.addname(rname)        self.add32bit(serial)        self.add32bit(refresh)        self.add32bit(retry)        self.add32bit(expire)        self.add32bit(minimum)        self.endRR()    def addTXT(self, name, klass, ttl, list):        self.addRRheader(name, Type.TXT, klass, ttl)        if type(list) is types.StringType:            list = [list]        for txtdata in list:            self.addstring(txtdata)        self.endRR()    # Internet specific RRs (section 3.4) -- class = IN    def addA(self, name, klass, ttl, address):        self.addRRheader(name, Type.A, klass, ttl)        self.addaddr(address)        self.endRR()    def addWKS(self, name, ttl, address, protocol, bitmap):

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -