📄 khashmir.py
字号:
# The contents of this file are subject to the BitTorrent Open Source License# Version 1.1 (the License). You may not copy or use this file, in either# source code or executable form, except in compliance with the License. You# may obtain a copy of the License at http://www.bittorrent.com/license/.## Software distributed under the License is distributed on an AS IS basis,# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License# for the specific language governing rights and limitations under the# License.import constfrom socket import gethostbynamefrom BitTorrent.platform import bttime as timefrom sha import shaimport refrom BitTorrent.defaultargs import common_options, rare_optionsfrom BitTorrent.RawServer_twisted import RawServerfrom ktable import KTable, Kfrom knode import *from kstore import KStorefrom khash import newID, newIDInRangefrom util import packNodesfrom actions import FindNode, GetValue, KeyExpirer, StoreValueimport krpcimport sysimport osimport tracebackfrom BitTorrent.bencode import bencode, bdecodefrom BitTorrent.defer import Deferredfrom random import randrangefrom kstore import samplefrom BitTorrent.stackthreading import Event, Threadip_pat = re.compile('[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}')class KhashmirDBExcept(Exception): passdef foo(bytes): pass # this is the base class, has base functionality and find node, no key-value mappingsclass KhashmirBase: _Node = KNodeBase def __init__(self, host, port, data_dir, rawserver=None, max_ul_rate=1024, checkpoint=True, errfunc=None, rlcount=foo, config={'pause':False, 'max_rate_period':20}): if rawserver: self.rawserver = rawserver else: self.flag = Event() d = dict([(x[0],x[1]) for x in common_options + rare_options]) self.rawserver = RawServer(self.flag, d) self.max_ul_rate = max_ul_rate self.socket = None self.config = config self.setup(host, port, data_dir, rlcount, checkpoint) def setup(self, host, port, data_dir, rlcount, checkpoint=True): self.host = host self.port = port self.ddir = data_dir self.store = KStore() self.pingcache = {} self.socket = self.rawserver.create_udpsocket(self.port, self.host, False) self.udp = krpc.hostbroker(self, (self.host, self.port), self.socket, self.rawserver.add_task, self.max_ul_rate, self.config, rlcount) self._load() self.rawserver.start_listening_udp(self.socket, self.udp) self.last = time() KeyExpirer(self.store, self.rawserver.add_task) self.refreshTable(force=1) if checkpoint: self.rawserver.add_task(30, self.findCloseNodes, lambda a: a, True) self.rawserver.add_task(60, self.checkpoint, 1) def Node(self): n = self._Node(self.udp.connectionForAddr) n.table = self return n def __del__(self): if self.socket is not None: self.rawserver.stop_listening_udp(self.socket) self.socket.close() def _load(self): do_load = False try: s = open(os.path.join(self.ddir, "routing_table"), 'r').read() dict = bdecode(s) except: id = newID() else: id = dict['id'] do_load = True self.node = self._Node(self.udp.connectionForAddr).init(id, self.host, self.port) self.table = KTable(self.node) if do_load: self._loadRoutingTable(dict['rt']) def checkpoint(self, auto=0): d = {} d['id'] = self.node.id d['rt'] = self._dumpRoutingTable() try: f = open(os.path.join(self.ddir, "routing_table"), 'wb') f.write(bencode(d)) f.close() except Exception, e: #XXX real error here print ">>> unable to dump routing table!", str(e) pass if auto: self.rawserver.add_task(randrange(int(const.CHECKPOINT_INTERVAL * .9), int(const.CHECKPOINT_INTERVAL * 1.1)), self.checkpoint, 1) def _loadRoutingTable(self, nodes): """ load routing table nodes from database it's usually a good idea to call refreshTable(force=1) after loading the table """ for rec in nodes: n = self.Node().initWithDict(rec) self.table.insertNode(n, contacted=0, nocheck=True) def _dumpRoutingTable(self): """ save routing table nodes to the database """ l = [] for bucket in self.table.buckets: for node in bucket.l: l.append({'id':node.id, 'host':node.host, 'port':node.port, 'age':int(node.age)}) return l def _addContact(self, host, port, callback=None): """ ping this node and add the contact info to the table on pong! """ n =self.Node().init(const.NULL_ID, host, port) try: self.sendPing(n, callback=callback) except krpc.KRPCSelfNodeError: # our own node pass ####### ####### LOCAL INTERFACE - use these methods! def addContact(self, ip, port, callback=None): """ ping this node and add the contact info to the table on pong! """ if ip_pat.match(ip): self._addContact(ip, port) else: def go(ip=ip, port=port): ip = gethostbyname(ip) self.rawserver.external_add_task(0, self._addContact, ip, port) t = Thread(target=go) t.start() ## this call is async! def findNode(self, id, callback, errback=None): """ returns the contact info for node, or the k closest nodes, from the global table """ # get K nodes out of local table/cache, or the node we want nodes = self.table.findNodes(id, invalid=True) l = [x for x in nodes if x.invalid] if len(l) > 4: nodes = sample(l , 4) + self.table.findNodes(id, invalid=False)[:4] d = Deferred() if errback: d.addCallbacks(callback, errback) else: d.addCallback(callback) if len(nodes) == 1 and nodes[0].id == id : d.callback(nodes) else: # create our search state state = FindNode(self, id, d.callback, self.rawserver.add_task) self.rawserver.external_add_task(0, state.goWithNodes, nodes) def insertNode(self, n, contacted=1): """ insert a node in our local table, pinging oldest contact in bucket, if necessary If all you have is a host/port, then use addContact, which calls this method after receiving the PONG from the remote node. The reason for the seperation is we can't insert a node into the table without it's peer-ID. That means of course the node passed into this method needs to be a properly formed Node object with a valid ID. """ old = self.table.insertNode(n, contacted=contacted) if old and old != n: if not old.inPing(): self.checkOldNode(old, n, contacted) else: l = self.pingcache.get(old.id, []) if len(l) < 10 or contacted: l.append((n, contacted)) self.pingcache[old.id] = l def checkOldNode(self, old, new, contacted=False): ## these are the callbacks used when we ping the oldest node in a bucket def cmp(a, b): if a[1] == 1 and b[1] == 0:
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -