📄 zeroconf.py
字号:
self.updateRecord(zeroconf, now, zeroconf.cache.getByDetails(self.server, _TYPE_A, _CLASS_IN)) elif record.type == _TYPE_TXT: if record.name == self.name: self.setText(record.text) def request(self, zeroconf, timeout): """Returns true if the service could be discovered on the network, and updates this object with details discovered. """ now = currentTimeMillis() delay = _LISTENER_TIME next = now + delay last = now + timeout result = 0 try: zeroconf.addListener(self, DNSQuestion(self.name, _TYPE_ANY, _CLASS_IN)) while self.server is None or self.address is None or self.text is None: if last <= now: return 0 if next <= now: out = DNSOutgoing(_FLAGS_QR_QUERY) out.addQuestion(DNSQuestion(self.name, _TYPE_SRV, _CLASS_IN)) out.addAnswerAtTime(zeroconf.cache.getByDetails(self.name, _TYPE_SRV, _CLASS_IN), now) out.addQuestion(DNSQuestion(self.name, _TYPE_TXT, _CLASS_IN)) out.addAnswerAtTime(zeroconf.cache.getByDetails(self.name, _TYPE_TXT, _CLASS_IN), now) if self.server is not None: out.addQuestion(DNSQuestion(self.server, _TYPE_A, _CLASS_IN)) out.addAnswerAtTime(zeroconf.cache.getByDetails(self.server, _TYPE_A, _CLASS_IN), now) zeroconf.send(out) next = now + delay delay = delay * 2 zeroconf.wait(min(next, last) - now) now = currentTimeMillis() result = 1 finally: zeroconf.removeListener(self) return result def __eq__(self, other): """Tests equality of service name""" if isinstance(other, ServiceInfo): return other.name == self.name return 0 def __ne__(self, other): """Non-equality test""" return not self.__eq__(other) def __repr__(self): """String representation""" result = "service[%s,%s:%s," % (self.name, socket.inet_ntoa(self.getAddress()), self.port) if self.text is None: result += "None" else: if len(self.text) < 20: result += self.text else: result += self.text[:17] + "..." result += "]" return result class Zeroconf(object): """Implementation of Zeroconf Multicast DNS Service Discovery Supports registration, unregistration, queries and browsing. """ def __init__(self, bindaddress=None): """Creates an instance of the Zeroconf class, establishing multicast communications, listening and reaping threads.""" globals()['_GLOBAL_DONE'] = 0 if bindaddress is None: self.intf = socket.gethostbyname(socket.gethostname()) else: self.intf = bindaddress self.group = ('', _MDNS_PORT) self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) except: # SO_REUSEADDR should be equivalent to SO_REUSEPORT for # multicast UDP sockets (p 731, "TCP/IP Illustrated, # Volume 2"), but some BSD-derived systems require # SO_REUSEPORT to be specified explicity. Also, not all # versions of Python have SO_REUSEPORT available. So # if you're on a BSD-based system, and haven't upgraded # to Python 2.3 yet, you may find this library doesn't # work as expected. # pass self.socket.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_TTL, 255) self.socket.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_LOOP, 1) try: self.socket.bind(self.group) except: # Some versions of linux raise an exception even though # the SO_REUSE* options have been set, so ignore it # pass self.socket.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_IF, socket.inet_aton(self.intf) + socket.inet_aton('0.0.0.0')) self.socket.setsockopt(socket.SOL_IP, socket.IP_ADD_MEMBERSHIP, socket.inet_aton(_MDNS_ADDR) + socket.inet_aton('0.0.0.0')) self.listeners = [] self.browsers = [] self.services = {} self.cache = DNSCache() self.condition = threading.Condition() self.engine = Engine(self) self.listener = Listener(self) self.reaper = Reaper(self) def isLoopback(self): return self.intf.startswith("127.0.0.1") def isLinklocal(self): return self.intf.startswith("169.254.") def wait(self, timeout): """Calling thread waits for a given number of milliseconds or until notified.""" self.condition.acquire() self.condition.wait(timeout/1000) self.condition.release() def notifyAll(self): """Notifies all waiting threads""" self.condition.acquire() self.condition.notifyAll() self.condition.release() def getServiceInfo(self, type, name, timeout=3000): """Returns network's service information for a particular name and type, or None if no service matches by the timeout, which defaults to 3 seconds.""" info = ServiceInfo(type, name) if info.request(self, timeout): return info return None def addServiceListener(self, type, listener): """Adds a listener for a particular service type. This object will then have its updateRecord method called when information arrives for that type.""" self.removeServiceListener(listener) self.browsers.append(ServiceBrowser(self, type, listener)) def removeServiceListener(self, listener): """Removes a listener from the set that is currently listening.""" for browser in self.browsers: if browser.listener == listener: browser.cancel() del(browser) def registerService(self, info, ttl=_DNS_TTL): """Registers service information to the network with a default TTL of 60 seconds. Zeroconf will then respond to requests for information for that service. The name of the service may be changed if needed to make it unique on the network.""" self.checkService(info) self.services[info.name.lower()] = info now = currentTimeMillis() nextTime = now i = 0 while i < 3: if now < nextTime: self.wait(nextTime - now) now = currentTimeMillis() continue out = DNSOutgoing(_FLAGS_QR_RESPONSE | _FLAGS_AA) out.addAnswerAtTime(DNSPointer(info.type, _TYPE_PTR, _CLASS_IN, ttl, info.name), 0) out.addAnswerAtTime(DNSService(info.name, _TYPE_SRV, _CLASS_IN, ttl, info.priority, info.weight, info.port, info.server), 0) out.addAnswerAtTime(DNSText(info.name, _TYPE_TXT, _CLASS_IN, ttl, info.text), 0) if info.address: out.addAnswerAtTime(DNSAddress(info.server, _TYPE_A, _CLASS_IN, ttl, info.address), 0) self.send(out) i += 1 nextTime += _REGISTER_TIME def unregisterService(self, info): """Unregister a service.""" try: del(self.services[info.name.lower()]) except: pass now = currentTimeMillis() nextTime = now i = 0 while i < 3: if now < nextTime: self.wait(nextTime - now) now = currentTimeMillis() continue out = DNSOutgoing(_FLAGS_QR_RESPONSE | _FLAGS_AA) out.addAnswerAtTime(DNSPointer(info.type, _TYPE_PTR, _CLASS_IN, 0, info.name), 0) out.addAnswerAtTime(DNSService(info.name, _TYPE_SRV, _CLASS_IN, 0, info.priority, info.weight, info.port, info.name), 0) out.addAnswerAtTime(DNSText(info.name, _TYPE_TXT, _CLASS_IN, 0, info.text), 0) if info.address: out.addAnswerAtTime(DNSAddress(info.server, _TYPE_A, _CLASS_IN, 0, info.address), 0) self.send(out) i += 1 nextTime += _UNREGISTER_TIME def unregisterAllServices(self): """Unregister all registered services.""" if len(self.services) > 0: now = currentTimeMillis() nextTime = now i = 0 while i < 3: if now < nextTime: self.wait(nextTime - now) now = currentTimeMillis() continue out = DNSOutgoing(_FLAGS_QR_RESPONSE | _FLAGS_AA) for info in self.services.values(): out.addAnswerAtTime(DNSPointer(info.type, _TYPE_PTR, _CLASS_IN, 0, info.name), 0) out.addAnswerAtTime(DNSService(info.name, _TYPE_SRV, _CLASS_IN, 0, info.priority, info.weight, info.port, info.server), 0) out.addAnswerAtTime(DNSText(info.name, _TYPE_TXT, _CLASS_IN, 0, info.text), 0) if info.address: out.addAnswerAtTime(DNSAddress(info.server, _TYPE_A, _CLASS_IN, 0, info.address), 0) self.send(out) i += 1 nextTime += _UNREGISTER_TIME def checkService(self, info): """Checks the network for a unique service name, modifying the ServiceInfo passed in if it is not unique.""" now = currentTimeMillis() nextTime = now i = 0 while i < 3: for record in self.cache.entriesWithName(info.type): if record.type == _TYPE_PTR and not record.isExpired(now) and record.alias == info.name: if (info.name.find('.') < 0): info.name = info.name + ".[" + info.address + ":" + info.port + "]." + info.type self.checkService(info) return raise NonUniqueNameException if now < nextTime: self.wait(nextTime - now) now = currentTimeMillis() continue out = DNSOutgoing(_FLAGS_QR_QUERY | _FLAGS_AA) self.debug = out out.addQuestion(DNSQuestion(info.type, _TYPE_PTR, _CLASS_IN)) out.addAuthorativeAnswer(DNSPointer(info.type, _TYPE_PTR, _CLASS_IN, _DNS_TTL, info.name)) self.send(out) i += 1 nextTime += _CHECK_TIME def addListener(self, listener, question): """Adds a listener for a given question. The listener will have its updateRecord method called when information is available to answer the question.""" now = currentTimeMillis() self.listeners.append(listener) if question is not None: for record in self.cache.entriesWithName(question.name): if question.answeredBy(record) and not record.isExpired(now): listener.updateRecord(self, now, record) self.notifyAll() def removeListener(self, listener): """Removes a listener.""" try: self.listeners.remove(listener) self.notifyAll() except: pass def updateRecord(self, now, rec): """Used to notify listeners of new information that has updated a record.""" for listener in self.listeners: listener.updateRecord(self, now, rec) self.notifyAll() def handleResponse(self, msg): """Deal with incoming response packets. All answers are held in the cache, and listeners are notified.""" now = currentTimeMillis() for record in msg.answers: expired = record.isExpired(now) if record in self.cache.entries(): if expired: self.cache.remove(record) else: entry = self.cache.get(record) if entry is not None: entry.resetTTL(record) record = entry else: self.cache.add(record) self.updateRecord(now, record) def handleQuery(self, msg, addr, port): """Deal with incoming query packets. Provides a response if possible.""" out = None # Support unicast client responses # if port != _MDNS_PORT: out = DNSOutgoing(_FLAGS_QR_RESPONSE | _FLAGS_AA, 0) for question in msg.questions: out.addQuestion(question) for question in msg.questions: if question.type == _TYPE_PTR: for service in self.services.values(): if question.name == service.type: if out is None: out = DNSOutgoing(_FLAGS_QR_RESPONSE | _FLAGS_AA) out.addAnswer(msg, DNSPointer(service.type, _TYPE_PTR, _CLASS_IN, _DNS_TTL, service.name)) else: try: if out is None: out = DNSOutgoing(_FLAGS_QR_RESPONSE | _FLAGS_AA) # Answer A record queries for any service addresses we know if question.type == _TYPE_A or question.type == _TYPE_ANY: for service in self.services.values(): if service.server == question.name.lower(): out.addAnswer(msg, DNSAddress(question.name, _TYPE_A, _CLASS_IN | _CLASS_UNIQUE, _DNS_TTL, service.address)) service = self.services.get(question.name.lower(), None) if not service: continue if question.type == _TYPE_SRV or question.type == _TYPE_ANY: out.addAnswer(msg, DNSService(question.name, _TYPE_SRV, _CLASS_IN | _CLASS_UNIQUE, _DNS_TTL, service.priority, service.weight, service.port, service.server)) if question.type == _TYPE_TXT or question.type == _TYPE_ANY: out.addAnswer(msg, DNSText(question.name, _TYPE_TXT, _CLASS_IN | _CLASS_UNIQUE, _DNS_TTL, service.text)) if question.type == _TYPE_SRV: out.addAdditionalAnswer(DNSAddress(service.server, _TYPE_A, _CLASS_IN | _CLASS_UNIQUE, _DNS_TTL, service.address)) except: traceback.print_exc() if out is not None and out.answers: out.id = msg.id self.send(out, addr, port) def send(self, out, addr = _MDNS_ADDR, port = _MDNS_PORT): """Sends an outgoing packet.""" # This is a quick test to see if we can parse the packets we generate #temp = DNSIncoming(out.packet()) try: bytes_sent = self.socket.sendto(out.packet(), 0, (addr, port)) except: # Ignore this, it may be a temporary loss of network connection pass def close(self): """Ends the background threads, and prevent this instance from servicing further queries.""" if globals()['_GLOBAL_DONE'] == 0: globals()['_GLOBAL_DONE'] = 1 self.notifyAll() self.engine.notify() self.unregisterAllServices() self.socket.setsockopt(socket.SOL_IP, socket.IP_DROP_MEMBERSHIP, socket.inet_aton(_MDNS_ADDR) + socket.inet_aton('0.0.0.0')) self.socket.close() # Test a few module features, including service registration, service# query (for Zoe), and service unregistration.if __name__ == '__main__': print "Multicast DNS Service Discovery for Python, version", __version__ r = Zeroconf() print "1. Testing registration of a service..." desc = {'version':'0.10','a':'test value', 'b':'another value'} info = ServiceInfo("_http._tcp.local.", "My Service Name._http._tcp.local.", socket.inet_aton("127.0.0.1"), 1234, 0, 0, desc) print " Registering service..." r.registerService(info) print " Registration done." print "2. Testing query of service information..." print " Getting ZOE service:", str(r.getServiceInfo("_http._tcp.local.", "ZOE._http._tcp.local.")) print " Query done." print "3. Testing query of own service..." print " Getting self:", str(r.getServiceInfo("_http._tcp.local.", "My Service Name._http._tcp.local.")) print " Query done." print "4. Testing unregister of service information..." r.unregisterService(info) print " Unregister done." r.close()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -