⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 zeroconf.py

📁 BitTorrent source code written in Python. Please enjoy.
💻 PY
📖 第 1 页 / 共 4 页
字号:
        if isinstance(other, DNSPointer):            return self.alias == other.alias        return 0    def __repr__(self):        """String representation"""        return self.toString(self.alias)class DNSText(DNSRecord):    """A DNS text record"""        def __init__(self, name, type, clazz, ttl, text):        DNSRecord.__init__(self, name, type, clazz, ttl)        self.text = text    def write(self, out):        """Used in constructing an outgoing packet"""        out.writeString(self.text, len(self.text))    def __eq__(self, other):        """Tests equality on text"""        if isinstance(other, DNSText):            return self.text == other.text        return 0    def __repr__(self):        """String representation"""        if len(self.text) > 10:            return self.toString(self.text[:7] + "...")        else:            return self.toString(self.text)class DNSService(DNSRecord):    """A DNS service record"""        def __init__(self, name, type, clazz, ttl, priority, weight, port, server):        DNSRecord.__init__(self, name, type, clazz, ttl)        self.priority = priority        self.weight = weight        self.port = port        self.server = server    def write(self, out):        """Used in constructing an outgoing packet"""        out.writeShort(self.priority)        out.writeShort(self.weight)        out.writeShort(self.port)        out.writeName(self.server)    def __eq__(self, other):        """Tests equality on priority, weight, port and server"""        if isinstance(other, DNSService):            return self.priority == other.priority and self.weight == other.weight and self.port == other.port and self.server == other.server        return 0    def __repr__(self):        """String representation"""        return self.toString("%s:%s" % (self.server, self.port))class DNSIncoming(object):    """Object representation of an incoming DNS packet"""        def __init__(self, data):        """Constructor from string holding bytes of packet"""       
 self.offset = 0        self.data = data        self.questions = []        self.answers = []        self.numQuestions = 0        self.numAnswers = 0        self.numAuthorities = 0        self.numAdditionals = 0                self.readHeader()        self.readQuestions()        self.readOthers()    def readHeader(self):        """Reads header portion of packet"""        format = '!HHHHHH'        length = struct.calcsize(format)        info = struct.unpack(format, self.data[self.offset:self.offset+length])        self.offset += length        self.id = info[0]        self.flags = info[1]        self.numQuestions = info[2]        self.numAnswers = info[3]        self.numAuthorities = info[4]        self.numAdditionals = info[5]    def readQuestions(self):        """Reads questions section of packet"""        format = '!HH'        length = struct.calcsize(format)        for i in xrange(self.numQuestions):            name = self.readName()            info = struct.unpack(format, self.data[self.offset:self.offset+length])            self.offset += length            try:                            question = DNSQuestion(name, info[0], info[1])                self.questions.append(question)            except NonLocalNameException:                # invalid question ?                pass    def readInt(self):        """Reads an integer from the packet"""        format = '!I'        length = struct.calcsize(format)        info = struct.unpack(format, self.data[self.offset:self.offset+length])        self.offset += length        return info[0]    def readCharacterString(self):        """Reads a character string from the packet"""        length = ord(self.data[self.offset])        self.offset += 1        return self.readString(length)    def readString(self, len):        """Reads a string of a given length from the packet"""        format = '!' 
+ str(len) + 's'        length =  struct.calcsize(format)        info = struct.unpack(format, self.data[self.offset:self.offset+length])        self.offset += length        return info[0]    def readUnsignedShort(self):        """Reads an unsigned short from the packet"""        format = '!H'        length = struct.calcsize(format)        info = struct.unpack(format, self.data[self.offset:self.offset+length])        self.offset += length        return info[0]    def readOthers(self):        """Reads the answers, authorities and additionals section of the packet"""        format = '!HHiH'        length = struct.calcsize(format)        n = self.numAnswers + self.numAuthorities + self.numAdditionals        for i in xrange(n):            domain = self.readName()            info = struct.unpack(format, self.data[self.offset:self.offset+length])            self.offset += length            rec = None            if info[0] == _TYPE_A:                rec = DNSAddress(domain, info[0], info[1], info[2], self.readString(4))            elif info[0] == _TYPE_CNAME or info[0] == _TYPE_PTR:                rec = DNSPointer(domain, info[0], info[1], info[2], self.readName())            elif info[0] == _TYPE_TXT:                rec = DNSText(domain, info[0], info[1], info[2], self.readString(info[3]))            elif info[0] == _TYPE_SRV:                rec = DNSService(domain, info[0], info[1], info[2], self.readUnsignedShort(), self.readUnsignedShort(), self.readUnsignedShort(), self.readName())            elif info[0] == _TYPE_HINFO:                rec = DNSHinfo(domain, info[0], info[1], info[2], self.readCharacterString(), self.readCharacterString())            elif info[0] == _TYPE_AAAA:                rec = DNSAddress(domain, info[0], info[1], info[2], self.readString(16))            else:                # Try to ignore types we don't know about                # this may mean the rest of the name is                # unable to be parsed, and may show errors                # so 
this is left for debugging.  New types                # encountered need to be parsed properly.                #                #print "UNKNOWN TYPE = " + str(info[0])                #raise BadTypeInNameException                pass            if rec is not None:                self.answers.append(rec)                    def isQuery(self):        """Returns true if this is a query"""        return (self.flags & _FLAGS_QR_MASK) == _FLAGS_QR_QUERY    def isResponse(self):        """Returns true if this is a response"""        return (self.flags & _FLAGS_QR_MASK) == _FLAGS_QR_RESPONSE    def readUTF(self, offset, len):        """Reads a UTF-8 string of a given length from the packet"""        result = self.data[offset:offset+len].decode('utf-8')        return result            def readName(self):        """Reads a domain name from the packet"""        result = ''        off = self.offset        next = -1        first = off        while 1:            len = ord(self.data[off])            off += 1            if len == 0:                break            t = len & 0xC0            if t == 0x00:                result = ''.join((result, self.readUTF(off, len) + '.'))                off += len            elif t == 0xC0:                if next < 0:                    next = off + 1                off = ((len & 0x3F) << 8) | ord(self.data[off])                if off >= first:                    raise "Bad domain name (circular) at " + str(off)                first = off            else:                raise "Bad domain name at " + str(off)        if next >= 0:            self.offset = next        else:            self.offset = off        return result            class DNSOutgoing(object):    """Object representation of an outgoing packet"""        def __init__(self, flags, multicast = 1):        self.finished = 0        self.id = 0        self.multicast = multicast        self.flags = flags        self.names = {}        self.data = []        self.size = 12                
self.questions = []        self.answers = []        self.authorities = []        self.additionals = []    def addQuestion(self, record):        """Adds a question"""        self.questions.append(record)    def addAnswer(self, inp, record):        """Adds an answer"""        if not record.suppressedBy(inp):            self.addAnswerAtTime(record, 0)    def addAnswerAtTime(self, record, now):        """Adds an answer if if does not expire by a certain time"""        if record is not None:            if now == 0 or not record.isExpired(now):                self.answers.append((record, now))    def addAuthorativeAnswer(self, record):        """Adds an authoritative answer"""        self.authorities.append(record)    def addAdditionalAnswer(self, record):        """Adds an additional answer"""        self.additionals.append(record)    def writeByte(self, value):        """Writes a single byte to the packet"""        format = '!c'        self.data.append(struct.pack(format, chr(value)))        self.size += 1    def insertShort(self, index, value):        """Inserts an unsigned short in a certain position in the packet"""        format = '!H'        self.data.insert(index, struct.pack(format, value))        self.size += 2            def writeShort(self, value):        """Writes an unsigned short to the packet"""        format = '!H'        self.data.append(struct.pack(format, value))        self.size += 2    def writeInt(self, value):        """Writes an unsigned integer to the packet"""        format = '!I'        self.data.append(struct.pack(format, value))        self.size += 4    def writeString(self, value, length):        """Writes a string to the packet"""        format = '!' 
+ str(length) + 's'        self.data.append(struct.pack(format, value))        self.size += length    def writeUTF(self, s):        """Writes a UTF-8 string of a given length to the packet"""        utfstr = s.encode('utf-8')        length = len(utfstr)        if length > 64:            raise NamePartTooLongException        self.writeByte(length)        self.writeString(utfstr, length)    def writeName(self, name):        """Writes a domain name to the packet"""        try:            # Find existing instance of this name in packet            #            index = self.names[name]        except KeyError:            # No record of this name already, so write it            # out as normal, recording the location of the name            # for future pointers to it.            #            self.names[name] = self.size            parts = name.split('.')            if parts[-1] == '':                parts = parts[:-1]            for part in parts:                self.writeUTF(part)            self.writeByte(0)            return        # An index was found, so write a pointer to it        #        self.writeByte((index >> 8) | 0xC0)        self.writeByte(index)    def writeQuestion(self, question):        """Writes a question to the packet"""        self.writeName(question.name)        self.writeShort(question.type)        self.writeShort(question.clazz)    def writeRecord(self, record, now):        """Writes a record (answer, authoritative answer, additional) to        the packet"""        self.writeName(record.name)        self.writeShort(record.type)        if record.unique and self.multicast:            self.writeShort(record.clazz | _CLASS_UNIQUE)        else:            self.writeShort(record.clazz)        if now == 0:            self.writeInt(record.ttl)        else:            self.writeInt(record.getRemainingTTL(now))        index = len(self.data)        # Adjust size for the short we will write before this record        #        self.size += 2        
record.write(self)        self.size -= 2                length = len(''.join(self.data[index:]))        self.insertShort(index, length) # Here is the short we adjusted for    def packet(self):        """Returns a string containing the packet's bytes        No further parts should be added to the packet once this        is done."""        if not self.finished:            self.finished = 1            for question in self.questions:                self.writeQuestion(question)            for answer, time in self.answers:                self.writeRecord(answer, time)            for authority in self.authorities:                self.writeRecord(authority, 0)            for additional in self.additionals:                self.writeRecord(additional, 0)                    self.insertShort(0, len(self.additionals))            self.insertShort(0, len(self.authorities))            self.insertShort(0, len(self.answers))            self.insertShort(0, len(self.questions))            self.insertShort(0, self.flags)            if self.multicast:                self.insertShort(0, 0)            else:                self.insertShort(0, self.id)        return ''.join(self.data)class DNSCache(object):    """A cache of DNS entries"""        def __init__(self):        self.cache = {}    def add(self, entry):        """Adds an entry"""        try:            list = self.cache[entry.key]        except:

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -