📄 khashmir.py
字号:
return -1 elif b[1] == 1 and a[1] == 0: return 1 else: return 0 def _staleNodeHandler(dict, old=old, new=new, contacted=contacted): """ called if the pinged node never responds """ if old.fails >= 2: l = self.pingcache.get(old.id, []) l.sort(cmp) if l: n, nc = l[0] if (not contacted) and nc: l = l[1:] + [(new, contacted)] new = n contacted = nc o = self.table.replaceStaleNode(old, new) if o and o != new: self.checkOldNode(o, new) try: self.pingcache[o.id] = self.pingcache[old.id] del(self.pingcache[old.id]) except KeyError: pass else: if l: del(self.pingcache[old.id]) l.sort(cmp) for node in l: self.insertNode(node[0], node[1]) else: l = self.pingcache.get(old.id, []) if l: del(self.pingcache[old.id]) self.insertNode(new, contacted) for node in l: self.insertNode(node[0], node[1]) def _notStaleNodeHandler(dict, old=old, new=new, contacted=contacted): """ called when we get a pong from the old node """ self.table.insertNode(old, True) self.insertNode(new, contacted) l = self.pingcache.get(old.id, []) l.sort(cmp) for node in l: self.insertNode(node[0], node[1]) try: del(self.pingcache[old.id]) except KeyError: pass try: df = old.ping(self.node.id) except krpc.KRPCSelfNodeError: pass df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler) def sendPing(self, node, callback=None): """ ping a node """ try: df = node.ping(self.node.id) except krpc.KRPCSelfNodeError: pass else: ## these are the callbacks we use when we issue a PING def _pongHandler(dict, node=node, table=self.table, callback=callback): _krpc_sender = dict['_krpc_sender'] dict = dict['rsp'] sender = {'id' : dict['id']} sender['host'] = _krpc_sender[0] sender['port'] = _krpc_sender[1] n = self.Node().initWithDict(sender) table.insertNode(n) if callback: callback() def _defaultPong(err, node=node, table=self.table, callback=callback): if callback: callback() df.addCallbacks(_pongHandler,_defaultPong) def findCloseNodes(self, callback=lambda a: a, auto=False): """ This does a findNode on the ID one away from our own. This will allow us to populate our table with nodes on our network closest to our own. This is called as soon as we start up with an empty table """ if not self.config['pause']: id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256) self.findNode(id, callback) if auto: if not self.config['pause']: self.refreshTable() self.rawserver.external_add_task(randrange(int(const.FIND_CLOSE_INTERVAL *0.9), int(const.FIND_CLOSE_INTERVAL *1.1)), self.findCloseNodes, lambda a: True, True) def refreshTable(self, force=0): """ force=1 will refresh table regardless of last bucket access time """ def callback(nodes): pass refresh = [bucket for bucket in self.table.buckets if force or (len(bucket.l) < K) or len(filter(lambda a: a.invalid, bucket.l)) or (time() - bucket.lastAccessed > const.BUCKET_STALENESS)] for bucket in refresh: id = newIDInRange(bucket.min, bucket.max) self.findNode(id, callback) def stats(self): """ Returns (num_contacts, num_nodes) num_contacts: number contacts in our routing table num_nodes: number of nodes estimated in the entire dht """ num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0) num_nodes = const.K * (2**(len(self.table.buckets) - 1)) return {'num_contacts':num_contacts, 'num_nodes':num_nodes} def krpc_ping(self, id, _krpc_sender): sender = {'id' : id} sender['host'] = _krpc_sender[0] sender['port'] = _krpc_sender[1] n = self.Node().initWithDict(sender) self.insertNode(n, contacted=0) return {"id" : self.node.id} def krpc_find_node(self, target, id, _krpc_sender): nodes = self.table.findNodes(target, invalid=False) nodes = map(lambda node: node.senderDict(), nodes) sender = {'id' : id} sender['host'] = _krpc_sender[0] sender['port'] = _krpc_sender[1] n = self.Node().initWithDict(sender) self.insertNode(n, contacted=0) return {"nodes" : packNodes(nodes), "id" : self.node.id}## This class provides read-only access to the DHT, valueForKey## you probably want to use this mixin and provide your own write methodsclass KhashmirRead(KhashmirBase): _Node = KNodeRead def retrieveValues(self, key): try: l = self.store[key] except KeyError: l = [] return l ## also async def valueForKey(self, key, callback, searchlocal = 1): """ returns the values found for key in global table callback will be called with a list of values for each peer that returns unique values final callback will be an empty list - probably should change to 'more coming' arg """ nodes = self.table.findNodes(key) # get locals if searchlocal: l = self.retrieveValues(key) if len(l) > 0: self.rawserver.external_add_task(0, callback, l) else: l = [] # create our search state state = GetValue(self, key, callback, self.rawserver.add_task) self.rawserver.external_add_task(0, state.goWithNodes, nodes, l) def krpc_find_value(self, key, id, _krpc_sender): sender = {'id' : id} sender['host'] = _krpc_sender[0] sender['port'] = _krpc_sender[1] n = self.Node().initWithDict(sender) self.insertNode(n, contacted=0) l = self.retrieveValues(key) if len(l) > 0: return {'values' : l, "id": self.node.id} else: nodes = self.table.findNodes(key, invalid=False) nodes = map(lambda node: node.senderDict(), nodes) return {'nodes' : packNodes(nodes), "id": self.node.id}### provides a generic write method, you probably don't want to deploy something that allows### arbitrary value storageclass KhashmirWrite(KhashmirRead): _Node = KNodeWrite ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor) def storeValueForKey(self, key, value, callback=None): """ stores the value for key in the global table, returns immediately, no status in this implementation, peers respond but don't indicate status to storing values a key can have many values """ def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table): if not response: # default callback def _storedValueHandler(sender): pass response=_storedValueHandler action = StoreValue(self, key, value, response, self.rawserver.add_task) self.rawserver.external_add_task(0, action.goWithNodes, nodes) # this call is asynch self.findNode(key, _storeValueForKey) def krpc_store_value(self, key, value, id, _krpc_sender): t = "%0.6f" % time() self.store[key] = value sender = {'id' : id} sender['host'] = _krpc_sender[0] sender['port'] = _krpc_sender[1] n = self.Node().initWithDict(sender) self.insertNode(n, contacted=0) return {"id" : self.node.id}# the whole shebang, for testingclass Khashmir(KhashmirWrite): _Node = KNodeWrite
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -