📄 zeroconf.py
字号:
""" Multicast DNS Service Discovery for Python, v0.12 Copyright (C) 2003, Paul Scott-Murphy This module provides a framework for the use of DNS Service Discovery using IP multicast. It has been tested against the JRendezvous implementation from <a href="http://strangeberry.com">StrangeBerry</a>, and against the mDNSResponder from Mac OS X 10.3.8. This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this library; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA """"""0.12 update - allow selection of binding interface typo fix - Thanks A. M. Kuchlingi removed all use of word 'Rendezvous' - this is an API change""""""0.11 update - correction to comments for addListener method support for new record types seen from OS X - IPv6 address - hostinfo ignore unknown DNS record types fixes to name decoding works alongside other processes using port 5353 (e.g. on Mac OS X) tested against Mac OS X 10.3.2's mDNSResponder corrections to removal of list entries for service browser""""""0.10 update - Jonathon Paisley contributed these corrections: always multicast replies, even when query is unicast correct a pointer encoding problem can now write records in any order traceback shown on failure better TXT record parsing server is now separate from name can cancel a service browser modified some unit tests to accommodate these changes""""""0.09 update - remove all records on service unregistration fix DOS security problem with readName""""""0.08 update - changed licensing to LGPL""""""0.07 update - faster shutdown on engine pointer encoding of outgoing names ServiceBrowser now works new unit tests""""""0.06 update - small improvements with unit tests added defined exception types new style objects fixed hostname/interface problem fixed socket timeout problem fixed addServiceListener() typo bug using select() for socket reads tested on Debian unstable with Python 2.2.2""""""0.05 update - ensure case insensitivty on domain names support for unicast DNS queries""""""0.04 update - added some unit tests added __ne__ adjuncts where required ensure names end in '.local.' timeout on receiving socket for clean shutdown"""__author__ = "Paul Scott-Murphy"__email__ = "paul at scott dash murphy dot com"__version__ = "0.12"import stringimport timeimport structimport socketimport threadingimport selectimport traceback__all__ = ["Zeroconf", "ServiceInfo", "ServiceBrowser"]# hook for threadsglobals()['_GLOBAL_DONE'] = 0# Some timing constants_UNREGISTER_TIME = 125_CHECK_TIME = 175_REGISTER_TIME = 225_LISTENER_TIME = 200_BROWSER_TIME = 500# Some DNS constants _MDNS_ADDR = '224.0.0.251'_MDNS_PORT = 5353;_DNS_PORT = 53;_DNS_TTL = 60 * 60; # one hour default TTL_MAX_MSG_TYPICAL = 1460 # unused_MAX_MSG_ABSOLUTE = 8972_FLAGS_QR_MASK = 0x8000 # query response mask_FLAGS_QR_QUERY = 0x0000 # query_FLAGS_QR_RESPONSE = 0x8000 # response_FLAGS_AA = 0x0400 # Authorative answer_FLAGS_TC = 0x0200 # Truncated_FLAGS_RD = 0x0100 # Recursion desired_FLAGS_RA = 0x8000 # Recursion available_FLAGS_Z = 0x0040 # Zero_FLAGS_AD = 0x0020 # Authentic data_FLAGS_CD = 0x0010 # Checking disabled_CLASS_IN = 1_CLASS_CS = 2_CLASS_CH = 3_CLASS_HS = 4_CLASS_NONE = 254_CLASS_ANY = 255_CLASS_MASK = 0x7FFF_CLASS_UNIQUE = 0x8000_TYPE_A = 1_TYPE_NS = 2_TYPE_MD = 3_TYPE_MF = 4_TYPE_CNAME = 5_TYPE_SOA = 6_TYPE_MB = 7_TYPE_MG = 8_TYPE_MR = 9_TYPE_NULL = 10_TYPE_WKS = 11_TYPE_PTR = 12_TYPE_HINFO = 13_TYPE_MINFO = 14_TYPE_MX = 15_TYPE_TXT = 16_TYPE_AAAA = 28_TYPE_SRV = 33_TYPE_ANY = 255# Mapping constants to names_CLASSES = { _CLASS_IN : "in", _CLASS_CS : "cs", _CLASS_CH : "ch", _CLASS_HS : "hs", _CLASS_NONE : "none", _CLASS_ANY : "any" }_TYPES = { _TYPE_A : "a", _TYPE_NS : "ns", _TYPE_MD : "md", _TYPE_MF : "mf", _TYPE_CNAME : "cname", _TYPE_SOA : "soa", _TYPE_MB : "mb", _TYPE_MG : "mg", _TYPE_MR : "mr", _TYPE_NULL : "null", _TYPE_WKS : "wks", _TYPE_PTR : "ptr", _TYPE_HINFO : "hinfo", _TYPE_MINFO : "minfo", _TYPE_MX : "mx", _TYPE_TXT : "txt", _TYPE_AAAA : "quada", _TYPE_SRV : "srv", _TYPE_ANY : "any" }# utility functionsdef currentTimeMillis(): """Current system time in milliseconds""" return time.time() * 1000# Exceptionsclass NonLocalNameException(Exception): passclass NonUniqueNameException(Exception): passclass NamePartTooLongException(Exception): passclass AbstractMethodException(Exception): passclass BadTypeInNameException(Exception): pass# implementation classesclass DNSEntry(object): """A DNS entry""" def __init__(self, name, type, clazz): self.key = string.lower(name) self.name = name self.type = type self.clazz = clazz & _CLASS_MASK self.unique = (clazz & _CLASS_UNIQUE) != 0 def __eq__(self, other): """Equality test on name, type, and class""" if isinstance(other, DNSEntry): return self.name == other.name and self.type == other.type and self.clazz == other.clazz return 0 def __ne__(self, other): """Non-equality test""" return not self.__eq__(other) def getClazz(self, clazz): """Class accessor""" try: return _CLASSES[clazz] except: return "?(%s)" % (clazz) def getType(self, type): """Type accessor""" try: return _TYPES[type] except: return "?(%s)" % (type) def toString(self, hdr, other): """String representation with additional information""" result = "%s[%s,%s" % (hdr, self.getType(self.type), self.getClazz(self.clazz)) if self.unique: result += "-unique," else: result += "," result += self.name if other is not None: result += ",%s]" % (other) else: result += "]" return resultclass DNSQuestion(DNSEntry): """A DNS question entry""" def __init__(self, name, type, clazz): if not name.endswith(".local."): raise NonLocalNameException DNSEntry.__init__(self, name, type, clazz) def answeredBy(self, rec): """Returns true if the question is answered by the record""" return self.clazz == rec.clazz and (self.type == rec.type or self.type == _TYPE_ANY) and self.name == rec.name def __repr__(self): """String representation""" return DNSEntry.toString(self, "question", None)class DNSRecord(DNSEntry): """A DNS record - like a DNS entry, but has a TTL""" def __init__(self, name, type, clazz, ttl): DNSEntry.__init__(self, name, type, clazz) self.ttl = ttl self.created = currentTimeMillis() def __eq__(self, other): """Tests equality as per DNSRecord""" if isinstance(other, DNSRecord): return DNSEntry.__eq__(self, other) return 0 def suppressedBy(self, msg): """Returns true if any answer in a message can suffice for the information held in this record.""" for record in msg.answers: if self.suppressedByAnswer(record): return 1 return 0 def suppressedByAnswer(self, other): """Returns true if another record has same name, type and class, and if its TTL is at least half of this record's.""" if self == other and other.ttl > (self.ttl / 2): return 1 return 0 def getExpirationTime(self, percent): """Returns the time at which this record will have expired by a certain percentage.""" return self.created + (percent * self.ttl * 10) def getRemainingTTL(self, now): """Returns the remaining TTL in seconds.""" return max(0, (self.getExpirationTime(100) - now) / 1000) def isExpired(self, now): """Returns true if this record has expired.""" return self.getExpirationTime(100) <= now def isStale(self, now): """Returns true if this record is at least half way expired.""" return self.getExpirationTime(50) <= now def resetTTL(self, other): """Sets this record's TTL and created time to that of another record.""" self.created = other.created self.ttl = other.ttl def write(self, out): """Abstract method""" raise AbstractMethodException def toString(self, other): """String representation with addtional information""" arg = "%s/%s,%s" % (self.ttl, self.getRemainingTTL(currentTimeMillis()), other) return DNSEntry.toString(self, "record", arg)class DNSAddress(DNSRecord): """A DNS address record""" def __init__(self, name, type, clazz, ttl, address): DNSRecord.__init__(self, name, type, clazz, ttl) self.address = address def write(self, out): """Used in constructing an outgoing packet""" out.writeString(self.address, len(self.address)) def __eq__(self, other): """Tests equality on address""" if isinstance(other, DNSAddress): return self.address == other.address return 0 def __repr__(self): """String representation""" try: return socket.inet_ntoa(self.address) except: return self.addressclass DNSHinfo(DNSRecord): """A DNS host information record""" def __init__(self, name, type, clazz, ttl, cpu, os): DNSRecord.__init__(self, name, type, clazz, ttl) self.cpu = cpu self.os = os def write(self, out): """Used in constructing an outgoing packet""" out.writeString(self.cpu, len(self.cpu)) out.writeString(self.os, len(self.os)) def __eq__(self, other): """Tests equality on cpu and os""" if isinstance(other, DNSHinfo): return self.cpu == other.cpu and self.os == other.os return 0 def __repr__(self): """String representation""" return self.cpu + " " + self.os class DNSPointer(DNSRecord): """A DNS pointer record""" def __init__(self, name, type, clazz, ttl, alias): DNSRecord.__init__(self, name, type, clazz, ttl) self.alias = alias def write(self, out): """Used in constructing an outgoing packet""" out.writeName(self.alias) def __eq__(self, other): """Tests equality on alias"""
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -