zeroconf.py

来自「Amarok是一款在LINUX或其他类UNIX操作系统中运行的音频播放器软件。 」· Python 代码 · 共 1,579 行 · 第 1/4 页

PY
1,579
字号
					return 0				if next <= now:					out = DNSOutgoing(_FLAGS_QR_QUERY)					out.addQuestion(DNSQuestion(self.name, _TYPE_SRV, _CLASS_IN))					out.addAnswerAtTime(zeroconf.cache.getByDetails(self.name, _TYPE_SRV, _CLASS_IN), now)					out.addQuestion(DNSQuestion(self.name, _TYPE_TXT, _CLASS_IN))					out.addAnswerAtTime(zeroconf.cache.getByDetails(self.name, _TYPE_TXT, _CLASS_IN), now)					if self.server is not None:						out.addQuestion(DNSQuestion(self.server, _TYPE_A, _CLASS_IN))						out.addAnswerAtTime(zeroconf.cache.getByDetails(self.server, _TYPE_A, _CLASS_IN), now)					zeroconf.send(out)					next = now + delay					delay = delay * 2				zeroconf.wait(min(next, last) - now)				now = currentTimeMillis()			result = 1		finally:			zeroconf.removeListener(self)					return result	def __eq__(self, other):		"""Tests equality of service name"""		if isinstance(other, ServiceInfo):			return other.name == self.name		return 0	def __ne__(self, other):		"""Non-equality test"""		return not self.__eq__(other)	def __repr__(self):		"""String representation"""		result = "service[%s,%s:%s," % (self.name, socket.inet_ntoa(self.getAddress()), self.port)		if self.text is None:			result += "None"		else:			if len(self.text) < 20:				result += self.text			else:				result += self.text[:17] + "..."		result += "]"		return result				class Zeroconf(object):	"""Implementation of Zeroconf Multicast DNS Service Discovery	Supports registration, unregistration, queries and browsing.	"""	def __init__(self, bindaddress=None):		"""Creates an instance of the Zeroconf class, establishing		multicast communications, listening and reaping threads."""		globals()['_GLOBAL_DONE'] = 0		if bindaddress is None:			self.intf = socket.gethostbyname(socket.gethostname())		else:			self.intf = bindaddress		self.group = ('', _MDNS_PORT)		self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)		try:			self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)			self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)		except:			# SO_REUSEADDR should be equivalent to SO_REUSEPORT for			# multicast UDP sockets (p 731, "TCP/IP Illustrated,			# Volume 2"), but some BSD-derived systems require			# SO_REUSEPORT to be specified explicity.  Also, not all			# versions of Python have SO_REUSEPORT available.  So			# if you're on a BSD-based system, and haven't upgraded			# to Python 2.3 yet, you may find this library doesn't			# work as expected.			#			pass		self.socket.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_TTL, 255)		self.socket.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_LOOP, 1)		try:			self.socket.bind(self.group)		except:			# Some versions of linux raise an exception even though			# the SO_REUSE* options have been set, so ignore it			#			pass		self.socket.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_IF, socket.inet_aton(self.intf) + socket.inet_aton('0.0.0.0'))		self.socket.setsockopt(socket.SOL_IP, socket.IP_ADD_MEMBERSHIP, socket.inet_aton(_MDNS_ADDR) + socket.inet_aton('0.0.0.0'))		self.listeners = []		self.browsers = []		self.services = {}		self.servicetypes = {}		self.cache = DNSCache()		self.condition = threading.Condition()				self.engine = Engine(self)		self.listener = Listener(self)		self.reaper = Reaper(self)			def __del__(self):		self.close()	def isLoopback(self):		return self.intf.startswith("127.0.0.1")	def isLinklocal(self):		return self.intf.startswith("169.254.")	def wait(self, timeout):		"""Calling thread waits for a given number of milliseconds or		until notified."""		self.condition.acquire()		self.condition.wait(timeout/1000)		self.condition.release()	def notifyAll(self):		"""Notifies all waiting threads"""		self.condition.acquire()		self.condition.notifyAll()		self.condition.release()	def getServiceInfo(self, type, name, timeout=3000):		"""Returns network's service information for a particular		name and type, or None if no service matches by the timeout,		which defaults to 3 seconds."""		info = ServiceInfo(type, name)		if info.request(self, timeout):			return info		return None	def addServiceListener(self, type, listener):		"""Adds a listener for a particular service type.  This object		will then have its updateRecord method called when information		arrives for that type."""		self.removeServiceListener(listener)		self.browsers.append(ServiceBrowser(self, type, listener))	def removeServiceListener(self, listener):		"""Removes a listener from the set that is currently listening."""		for browser in self.browsers:			if browser.listener == listener:				browser.cancel()				del(browser)	def registerService(self, info, ttl=_DNS_TTL):		"""Registers service information to the network with a default TTL		of 60 seconds.  Zeroconf will then respond to requests for		information for that service.  The name of the service may be		changed if needed to make it unique on the network."""		self.checkService(info)		self.services[info.name.lower()] = info		if self.servicetypes.has_key(info.type):			self.servicetypes[info.type]+=1		else:			self.servicetypes[info.type]=1		now = currentTimeMillis()		nextTime = now		i = 0		while i < 3:			if now < nextTime:				self.wait(nextTime - now)				now = currentTimeMillis()				continue			out = DNSOutgoing(_FLAGS_QR_RESPONSE | _FLAGS_AA)			out.addAnswerAtTime(DNSPointer(info.type, _TYPE_PTR, _CLASS_IN, ttl, info.name), 0)			out.addAnswerAtTime(DNSService(info.name, _TYPE_SRV, _CLASS_IN, ttl, info.priority, info.weight, info.port, info.server), 0)			out.addAnswerAtTime(DNSText(info.name, _TYPE_TXT, _CLASS_IN, ttl, info.text), 0)			if info.address:				out.addAnswerAtTime(DNSAddress(info.server, _TYPE_A, _CLASS_IN, ttl, info.address), 0)			self.send(out)			i += 1			nextTime += _REGISTER_TIME	def unregisterService(self, info):		"""Unregister a service."""		try:			del(self.services[info.name.lower()])			if self.servicetypes[info.type]>1:				self.servicetypes[info.type]-=1			else:				del self.servicetypes[info.type]		except:			pass		now = currentTimeMillis()		nextTime = now		i = 0		while i < 3:			if now < nextTime:				self.wait(nextTime - now)				now = currentTimeMillis()				continue			out = DNSOutgoing(_FLAGS_QR_RESPONSE | _FLAGS_AA)			out.addAnswerAtTime(DNSPointer(info.type, _TYPE_PTR, _CLASS_IN, 0, info.name), 0)			out.addAnswerAtTime(DNSService(info.name, _TYPE_SRV, _CLASS_IN, 0, info.priority, info.weight, info.port, info.name), 0)			out.addAnswerAtTime(DNSText(info.name, _TYPE_TXT, _CLASS_IN, 0, info.text), 0)			if info.address:				out.addAnswerAtTime(DNSAddress(info.server, _TYPE_A, _CLASS_IN, 0, info.address), 0)			self.send(out)			i += 1			nextTime += _UNREGISTER_TIME	def unregisterAllServices(self):		"""Unregister all registered services."""		print 'Unregistering ',len(self.services),' services'		if len(self.services) > 0:			now = currentTimeMillis()			nextTime = now			i = 0			while i < 3:				if now < nextTime:					self.wait(nextTime - now)					now = currentTimeMillis()					continue				out = DNSOutgoing(_FLAGS_QR_RESPONSE | _FLAGS_AA)				for info in self.services.values():					out.addAnswerAtTime(DNSPointer(info.type, _TYPE_PTR, _CLASS_IN, 0, info.name), 0)					out.addAnswerAtTime(DNSService(info.name, _TYPE_SRV, _CLASS_IN, 0, info.priority, info.weight, info.port, info.server), 0)					out.addAnswerAtTime(DNSText(info.name, _TYPE_TXT, _CLASS_IN, 0, info.text), 0)					if info.address:						out.addAnswerAtTime(DNSAddress(info.server, _TYPE_A, _CLASS_IN, 0, info.address), 0)				self.send(out)				i += 1				nextTime += _UNREGISTER_TIME	def checkService(self, info):		"""Checks the network for a unique service name, modifying the		ServiceInfo passed in if it is not unique."""		now = currentTimeMillis()		nextTime = now		i = 0		while i < 3:			for record in self.cache.entriesWithName(info.type):				if record.type == _TYPE_PTR and not record.isExpired(now) and record.alias == info.name:					if (info.name.find('.') < 0):						info.name = info.name + ".[" + info.address + ":" + info.port + "]." + info.type						self.checkService(info)						return					raise NonUniqueNameException			if now < nextTime:				self.wait(nextTime - now)				now = currentTimeMillis()				continue			out = DNSOutgoing(_FLAGS_QR_QUERY | _FLAGS_AA)			self.debug = out			out.addQuestion(DNSQuestion(info.type, _TYPE_PTR, _CLASS_IN))			out.addAuthorativeAnswer(DNSPointer(info.type, _TYPE_PTR, _CLASS_IN, _DNS_TTL, info.name))			self.send(out)			i += 1			nextTime += _CHECK_TIME	def addListener(self, listener, question):		"""Adds a listener for a given question.  The listener will have		its updateRecord method called when information is available to		answer the question."""		now = currentTimeMillis()		self.listeners.append(listener)		if question is not None:			for record in self.cache.entriesWithName(question.name):				if question.answeredBy(record) and not record.isExpired(now):					listener.updateRecord(self, now, record)		self.notifyAll()	def removeListener(self, listener):		"""Removes a listener."""		try:			self.listeners.remove(listener)			self.notifyAll()		except:			pass	def updateRecord(self, now, rec):		"""Used to notify listeners of new information that has updated		a record."""		for listener in self.listeners:			listener.updateRecord(self, now, rec)		self.notifyAll()	def handleResponse(self, msg):		"""Deal with incoming response packets.  All answers		are held in the cache, and listeners are notified."""		now = currentTimeMillis()		for record in msg.answers:			expired = record.isExpired(now)			if record in self.cache.entries():				if expired:					self.cache.remove(record)				else:					entry = self.cache.get(record)					if entry is not None:						entry.resetTTL(record)						record = entry			else:				self.cache.add(record)							self.updateRecord(now, record)	def handleQuery(self, msg, addr, port):		"""Deal with incoming query packets.  Provides a response if		possible."""		out = None		# Support unicast client responses		#		if port != _MDNS_PORT:			out = DNSOutgoing(_FLAGS_QR_RESPONSE | _FLAGS_AA, 0)			for question in msg.questions:				out.addQuestion(question)				for question in msg.questions:			if question.type == _TYPE_PTR:				if question.name == "_services._dns-sd._udp.local.":					for stype in self.servicetypes.keys():						if out is None:							out = DNSOutgoing(_FLAGS_QR_RESPONSE | _FLAGS_AA)						out.addAnswer(msg, DNSPointer("_services._dns-sd._udp.local.", _TYPE_PTR, _CLASS_IN, _DNS_TTL, stype))										for service in self.services.values():					if question.name == service.type:						if out is None:							out = DNSOutgoing(_FLAGS_QR_RESPONSE | _FLAGS_AA)						out.addAnswer(msg, DNSPointer(service.type, _TYPE_PTR, _CLASS_IN, _DNS_TTL, service.name))			else:				try:					if out is None:						out = DNSOutgoing(_FLAGS_QR_RESPONSE | _FLAGS_AA)										# Answer A record queries for any service addresses we know					if question.type == _TYPE_A or question.type == _TYPE_ANY:						for service in self.services.values():							if service.server == question.name.lower():								out.addAnswer(msg, DNSAddress(question.name, _TYPE_A, _CLASS_IN | _CLASS_UNIQUE, _DNS_TTL, service.address))										service = self.services.get(question.name.lower(), None)					if not service: continue										if question.type == _TYPE_SRV or question.type == _TYPE_ANY:						out.addAnswer(msg, DNSService(question.name, _TYPE_SRV, _CLASS_IN | _CLASS_UNIQUE, _DNS_TTL, service.priority, service.weight, service.port, service.server))					if question.type == _TYPE_TXT or question.type == _TYPE_ANY:						out.addAnswer(msg, DNSText(question.name, _TYPE_TXT, _CLASS_IN | _CLASS_UNIQUE, _DNS_TTL, service.text))					if question.type == _TYPE_SRV:						out.addAdditionalAnswer(DNSAddress(service.server, _TYPE_A, _CLASS_IN | _CLASS_UNIQUE, _DNS_TTL, service.address))				except:					traceback.print_exc()						if out is not None and out.answers:			out.id = msg.id			self.send(out, addr, port)	def send(self, out, addr = _MDNS_ADDR, port = _MDNS_PORT):		"""Sends an outgoing packet."""		# This is a quick test to see if we can parse the packets we generate		#temp = DNSIncoming(out.packet())		try:			bytes_sent = self.socket.sendto(out.packet(), 0, (addr, port))		except:			# Ignore this, it may be a temporary loss of network connection			pass	def close(self):		"""Ends the background threads, and prevent this instance from		servicing further queries."""		print 'in close'		if globals()['_GLOBAL_DONE'] == 0:			globals()['_GLOBAL_DONE'] = 1			print 'closing globals'			self.notifyAll()			self.engine.notify()			self.unregisterAllServices()			self.socket.setsockopt(socket.SOL_IP, socket.IP_DROP_MEMBERSHIP, socket.inet_aton(_MDNS_ADDR) + socket.inet_aton('0.0.0.0'))			self.socket.close()			# Test a few module features, including service registration, service# query (for Zoe), and service unregistration.if __name__ == '__main__':		print "Multicast DNS Service Discovery for Python, version", __version__	r = Zeroconf()	print "1. Testing registration of a service..."	desc = {'version':'0.10','a':'test value', 'b':'another value'}	info = ServiceInfo("_http._tcp.local.", "My Service Name._http._tcp.local.", socket.inet_aton("127.0.0.1"), 1234, 0, 0, desc)	print "   Registering service..."	r.registerService(info)	print "   Registration done."	print "2. Testing query of service information..."	print "   Getting ZOE service:", str(r.getServiceInfo("_http._tcp.local.", "ZOE._http._tcp.local."))	print "   Query done."	print "3. Testing query of own service..."	print "   Getting self:", str(r.getServiceInfo("_http._tcp.local.", "My Service Name._http._tcp.local."))	print "   Query done."	print "4. Testing unregister of service information..."	r.unregisterService(info)	print "   Unregister done."	r.close()

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?