zeroconf.py
来自「Amarok是一款在LINUX或其他类UNIX操作系统中运行的音频播放器软件。 」· Python 代码 · 共 1,579 行 · 第 1/4 页
PY
1,579 行
"""Removes an entry""" try: list = self.cache[entry.key] list.remove(entry) except: pass def get(self, entry): """Gets an entry by key. Will return None if there is no matching entry.""" try: list = self.cache[entry.key] return list[list.index(entry)] except: return None def getByDetails(self, name, type, clazz): """Gets an entry by details. Will return None if there is no matching entry.""" entry = DNSEntry(name, type, clazz) return self.get(entry) def entriesWithName(self, name): """Returns a list of entries whose key matches the name.""" try: return self.cache[name] except: return [] def entries(self): """Returns a list of all entries""" def add(x, y): return x+y try: return reduce(add, self.cache.values()) except: return []class Engine(threading.Thread): """An engine wraps read access to sockets, allowing objects that need to receive data from sockets to be called back when the sockets are ready. A reader needs a handle_read() method, which is called when the socket it is interested in is ready for reading. Writers are not implemented here, because we only send short packets. """ def __init__(self, zeroconf): threading.Thread.__init__(self) self.zeroconf = zeroconf self.readers = {} # maps socket to reader self.timeout = 5 self.condition = threading.Condition() self.start() def run(self): while not globals()['_GLOBAL_DONE']: rs = self.getReaders() if len(rs) == 0: # No sockets to manage, but we wait for the timeout # or addition of a socket # self.condition.acquire() self.condition.wait(self.timeout) self.condition.release() else: try: rr, wr, er = select.select(rs, [], [], self.timeout) for socket in rr: try: self.readers[socket].handle_read() except: traceback.print_exc() except: pass def getReaders(self): result = [] self.condition.acquire() result = self.readers.keys() self.condition.release() return result def addReader(self, reader, socket): self.condition.acquire() self.readers[socket] = reader self.condition.notify() self.condition.release() def delReader(self, socket): self.condition.acquire() del(self.readers[socket]) self.condition.notify() self.condition.release() def notify(self): self.condition.acquire() self.condition.notify() self.condition.release()class Listener(object): """A Listener is used by this module to listen on the multicast group to which DNS messages are sent, allowing the implementation to cache information as it arrives. It requires registration with an Engine object in order to have the read() method called when a socket is availble for reading.""" def __init__(self, zeroconf): self.zeroconf = zeroconf self.zeroconf.engine.addReader(self, self.zeroconf.socket) def handle_read(self): data, (addr, port) = self.zeroconf.socket.recvfrom(_MAX_MSG_ABSOLUTE) self.data = data msg = DNSIncoming(data) if msg.isQuery(): # Always multicast responses # if port == _MDNS_PORT: self.zeroconf.handleQuery(msg, _MDNS_ADDR, _MDNS_PORT) # If it's not a multicast query, reply via unicast # and multicast # elif port == _DNS_PORT: self.zeroconf.handleQuery(msg, addr, port) self.zeroconf.handleQuery(msg, _MDNS_ADDR, _MDNS_PORT) else: self.zeroconf.handleResponse(msg)class Reaper(threading.Thread): """A Reaper is used by this module to remove cache entries that have expired.""" def __init__(self, zeroconf): threading.Thread.__init__(self) self.zeroconf = zeroconf self.start() def run(self): while 1: self.zeroconf.wait(10 * 1000) if globals()['_GLOBAL_DONE']: return now = currentTimeMillis() for record in self.zeroconf.cache.entries(): if record.isExpired(now): self.zeroconf.updateRecord(now, record) self.zeroconf.cache.remove(record)class ServiceBrowser(threading.Thread): """Used to browse for a service of a specific type. The listener object will have its addService() and removeService() methods called when this browser discovers changes in the services availability.""" def __init__(self, zeroconf, type, listener): """Creates a browser for a specific type""" threading.Thread.__init__(self) self.zeroconf = zeroconf self.type = type self.listener = listener self.services = {} self.nextTime = currentTimeMillis() self.delay = _BROWSER_TIME self.list = [] self.done = 0 self.zeroconf.addListener(self, DNSQuestion(self.type, _TYPE_PTR, _CLASS_IN)) self.start() def updateRecord(self, zeroconf, now, record): """Callback invoked by Zeroconf when new information arrives. Updates information required by browser in the Zeroconf cache.""" if record.type == _TYPE_PTR and record.name == self.type: expired = record.isExpired(now) try: oldrecord = self.services[record.alias.lower()] if not expired: oldrecord.resetTTL(record) else: del(self.services[record.alias.lower()]) callback = lambda x: self.listener.removeService(x, self.type, record.alias) self.list.append(callback) return except: if not expired: self.services[record.alias.lower()] = record callback = lambda x: self.listener.addService(x, self.type, record.alias) self.list.append(callback) expires = record.getExpirationTime(75) if expires < self.nextTime: self.nextTime = expires def cancel(self): self.done = 1 self.zeroconf.notifyAll() def run(self): while 1: event = None now = currentTimeMillis() if len(self.list) == 0 and self.nextTime > now: self.zeroconf.wait(self.nextTime - now) if globals()['_GLOBAL_DONE'] or self.done: return now = currentTimeMillis() if self.nextTime <= now: out = DNSOutgoing(_FLAGS_QR_QUERY) out.addQuestion(DNSQuestion(self.type, _TYPE_PTR, _CLASS_IN)) for record in self.services.values(): if not record.isExpired(now): out.addAnswerAtTime(record, now) self.zeroconf.send(out) self.nextTime = now + self.delay self.delay = min(20 * 1000, self.delay * 2) if len(self.list) > 0: event = self.list.pop(0) if event is not None: event(self.zeroconf) class ServiceInfo(object): """Service information""" def __init__(self, type, name, address=None, port=None, weight=0, priority=0, properties=None, server=None): """Create a service description. type: fully qualified service type name name: fully qualified service name address: IP address as unsigned short, network byte order port: port that the service runs on weight: weight of the service priority: priority of the service properties: dictionary of properties (or a string holding the bytes for the text field) server: fully qualified name for service host (defaults to name)""" if not name.endswith(type): raise BadTypeInNameException self.type = type self.name = name self.address = address self.port = port self.weight = weight self.priority = priority if server: self.server = server else: self.server = name self.setProperties(properties) def setProperties(self, properties): """Sets properties and text of this info from a dictionary""" if isinstance(properties, dict): self.properties = properties list = [] result = '' for key in properties: value = properties[key] if value is None: suffix = ''.encode('utf-8') elif isinstance(value, str): suffix = value.encode('utf-8') elif isinstance(value, int): if value: suffix = 'true' else: suffix = 'false' else: suffix = ''.encode('utf-8') list.append('='.join((key, suffix))) for item in list: result = ''.join((result, struct.pack('!c', chr(len(item))), item)) self.text = result else: self.text = properties def setText(self, text): """Sets properties and text given a text field""" self.text = text try: result = {} end = len(text) index = 0 strs = [] while index < end: length = ord(text[index]) index += 1 strs.append(text[index:index+length]) index += length for s in strs: eindex = s.find('=') if eindex == -1: # No equals sign at all key = s value = 0 else: key = s[:eindex] value = s[eindex+1:] if value == 'true': value = 1 elif value == 'false' or not value: value = 0 # Only update non-existent properties if key and result.get(key) == None: result[key] = value self.properties = result except: traceback.print_exc() self.properties = None def getType(self): """Type accessor""" return self.type def getName(self): """Name accessor""" if self.type is not None and self.name.endswith("." + self.type): return self.name[:len(self.name) - len(self.type) - 1] return self.name def getAddress(self): """Address accessor""" return self.address def getPort(self): """Port accessor""" return self.port def getPriority(self): """Pirority accessor""" return self.priority def getWeight(self): """Weight accessor""" return self.weight def getProperties(self): """Properties accessor""" return self.properties def getText(self): """Text accessor""" return self.text def getServer(self): """Server accessor""" return self.server def updateRecord(self, zeroconf, now, record): """Updates service information from a DNS record""" if record is not None and not record.isExpired(now): if record.type == _TYPE_A: if record.name == self.name: self.address = record.address elif record.type == _TYPE_SRV: if record.name == self.name: self.server = record.server self.port = record.port self.weight = record.weight self.priority = record.priority self.address = None self.updateRecord(zeroconf, now, zeroconf.cache.getByDetails(self.server, _TYPE_A, _CLASS_IN)) elif record.type == _TYPE_TXT: if record.name == self.name: self.setText(record.text) def request(self, zeroconf, timeout): """Returns true if the service could be discovered on the network, and updates this object with details discovered. """ now = currentTimeMillis() delay = _LISTENER_TIME next = now + delay last = now + timeout result = 0 try: zeroconf.addListener(self, DNSQuestion(self.name, _TYPE_ANY, _CLASS_IN)) while self.server is None or self.address is None or self.text is None: if last <= now:
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?