zeroconf.py

来自「Amarok是一款在LINUX或其他类UNIX操作系统中运行的音频播放器软件。 」· Python 代码 · 共 1,579 行 · 第 1/4 页

PY
1,579
字号
		"""Tests equality on alias"""		if isinstance(other, DNSPointer):			return self.alias == other.alias		return 0	def __repr__(self):		"""String representation"""		return self.toString(self.alias)class DNSText(DNSRecord):	"""A DNS text record"""		def __init__(self, name, type, clazz, ttl, text):		DNSRecord.__init__(self, name, type, clazz, ttl)		self.text = text	def write(self, out):		"""Used in constructing an outgoing packet"""		out.writeString(self.text, len(self.text))	def __eq__(self, other):		"""Tests equality on text"""		if isinstance(other, DNSText):			return self.text == other.text		return 0	def __repr__(self):		"""String representation"""		if len(self.text) > 10:			return self.toString(self.text[:7] + "...")		else:			return self.toString(self.text)class DNSService(DNSRecord):	"""A DNS service record"""		def __init__(self, name, type, clazz, ttl, priority, weight, port, server):		DNSRecord.__init__(self, name, type, clazz, ttl)		self.priority = priority		self.weight = weight		self.port = port		self.server = server	def write(self, out):		"""Used in constructing an outgoing packet"""		out.writeShort(self.priority)		out.writeShort(self.weight)		out.writeShort(self.port)		out.writeName(self.server)	def __eq__(self, other):		"""Tests equality on priority, weight, port and server"""		if isinstance(other, DNSService):			return self.priority == other.priority and self.weight == other.weight and self.port == other.port and self.server == other.server		return 0	def __repr__(self):		"""String representation"""		return self.toString("%s:%s" % (self.server, self.port))class DNSIncoming(object):	"""Object representation of an incoming DNS packet"""		def __init__(self, data):		"""Constructor from string holding bytes of packet"""		self.offset = 0		self.data = data		self.questions = []		self.answers = []		self.numQuestions = 0		self.numAnswers = 0		self.numAuthorities = 0		self.numAdditionals = 0				self.readHeader()		self.readQuestions()		self.readOthers()	def readHeader(self):		"""Reads header portion of packet"""		format = '!HHHHHH'		length = struct.calcsize(format)		info = struct.unpack(format, self.data[self.offset:self.offset+length])		self.offset += length		self.id = info[0]		self.flags = info[1]		self.numQuestions = info[2]		self.numAnswers = info[3]		self.numAuthorities = info[4]		self.numAdditionals = info[5]	def readQuestions(self):		"""Reads questions section of packet"""		format = '!HH'		length = struct.calcsize(format)		for i in range(0, self.numQuestions):			name = self.readName()			info = struct.unpack(format, self.data[self.offset:self.offset+length])			self.offset += length						question = DNSQuestion(name, info[0], info[1])			self.questions.append(question)	def readInt(self):		"""Reads an integer from the packet"""		format = '!I'		length = struct.calcsize(format)		info = struct.unpack(format, self.data[self.offset:self.offset+length])		self.offset += length		return info[0]	def readCharacterString(self):		"""Reads a character string from the packet"""		length = ord(self.data[self.offset])		self.offset += 1		return self.readString(length)	def readString(self, len):		"""Reads a string of a given length from the packet"""		format = '!' + str(len) + 's'		length =  struct.calcsize(format)		info = struct.unpack(format, self.data[self.offset:self.offset+length])		self.offset += length		return info[0]	def readUnsignedShort(self):		"""Reads an unsigned short from the packet"""		format = '!H'		length = struct.calcsize(format)		info = struct.unpack(format, self.data[self.offset:self.offset+length])		self.offset += length		return info[0]	def readOthers(self):		"""Reads the answers, authorities and additionals section of the packet"""		format = '!HHiH'		length = struct.calcsize(format)		n = self.numAnswers + self.numAuthorities + self.numAdditionals		for i in range(0, n):			domain = self.readName()			info = struct.unpack(format, self.data[self.offset:self.offset+length])			self.offset += length			rec = None			if info[0] == _TYPE_A:				rec = DNSAddress(domain, info[0], info[1], info[2], self.readString(4))			elif info[0] == _TYPE_CNAME or info[0] == _TYPE_PTR:				rec = DNSPointer(domain, info[0], info[1], info[2], self.readName())			elif info[0] == _TYPE_TXT:				rec = DNSText(domain, info[0], info[1], info[2], self.readString(info[3]))			elif info[0] == _TYPE_SRV:				rec = DNSService(domain, info[0], info[1], info[2], self.readUnsignedShort(), self.readUnsignedShort(), self.readUnsignedShort(), self.readName())			elif info[0] == _TYPE_HINFO:				rec = DNSHinfo(domain, info[0], info[1], info[2], self.readCharacterString(), self.readCharacterString())			elif info[0] == _TYPE_AAAA:				rec = DNSAddress(domain, info[0], info[1], info[2], self.readString(16))			else:				# Try to ignore types we don't know about				# this may mean the rest of the name is				# unable to be parsed, and may show errors				# so this is left for debugging.  New types				# encountered need to be parsed properly.				#				#print "UNKNOWN TYPE = " + str(info[0])				#raise BadTypeInNameException				pass			if rec is not None:				self.answers.append(rec)					def isQuery(self):		"""Returns true if this is a query"""		return (self.flags & _FLAGS_QR_MASK) == _FLAGS_QR_QUERY	def isResponse(self):		"""Returns true if this is a response"""		return (self.flags & _FLAGS_QR_MASK) == _FLAGS_QR_RESPONSE	def readUTF(self, offset, len):		"""Reads a UTF-8 string of a given length from the packet"""		result = self.data[offset:offset+len].decode('utf-8')		return result			def readName(self):		"""Reads a domain name from the packet"""		result = ''		off = self.offset		next = -1		first = off		while 1:			len = ord(self.data[off])			off += 1			if len == 0:				break			t = len & 0xC0			if t == 0x00:				result = ''.join((result, self.readUTF(off, len) + '.'))				off += len			elif t == 0xC0:				if next < 0:					next = off + 1				off = ((len & 0x3F) << 8) | ord(self.data[off])				if off >= first:					raise "Bad domain name (circular) at " + str(off)				first = off			else:				raise "Bad domain name at " + str(off)		if next >= 0:			self.offset = next		else:			self.offset = off		return result			class DNSOutgoing(object):	"""Object representation of an outgoing packet"""		def __init__(self, flags, multicast = 1):		self.finished = 0		self.id = 0		self.multicast = multicast		self.flags = flags		self.names = {}		self.data = []		self.size = 12				self.questions = []		self.answers = []		self.authorities = []		self.additionals = []	def addQuestion(self, record):		"""Adds a question"""		self.questions.append(record)	def addAnswer(self, inp, record):		"""Adds an answer"""		if not record.suppressedBy(inp):			self.addAnswerAtTime(record, 0)	def addAnswerAtTime(self, record, now):		"""Adds an answer if if does not expire by a certain time"""		if record is not None:			if now == 0 or not record.isExpired(now):				self.answers.append((record, now))	def addAuthorativeAnswer(self, record):		"""Adds an authoritative answer"""		self.authorities.append(record)	def addAdditionalAnswer(self, record):		"""Adds an additional answer"""		self.additionals.append(record)	def writeByte(self, value):		"""Writes a single byte to the packet"""		format = '!c'		self.data.append(struct.pack(format, chr(value)))		self.size += 1	def insertShort(self, index, value):		"""Inserts an unsigned short in a certain position in the packet"""		format = '!H'		self.data.insert(index, struct.pack(format, value))		self.size += 2			def writeShort(self, value):		"""Writes an unsigned short to the packet"""		format = '!H'		self.data.append(struct.pack(format, value))		self.size += 2	def writeInt(self, value):		"""Writes an unsigned integer to the packet"""		format = '!I'		self.data.append(struct.pack(format, value))		self.size += 4	def writeString(self, value, length):		"""Writes a string to the packet"""		format = '!' + str(length) + 's'		self.data.append(struct.pack(format, value))		self.size += length	def writeUTF(self, s):		"""Writes a UTF-8 string of a given length to the packet"""		utfstr = s.encode('utf-8')		length = len(utfstr)		if length > 64:			raise NamePartTooLongException		self.writeByte(length)		self.writeString(utfstr, length)	def writeName(self, name):		"""Writes a domain name to the packet"""		try:			# Find existing instance of this name in packet			#			index = self.names[name]		except KeyError:			# No record of this name already, so write it			# out as normal, recording the location of the name			# for future pointers to it.			#			self.names[name] = self.size			parts = name.split('.')			if parts[-1] == '':				parts = parts[:-1]			for part in parts:				self.writeUTF(part)			self.writeByte(0)			return		# An index was found, so write a pointer to it		#		self.writeByte((index >> 8) | 0xC0)		self.writeByte(index)	def writeQuestion(self, question):		"""Writes a question to the packet"""		self.writeName(question.name)		self.writeShort(question.type)		self.writeShort(question.clazz)	def writeRecord(self, record, now):		"""Writes a record (answer, authoritative answer, additional) to		the packet"""		self.writeName(record.name)		self.writeShort(record.type)		if record.unique and self.multicast:			self.writeShort(record.clazz | _CLASS_UNIQUE)		else:			self.writeShort(record.clazz)		if now == 0:			self.writeInt(record.ttl)		else:			self.writeInt(record.getRemainingTTL(now))		index = len(self.data)		# Adjust size for the short we will write before this record		#		self.size += 2		record.write(self)		self.size -= 2				length = len(''.join(self.data[index:]))		self.insertShort(index, length) # Here is the short we adjusted for	def packet(self):		"""Returns a string containing the packet's bytes		No further parts should be added to the packet once this		is done."""		if not self.finished:			self.finished = 1			for question in self.questions:				self.writeQuestion(question)			for answer, time in self.answers:				self.writeRecord(answer, time)			for authority in self.authorities:				self.writeRecord(authority, 0)			for additional in self.additionals:				self.writeRecord(additional, 0)					self.insertShort(0, len(self.additionals))			self.insertShort(0, len(self.authorities))			self.insertShort(0, len(self.answers))			self.insertShort(0, len(self.questions))			self.insertShort(0, self.flags)			if self.multicast:				self.insertShort(0, 0)			else:				self.insertShort(0, self.id)		return ''.join(self.data)class DNSCache(object):	"""A cache of DNS entries"""		def __init__(self):		self.cache = {}	def add(self, entry):		"""Adds an entry"""		try:			list = self.cache[entry.key]		except:			list = self.cache[entry.key] = []		list.append(entry)	def remove(self, entry):

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?