⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 lib.py

📁 Network Administration Visualized 网络管理可视化源码
💻 PY
📖 第 1 页 / 共 2 页
字号:
        self.addRRheader(name, Type.WKS, Class.IN, ttl)        self.addaddr(address)        self.addbyte(chr(protocol))        self.addbytes(bitmap)        self.endRR()    def addSRV(self):        raise NotImplementedErrordef prettyTime(seconds):    if seconds<60:        return seconds,"%d seconds"%(seconds)    if seconds<3600:        return seconds,"%d minutes"%(seconds/60)    if seconds<86400:        return seconds,"%d hours"%(seconds/3600)    if seconds<604800:        return seconds,"%d days"%(seconds/86400)    else:        return seconds,"%d weeks"%(seconds/604800)class RRunpacker(Unpacker):    def __init__(self, buf):        Unpacker.__init__(self, buf)        self.rdend = None    def getRRheader(self):        name = self.getname()        rrtype = self.get16bit()        klass = self.get16bit()        ttl = self.get32bit()        rdlength = self.get16bit()        self.rdend = self.offset + rdlength        return (name, rrtype, klass, ttl, rdlength)    def endRR(self):        if self.offset != self.rdend:            raise UnpackError, 'end of RR not reached'    def getCNAMEdata(self):        return self.getname()    def getHINFOdata(self):        return self.getstring(), self.getstring()    def getMXdata(self):        return self.get16bit(), self.getname()    def getNSdata(self):        return self.getname()    def getPTRdata(self):        return self.getname()    def getSOAdata(self):        return self.getname(), \               self.getname(), \               ('serial',)+(self.get32bit(),), \               ('refresh ',)+prettyTime(self.get32bit()), \               ('retry',)+prettyTime(self.get32bit()), \               ('expire',)+prettyTime(self.get32bit()), \               ('minimum',)+prettyTime(self.get32bit())    def getTXTdata(self):        list = []        while self.offset != self.rdend:            list.append(self.getstring())        return list    def getAdata(self):        return self.getaddr()    def getWKSdata(self):        address = self.getaddr()        protocol = ord(self.getbyte())        bitmap = self.getbytes(self.rdend - self.offset)        return address, protocol, bitmap    def getSRVdata(self):        """        _Service._Proto.Name TTL Class SRV Priority Weight Port Target        """        priority = self.get16bit()        weight = self.get16bit()        port = self.get16bit()        target = self.getname()        #print '***priority, weight, port, target', priority, weight, port, target        return priority, weight, port, target# Pack/unpack Message Header (section 4.1)class Hpacker(Packer):    def addHeader(self, id, qr, opcode, aa, tc, rd, ra, z, rcode,              qdcount, ancount, nscount, arcount):        self.add16bit(id)        self.add16bit((qr&1)<<15 | (opcode&0xF)<<11 | (aa&1)<<10                  | (tc&1)<<9 | (rd&1)<<8 | (ra&1)<<7                  | (z&7)<<4 | (rcode&0xF))        self.add16bit(qdcount)        self.add16bit(ancount)        self.add16bit(nscount)        self.add16bit(arcount)class Hunpacker(Unpacker):    def getHeader(self):        id = self.get16bit()        flags = self.get16bit()        qr, opcode, aa, tc, rd, ra, z, rcode = (                  (flags>>15)&1,                  (flags>>11)&0xF,                  (flags>>10)&1,                  (flags>>9)&1,                  (flags>>8)&1,                  (flags>>7)&1,                  (flags>>4)&7,                  (flags>>0)&0xF)        qdcount = self.get16bit()        ancount = self.get16bit()        nscount = self.get16bit()        arcount = self.get16bit()        return (id, qr, opcode, aa, tc, rd, ra, z, rcode,                  qdcount, ancount, nscount, arcount)# Pack/unpack Question (section 4.1.2)class Qpacker(Packer):    def addQuestion(self, qname, qtype, qclass):        self.addname(qname)        self.add16bit(qtype)        self.add16bit(qclass)class Qunpacker(Unpacker):    def getQuestion(self):        return self.getname(), self.get16bit(), self.get16bit()# Pack/unpack Message(section 4)# NB the order of the base classes is important for __init__()!class Mpacker(RRpacker, Qpacker, Hpacker):    passclass Munpacker(RRunpacker, Qunpacker, Hunpacker):    pass# Routines to print an unpacker to stdout, for debugging.# These affect the unpacker's current position!def dumpM(u):    print 'HEADER:',    (id, qr, opcode, aa, tc, rd, ra, z, rcode,              qdcount, ancount, nscount, arcount) = u.getHeader()    print 'id=%d,' % id,    print 'qr=%d, opcode=%d, aa=%d, tc=%d, rd=%d, ra=%d, z=%d, rcode=%d,' \              % (qr, opcode, aa, tc, rd, ra, z, rcode)    if tc: print '*** response truncated! ***'    if rcode: print '*** nonzero error code! (%d) ***' % rcode    print '  qdcount=%d, ancount=%d, nscount=%d, arcount=%d' \              % (qdcount, ancount, nscount, arcount)    for i in range(qdcount):        print 'QUESTION %d:' % i,        dumpQ(u)    for i in range(ancount):        print 'ANSWER %d:' % i,        dumpRR(u)    for i in range(nscount):        print 'AUTHORITY RECORD %d:' % i,        dumpRR(u)    for i in range(arcount):        print 'ADDITIONAL RECORD %d:' % i,        dumpRR(u)class DnsResult:    def __init__(self,u,args):        self.header={}        self.questions=[]        self.answers=[]        self.authority=[]        self.additional=[]        self.args=args        self.storeM(u)    def show(self):        import time        print '; <<>> PDG.py 1.0 <<>> %s %s'%(self.args['name'],            self.args['qtype'])        opt=""        if self.args['rd']:            opt=opt+'recurs '        h=self.header        print ';; options: '+opt        print ';; got answer:'        print ';; ->>HEADER<<- opcode %s, status %s, id %d'%(            h['opcode'],h['status'],h['id'])        flags=filter(lambda x,h=h:h[x],('qr','aa','rd','ra','tc'))        print ';; flags: %s; Ques: %d, Ans: %d, Auth: %d, Addit: %d'%(            string.join(flags),h['qdcount'],h['ancount'],h['nscount'],            h['arcount'])        print ';; QUESTIONS:'        for q in self.questions:            print ';;      %s, type = %s, class = %s'%(q['qname'],q['qtypestr'],                q['qclassstr'])        print        print ';; ANSWERS:'        for a in self.answers:            print '%-20s    %-6s  %-6s  %s'%(a['name'],`a['ttl']`,a['typename'],                a['data'])        print        print ';; AUTHORITY RECORDS:'        for a in self.authority:            print '%-20s    %-6s  %-6s  %s'%(a['name'],`a['ttl']`,a['typename'],                a['data'])        print        print ';; ADDITIONAL RECORDS:'        for a in self.additional:            print '%-20s    %-6s  %-6s  %s'%(a['name'],`a['ttl']`,a['typename'],                a['data'])        print        if self.args.has_key('elapsed'):            print ';; Total query time: %d msec'%self.args['elapsed']        print ';; To SERVER: %s'%(self.args['server'])        print ';; WHEN: %s'%time.ctime(time.time())    def storeM(self,u):        (self.header['id'], self.header['qr'], self.header['opcode'],          self.header['aa'], self.header['tc'], self.header['rd'],          self.header['ra'], self.header['z'], self.header['rcode'],          self.header['qdcount'], self.header['ancount'],          self.header['nscount'], self.header['arcount']) = u.getHeader()        self.header['opcodestr']=Opcode.opcodestr(self.header['opcode'])        self.header['status']=Status.statusstr(self.header['rcode'])        for i in range(self.header['qdcount']):            #print 'QUESTION %d:' % i,            self.questions.append(self.storeQ(u))        for i in range(self.header['ancount']):            #print 'ANSWER %d:' % i,            self.answers.append(self.storeRR(u))        for i in range(self.header['nscount']):            #print 'AUTHORITY RECORD %d:' % i,            self.authority.append(self.storeRR(u))        for i in range(self.header['arcount']):            #print 'ADDITIONAL RECORD %d:' % i,            self.additional.append(self.storeRR(u))    def storeQ(self,u):        q={}        q['qname'], q['qtype'], q['qclass'] = u.getQuestion()        q['qtypestr']=Type.typestr(q['qtype'])        q['qclassstr']=Class.classstr(q['qclass'])        return q    def storeRR(self,u):        r={}        r['name'],r['type'],r['class'],r['ttl'],r['rdlength'] = u.getRRheader()        r['typename'] = Type.typestr(r['type'])        r['classstr'] = Class.classstr(r['class'])        #print 'name=%s, type=%d(%s), class=%d(%s), ttl=%d' \        #      % (name,        #        type, typename,        #        klass, Class.classstr(class),        #        ttl)        mname = 'get%sdata' % r['typename']        if hasattr(u, mname):            r['data']=getattr(u, mname)()        else:            r['data']=u.getbytes(r['rdlength'])        return rdef dumpQ(u):    qname, qtype, qclass = u.getQuestion()    print 'qname=%s, qtype=%d(%s), qclass=%d(%s)' \              % (qname,                 qtype, Type.typestr(qtype),                 qclass, Class.classstr(qclass))def dumpRR(u):    name, type, klass, ttl, rdlength = u.getRRheader()    typename = Type.typestr(type)    print 'name=%s, type=%d(%s), class=%d(%s), ttl=%d' \              % (name,                 type, typename,                 klass, Class.classstr(klass),                 ttl)    mname = 'get%sdata' % typename    if hasattr(u, mname):        print '  formatted rdata:', getattr(u, mname)()    else:        print '  binary rdata:', u.getbytes(rdlength)if __name__ == "__main__":    testpacker()## $Log: Lib.py,v $# Revision 1.1  2003/03/26 16:03:58  magnun# *** empty log message ***## Revision 1.1  2002/06/17 17:30:19  magnun# *** empty log message ***## Revision 1.11  2002/03/19 13:05:02  anthonybaxter# converted to class based exceptions (there goes the python1.4 compatibility :)## removed a quite gross use of 'eval()'.## Revision 1.10  2002/03/19 12:41:33  anthonybaxter# tabnannied and reindented everything. 4 space indent, no tabs.# yay.## Revision 1.9  2002/03/19 10:30:33  anthonybaxter# first round of major bits and pieces. The major stuff here (summarised# from my local, off-net CVS server :/ this will cause some oddities with# the## tests/testPackers.py:#   a large slab of unit tests for the packer and unpacker code in DNS.Lib## DNS/Lib.py:#   placeholder for addSRV.#   added 'klass' to addA, make it the same as the other A* records.#   made addTXT check for being passed a string, turn it into a length 1 list.#   explicitly check for adding a string of length > 255 (prohibited).#   a bunch of cleanups from a first pass with pychecker#   new code for pack/unpack. the bitwise stuff uses struct, for a smallish#     (disappointly small, actually) improvement, while addr2bin is much#     much faster now.## DNS/Base.py:#   added DiscoverNameServers. This automatically does the right thing#     on unix/ win32. No idea how MacOS handles this.  *sigh*#     Incompatible change: Don't use ParseResolvConf on non-unix, use this#     function, instead!#   a bunch of cleanups from a first pass with pychecker## Revision 1.8  2001/08/09 09:08:55  anthonybaxter# added identifying header to top of each file## Revision 1.7  2001/07/19 07:50:44  anthony# Added SRV (RFC 2782) support. Code from Michael Str鰀er.## Revision 1.6  2001/07/19 07:39:18  anthony# 'type' -> 'rrtype' in getRRheader(). Fix from Michael Str鰀er.## Revision 1.5  2001/07/19 07:34:19  anthony# oops. glitch in storeRR (fixed now).# Reported by Bastian Kleineidam and by greg lin.## Revision 1.4  2001/07/19 07:16:42  anthony# Changed (opcode&0xF)<<11 to (opcode*0xF)<<11.# Patch from Timothy J. Miller.## Revision 1.3  2001/07/19 06:57:07  anthony# cvs keywords added##

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -