📄 storagewrapper.py
字号:
# The contents of this file are subject to the BitTorrent Open Source License# Version 1.1 (the License). You may not copy or use this file, in either# source code or executable form, except in compliance with the License. You# may obtain a copy of the License at http://www.bittorrent.com/license/.## Software distributed under the License is distributed on an AS IS basis,# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License# for the specific language governing rights and limitations under the# License.# Written by Bram Cohen and Greg Hazelfrom __future__ import divisionfrom __future__ import generatorsimport osimport sysimport structimport cPickleimport loggingfrom array import arrayfrom BitTorrent.translation import _from BitTorrent.hash import shafrom BitTorrent.obsoletepythonsupport import setfrom BitTorrent.sparse_set import SparseSetfrom BitTorrent.bitfield import Bitfieldfrom BitTorrent.defer import Deferredfrom BitTorrent.yielddefer import launch_coroutine, _wrap_taskfrom BitTorrent import BTFailureNO_PLACE = -1ALLOCATED = -1UNALLOCATED = -2FASTRESUME_PARTIAL = -3global_logger = logging.getLogger('StorageWrapper')#global_logger.setLevel(logging.DEBUG)#global_logger.addHandler(logging.StreamHandler(sys.stdout))class DataPig(object): def __init__(self, read, add_task): self.add_task = add_task self.read = read self.failed_pieces = {} self.download_history = {} def got_piece(self, index, begin, length, source): if index in self.failed_pieces: df = launch_coroutine(_wrap_task(self.add_task), self._got_piece, index, begin, length, source) return df self.download_history.setdefault(index, {}) self.download_history[index][begin] = source def _got_piece(self, index, begin, piece, source): df = self.read(index, len(piece), offset=begin) yield df data = df.getResult() if data != piece: if (index in self.download_history and begin in self.download_history[index]): d = self.download_history[index][begin] self.failed_pieces[index].add(d) self.download_history.setdefault(index, {}) self.download_history[index][begin] = source def finished_piece(self, index): for d in self.download_history[index].itervalues(): d.good(index) del self.download_history[index] if index in self.failed_pieces: for d in self.failed_pieces[index]: d.bad(index) del self.failed_pieces[index] def failed_piece(self, index): self.failed_pieces[index] = set() allsenders = {} for d in self.download_history[index].itervalues(): allsenders[d] = None if len(allsenders) == 1: culprit = allsenders.keys()[0] culprit.bad(index, bump = True) del self.failed_pieces[index] # found the culprit already current_version = 2resume_prefix = 'BitTorrent resume state file, version 'version_string = resume_prefix + str(current_version)class StorageWrapper(object): READ_AHEAD_BUFFER_SIZE = 2**22 # 4mB def __init__(self, storage, config, hashes, piece_size, statusfunc, doneflag, data_flunked, infohash, # needed for partials errorfunc, working_path, destination_path, resumefile, add_task, external_add_task): assert len(hashes) > 0 assert piece_size > 0 self.initialized = False self.numpieces = len(hashes) self.infohash = infohash self.add_task = add_task self.external_add_task = external_add_task self.storage = storage self.config = config self.doneflag = doneflag self.hashes = hashes self.piece_size = piece_size self.data_flunked = data_flunked self.errorfunc = errorfunc self.statusfunc = statusfunc self.total_length = storage.get_total_length() # a brief explanation about the mildly confusing amount_ variables: # amount_left: total_length - fully_written_pieces # amount_inactive: amount_left - requests_pending_on_network # amount_left_with_partials (only correct during startup): amount_left + blocks_written self.amount_left = self.total_length self.amount_inactive = self.total_length if self.total_length <= piece_size * (self.numpieces - 1): raise BTFailure(_("bad data in responsefile - total too small")) if self.total_length > piece_size * self.numpieces: raise BTFailure(_("bad data in responsefile - total too big")) # If chunks have been requested then inactive_requests has a list # of the unrequested chunks. Otherwise the piece is not in the dict. self.inactive_requests = {} # If chunks have been requested then active_requests has a list # of the unrequested chunks which are pending on the network. # Otherwise the piece is not in the dict. self.active_requests = {} read = lambda index, length, offset : self._storage_read(self.places[index], length, offset=offset) self.datapig = DataPig(read, self.add_task) self.full_pieces = SparseSet() # a index => df dict for locking pieces self.blocking_pieces = {} self.endgame = False self.have = Bitfield(self.numpieces) self.have_set = SparseSet() self.checked_pieces = SparseSet() self.fastresume = False self.fastresume_dirty = False self._pieces_in_buf = [] self._piece_buf = None self.partial_mark = None if self.numpieces < 32768: self.typecode = 'h' else: self.typecode = 'l' self.places = array(self.typecode, [NO_PLACE] * self.numpieces) check_hashes = self.config['check_hashes'] self.done_checking_df = Deferred() self.lastlen = self._piecelen(self.numpieces - 1) global_logger.debug("Loading fastresume...") if not check_hashes: self.rplaces = array(self.typecode, range(self.numpieces)) self._initialized(True) else: try: result = self.read_fastresume(resumefile, working_path, destination_path) # if resume file doesn't apply to this destination or # working path then start over. if not result: self.rplaces = array(self.typecode, [UNALLOCATED] * self.numpieces) # full hashcheck df = self.hashcheck_pieces() df.addCallback(self._initialized) except: if resumefile is not None: global_logger.warning("Failed to read fastresume", exc_info=sys.exc_info()) self.rplaces = array(self.typecode, [UNALLOCATED] * self.numpieces) # full hashcheck df = self.hashcheck_pieces() df.addCallback(self._initialized) def _initialized(self, v): self._pieces_in_buf = [] self._piece_buf = None self.initialized = v global_logger.debug('Initialized') self.done_checking_df.callback(v) ## fastresume ############################################################################ def read_fastresume(self, f, working_path, destination_path): version_line = f.readline().strip() try: resume_version = version_line.split(resume_prefix)[1] except Exception, e: raise BTFailure(_("Unsupported fastresume file format, " "probably corrupted: %s on (%s)") % (unicode(e.args[0]), repr(version_line))) global_logger.debug('Reading fastresume v' + resume_version) if resume_version == '1': return self._read_fastresume_v1(f, working_path, destination_path) elif resume_version == '2': return self._read_fastresume_v2(f, working_path, destination_path) else: raise BTFailure(_("Unsupported fastresume file format, " "maybe from another client version?")) def _read_fastresume_v1(self, f, working_path, destination_path): # skip a bunch of lines amount_done = int(f.readline()) for b, e, filename in self.storage.ranges: line = f.readline() # now for the good stuff r = array(self.typecode) r.fromfile(f, self.numpieces) self.rplaces = r df = self.checkPieces_v1() df.addCallback(self._initialized) def checkPieces_v1(self): df = launch_coroutine(_wrap_task(self.add_task), self._checkPieces_v1) return df def _checkPieces_v1(self): partials = {} needs_full_hashcheck = False for i in xrange(self.numpieces): piece_len = self._piecelen(i) t = self.rplaces[i] if t >= 0: self._markgot(t, i) elif t in (ALLOCATED, UNALLOCATED): pass elif t == FASTRESUME_PARTIAL: df = self._storage_read(i, piece_len) yield df try: data = df.getResult() except: global_logger.error(_("Bad fastresume info (truncation at piece %d)") % i) needs_full_hashcheck = True i -= 1 break self._check_partial(i, partials, data) self.rplaces[i] = ALLOCATED # we're shutting down, abort. if self.doneflag.isSet(): yield False else: global_logger.error(_("Bad fastresume info (illegal value at piece %d)") % i) needs_full_hashcheck = True i -= 1 break if needs_full_hashcheck: df = self.hashcheck_pieces(i) yield df r = df.getResult() if r == False: yield False self._realize_partials(partials) yield True def _read_fastresume_v2(self, f, working_path, destination_path): # The working and destination paths are "save_as" paths meaning # that they refer to the entire path for a single-file torrent and the # name of the directory containing the files for a batch torrent. # Path read from resume should either reside in/at the
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -