📄 zeroconf.py
字号:
if isinstance(other, DNSPointer): return self.alias == other.alias return 0 def __repr__(self): """String representation""" return self.toString(self.alias)class DNSText(DNSRecord): """A DNS text record""" def __init__(self, name, type, clazz, ttl, text): DNSRecord.__init__(self, name, type, clazz, ttl) self.text = text def write(self, out): """Used in constructing an outgoing packet""" out.writeString(self.text, len(self.text)) def __eq__(self, other): """Tests equality on text""" if isinstance(other, DNSText): return self.text == other.text return 0 def __repr__(self): """String representation""" if len(self.text) > 10: return self.toString(self.text[:7] + "...") else: return self.toString(self.text)class DNSService(DNSRecord): """A DNS service record""" def __init__(self, name, type, clazz, ttl, priority, weight, port, server): DNSRecord.__init__(self, name, type, clazz, ttl) self.priority = priority self.weight = weight self.port = port self.server = server def write(self, out): """Used in constructing an outgoing packet""" out.writeShort(self.priority) out.writeShort(self.weight) out.writeShort(self.port) out.writeName(self.server) def __eq__(self, other): """Tests equality on priority, weight, port and server""" if isinstance(other, DNSService): return self.priority == other.priority and self.weight == other.weight and self.port == other.port and self.server == other.server return 0 def __repr__(self): """String representation""" return self.toString("%s:%s" % (self.server, self.port))class DNSIncoming(object): """Object representation of an incoming DNS packet""" def __init__(self, data): """Constructor from string holding bytes of packet""" self.offset = 0 self.data = data self.questions = [] self.answers = [] self.numQuestions = 0 self.numAnswers = 0 self.numAuthorities = 0 self.numAdditionals = 0 self.readHeader() self.readQuestions() self.readOthers() def readHeader(self): """Reads header portion of packet""" format = '!HHHHHH' length = struct.calcsize(format) info = struct.unpack(format, self.data[self.offset:self.offset+length]) self.offset += length self.id = info[0] self.flags = info[1] self.numQuestions = info[2] self.numAnswers = info[3] self.numAuthorities = info[4] self.numAdditionals = info[5] def readQuestions(self): """Reads questions section of packet""" format = '!HH' length = struct.calcsize(format) for i in xrange(self.numQuestions): name = self.readName() info = struct.unpack(format, self.data[self.offset:self.offset+length]) self.offset += length try: question = DNSQuestion(name, info[0], info[1]) self.questions.append(question) except NonLocalNameException: # invalid question ? pass def readInt(self): """Reads an integer from the packet""" format = '!I' length = struct.calcsize(format) info = struct.unpack(format, self.data[self.offset:self.offset+length]) self.offset += length return info[0] def readCharacterString(self): """Reads a character string from the packet""" length = ord(self.data[self.offset]) self.offset += 1 return self.readString(length) def readString(self, len): """Reads a string of a given length from the packet""" format = '!' + str(len) + 's' length = struct.calcsize(format) info = struct.unpack(format, self.data[self.offset:self.offset+length]) self.offset += length return info[0] def readUnsignedShort(self): """Reads an unsigned short from the packet""" format = '!H' length = struct.calcsize(format) info = struct.unpack(format, self.data[self.offset:self.offset+length]) self.offset += length return info[0] def readOthers(self): """Reads the answers, authorities and additionals section of the packet""" format = '!HHiH' length = struct.calcsize(format) n = self.numAnswers + self.numAuthorities + self.numAdditionals for i in xrange(n): domain = self.readName() info = struct.unpack(format, self.data[self.offset:self.offset+length]) self.offset += length rec = None if info[0] == _TYPE_A: rec = DNSAddress(domain, info[0], info[1], info[2], self.readString(4)) elif info[0] == _TYPE_CNAME or info[0] == _TYPE_PTR: rec = DNSPointer(domain, info[0], info[1], info[2], self.readName()) elif info[0] == _TYPE_TXT: rec = DNSText(domain, info[0], info[1], info[2], self.readString(info[3])) elif info[0] == _TYPE_SRV: rec = DNSService(domain, info[0], info[1], info[2], self.readUnsignedShort(), self.readUnsignedShort(), self.readUnsignedShort(), self.readName()) elif info[0] == _TYPE_HINFO: rec = DNSHinfo(domain, info[0], info[1], info[2], self.readCharacterString(), self.readCharacterString()) elif info[0] == _TYPE_AAAA: rec = DNSAddress(domain, info[0], info[1], info[2], self.readString(16)) else: # Try to ignore types we don't know about # this may mean the rest of the name is # unable to be parsed, and may show errors # so this is left for debugging. New types # encountered need to be parsed properly. # #print "UNKNOWN TYPE = " + str(info[0]) #raise BadTypeInNameException pass if rec is not None: self.answers.append(rec) def isQuery(self): """Returns true if this is a query""" return (self.flags & _FLAGS_QR_MASK) == _FLAGS_QR_QUERY def isResponse(self): """Returns true if this is a response""" return (self.flags & _FLAGS_QR_MASK) == _FLAGS_QR_RESPONSE def readUTF(self, offset, len): """Reads a UTF-8 string of a given length from the packet""" result = self.data[offset:offset+len].decode('utf-8') return result def readName(self): """Reads a domain name from the packet""" result = '' off = self.offset next = -1 first = off while 1: len = ord(self.data[off]) off += 1 if len == 0: break t = len & 0xC0 if t == 0x00: result = ''.join((result, self.readUTF(off, len) + '.')) off += len elif t == 0xC0: if next < 0: next = off + 1 off = ((len & 0x3F) << 8) | ord(self.data[off]) if off >= first: raise "Bad domain name (circular) at " + str(off) first = off else: raise "Bad domain name at " + str(off) if next >= 0: self.offset = next else: self.offset = off return result class DNSOutgoing(object): """Object representation of an outgoing packet""" def __init__(self, flags, multicast = 1): self.finished = 0 self.id = 0 self.multicast = multicast self.flags = flags self.names = {} self.data = [] self.size = 12 self.questions = [] self.answers = [] self.authorities = [] self.additionals = [] def addQuestion(self, record): """Adds a question""" self.questions.append(record) def addAnswer(self, inp, record): """Adds an answer""" if not record.suppressedBy(inp): self.addAnswerAtTime(record, 0) def addAnswerAtTime(self, record, now): """Adds an answer if if does not expire by a certain time""" if record is not None: if now == 0 or not record.isExpired(now): self.answers.append((record, now)) def addAuthorativeAnswer(self, record): """Adds an authoritative answer""" self.authorities.append(record) def addAdditionalAnswer(self, record): """Adds an additional answer""" self.additionals.append(record) def writeByte(self, value): """Writes a single byte to the packet""" format = '!c' self.data.append(struct.pack(format, chr(value))) self.size += 1 def insertShort(self, index, value): """Inserts an unsigned short in a certain position in the packet""" format = '!H' self.data.insert(index, struct.pack(format, value)) self.size += 2 def writeShort(self, value): """Writes an unsigned short to the packet""" format = '!H' self.data.append(struct.pack(format, value)) self.size += 2 def writeInt(self, value): """Writes an unsigned integer to the packet""" format = '!I' self.data.append(struct.pack(format, value)) self.size += 4 def writeString(self, value, length): """Writes a string to the packet""" format = '!' + str(length) + 's' self.data.append(struct.pack(format, value)) self.size += length def writeUTF(self, s): """Writes a UTF-8 string of a given length to the packet""" utfstr = s.encode('utf-8') length = len(utfstr) if length > 64: raise NamePartTooLongException self.writeByte(length) self.writeString(utfstr, length) def writeName(self, name): """Writes a domain name to the packet""" try: # Find existing instance of this name in packet # index = self.names[name] except KeyError: # No record of this name already, so write it # out as normal, recording the location of the name # for future pointers to it. # self.names[name] = self.size parts = name.split('.') if parts[-1] == '': parts = parts[:-1] for part in parts: self.writeUTF(part) self.writeByte(0) return # An index was found, so write a pointer to it # self.writeByte((index >> 8) | 0xC0) self.writeByte(index) def writeQuestion(self, question): """Writes a question to the packet""" self.writeName(question.name) self.writeShort(question.type) self.writeShort(question.clazz) def writeRecord(self, record, now): """Writes a record (answer, authoritative answer, additional) to the packet""" self.writeName(record.name) self.writeShort(record.type) if record.unique and self.multicast: self.writeShort(record.clazz | _CLASS_UNIQUE) else: self.writeShort(record.clazz) if now == 0: self.writeInt(record.ttl) else: self.writeInt(record.getRemainingTTL(now)) index = len(self.data) # Adjust size for the short we will write before this record # self.size += 2 record.write(self) self.size -= 2 length = len(''.join(self.data[index:])) self.insertShort(index, length) # Here is the short we adjusted for def packet(self): """Returns a string containing the packet's bytes No further parts should be added to the packet once this is done.""" if not self.finished: self.finished = 1 for question in self.questions: self.writeQuestion(question) for answer, time in self.answers: self.writeRecord(answer, time) for authority in self.authorities: self.writeRecord(authority, 0) for additional in self.additionals: self.writeRecord(additional, 0) self.insertShort(0, len(self.additionals)) self.insertShort(0, len(self.authorities)) self.insertShort(0, len(self.answers)) self.insertShort(0, len(self.questions)) self.insertShort(0, self.flags) if self.multicast: self.insertShort(0, 0) else: self.insertShort(0, self.id) return ''.join(self.data)class DNSCache(object): """A cache of DNS entries""" def __init__(self): self.cache = {} def add(self, entry): """Adds an entry""" try: list = self.cache[entry.key] except:
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -