
📄 storage_threadpool.py

📁 BitTorrent source code in Python. Please enjoy.
# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License).  You may not copy or use this file, in either
# source code or executable form, except in compliance with the License.  You
# may obtain a copy of the License at http://www.bittorrent.com/license/.
#
# Software distributed under the License is distributed on an AS IS basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied.  See the License
# for the specific language governing rights and limitations under the
# License.

# Written by Bram Cohen and Greg Hazel

import os
import Queue
from array import array
from bisect import bisect_right

from BitTorrent.translation import _
from BitTorrent.obsoletepythonsupport import *
from BitTorrent import BTFailure, app_name
from BitTorrent.defer import Deferred, ThreadedDeferred
from BitTorrent.yielddefer import launch_coroutine, _wrap_task
from BitTorrent.platform import get_allocated_regions
from BitTorrent.sparse_set import SparseSet
from BitTorrent.DictWithLists import DictWithLists, DictWithSets
from BitTorrent.Storage_base import open_sparse_file, make_file_sparse, bad_libc_workaround, is_open_for_write
import BitTorrent.stackthreading as threading


class FilePool(object):

    def __init__(self, doneflag, add_task, external_add_task,
                 max_files_open, num_disk_threads):
        self.doneflag = doneflag
        self.external_add_task = external_add_task
        self.file_to_torrent = {}

        self.free_handle_condition = threading.Condition()
        self.active_file_to_handles = DictWithSets()
        self.open_file_to_handles = DictWithLists()

        self.set_max_files_open(max_files_open)

        self.diskq = Queue.Queue()
        for i in xrange(num_disk_threads):
            t = threading.Thread(target=self._disk_thread,
                                 name="disk_thread-%s" % (i+1))
            t.start()
        self.doneflag.addCallback(self.finalize)

    def finalize(self, r=None):
        # re-queue self so all threads die. we end up with one extra event on
        # the queue, but who cares.
        self._create_op(self.finalize)

    def close_all(self):
        failures = {}
        self.free_handle_condition.acquire()
        while self.get_open_file_count() > 0:
            while len(self.open_file_to_handles) > 0:
                filename, handle = self.open_file_to_handles.popitem()
                try:
                    handle.close()
                except Exception, e:
                    failures[self.file_to_torrent[filename]] = e
                self.free_handle_condition.notify()
            if self.get_open_file_count() > 0:
                self.free_handle_condition.wait(1)
        self.free_handle_condition.release()
        for torrent, e in failures.iteritems():
            torrent.got_exception(e)

    def close_files(self, file_set):
        failures = set()
        self.free_handle_condition.acquire()
        done = False
        while not done:
            filenames = list(self.open_file_to_handles.iterkeys())
            for filename in filenames:
                if filename not in file_set:
                    continue
                handles = self.open_file_to_handles.poprow(filename)
                for handle in handles:
                    try:
                        handle.close()
                    except Exception, e:
                        failures.add(e)
                    self.free_handle_condition.notify()
            done = True
            for filename in file_set.iterkeys():
                if filename in self.active_file_to_handles:
                    done = False
                    break
            if not done:
                self.free_handle_condition.wait(0.5)
        self.free_handle_condition.release()

        if len(failures) > 0:
            raise failures.pop()

    def set_max_files_open(self, max_files_open):
        if max_files_open <= 0:
            max_files_open = 1e100
        self.max_files_open = max_files_open
        self.close_all()

    def add_files(self, files, torrent):
        for filename in files:
            if filename in self.file_to_torrent:
                raise BTFailure(_("File %s belongs to another running torrent")
                                % filename)
        for filename in files:
            self.file_to_torrent[filename] = torrent

    def remove_files(self, files):
        for filename in files:
            del self.file_to_torrent[filename]

    def _ensure_exists(self, filename, length=0):
        if not os.path.exists(filename):
            f = os.path.split(filename)[0]
            if f != '' and not os.path.exists(f):
                os.makedirs(f)
            f = file(filename, 'wb')
            make_file_sparse(filename, f, length)
            f.close()

    def get_open_file_count(self):
        t = self.open_file_to_handles.total_length()
        t += self.active_file_to_handles.total_length()
        return t

    def acquire_handle(self, filename, for_write, length=0):
        # this will block until a new file handle can be made
        self.free_handle_condition.acquire()

        # abort disk ops on unregistered files
        if filename not in self.file_to_torrent:
            self.free_handle_condition.release()
            return None

        while self.active_file_to_handles.total_length() == self.max_files_open:
            self.free_handle_condition.wait()

        if filename in self.open_file_to_handles:
            handle = self.open_file_to_handles.pop_from_row(filename)
            if for_write and not is_open_for_write(handle.mode):
                handle.close()
                handle = open_sparse_file(filename, 'rb+', length=length)
            #elif not for_write and is_open_for_write(handle.mode):
            #    handle.close()
            #    handle = open_sparse_file(filename, 'rb', length=length)
        else:
            if self.get_open_file_count() == self.max_files_open:
                oldfname, oldhandle = self.open_file_to_handles.popitem()
                oldhandle.close()
            self._ensure_exists(filename, length)
            if for_write:
                handle = open_sparse_file(filename, 'rb+', length=length)
            else:
                handle = open_sparse_file(filename, 'rb', length=length)

        self.active_file_to_handles.push_to_row(filename, handle)
        self.free_handle_condition.release()
        return handle

    def release_handle(self, filename, handle):
        self.free_handle_condition.acquire()
        self.active_file_to_handles.remove_fom_row(filename, handle)
        self.open_file_to_handles.push_to_row(filename, handle)
        self.free_handle_condition.notify()
        self.free_handle_condition.release()

    def _create_op(self, _f, *args, **kwargs):
        df = Deferred()
        self.diskq.put((df, _f, args, kwargs))
        return df

    read = _create_op
    write = _create_op

    def _disk_thread(self):
        while not self.doneflag.isSet():
            df, func, args, kwargs = self.diskq.get(True)
            try:
                v = func(*args, **kwargs)
            except Exception, e:
                print "DISK ERROR", e
                import traceback
                traceback.print_exc()
            else:
                self.external_add_task(0, df.callback, v)


class Storage(object):

    def __init__(self, config, filepool, save_path,
                 files, add_task,
                 external_add_task, doneflag):
        self.filepool = filepool
        self.config = config
        self.doneflag = doneflag
        self.add_task = add_task
        self.external_add_task = external_add_task
        self.initialize(save_path, files)

    def initialize(self, save_path, files):
        # a list of byte ranges and filenames for window-based IO
        self.ranges = []
        # a dict of filename-to-ranges for piece priorities and filename lookup
        self.range_by_name = {}
        # a sparse set for smart allocation detection
        self.allocated_regions = SparseSet()
        # dict of filename-to-length on disk (for % complete in the file view)
        self.undownloaded = {}
        self.save_path = save_path

        # Rather implement this as an ugly hack here than change all the
        # individual calls. Affects all torrent instances using this module.
        if self.config['bad_libc_workaround']:
            bad_libc_workaround()

        self.initialized = False
        self.startup_df = ThreadedDeferred(_wrap_task(self.external_add_task),
                                           self._build_file_structs,
                                           self.filepool, files)
        return self.startup_df

    def _build_file_structs(self, filepool, files):
        total = 0
        for filename, length in files:
            # we're shutting down, abort.
            if self.doneflag.isSet():
                return False

            self.undownloaded[filename] = length
            if length > 0:
                self.ranges.append((total, total + length, filename))
            self.range_by_name[filename] = (total, total + length)

            if os.path.exists(filename):
                if not os.path.isfile(filename):
                    raise BTFailure(_("File %s already exists, but is not a "
                                      "regular file") % filename)
                l = os.path.getsize(filename)
                if l > length:
                    # This is the truncation Bram was talking about that no one
                    # else thinks is a good idea.
                    #h = file(filename, 'rb+')
                    #make_file_sparse(filename, h, length)
                    #h.truncate(length)
                    #h.close()
                    l = length

                a = get_allocated_regions(filename, begin=0, length=l)
                if a is not None:
                    a.offset(total)
                else:
                    a = SparseSet()
                    if l > 0:
                        a.add(total, total + l)
                self.allocated_regions += a
            total += length
        self.total_length = total
        self.initialized = True
        return True

    def get_byte_range_for_filename(self, filename):
        if filename not in self.range_by_name:
            filename = os.path.normpath(filename)
            filename = os.path.join(self.save_path, filename)
        return self.range_by_name[filename]

    def was_preallocated(self, pos, length):
        return self.allocated_regions.is_range_in(pos, pos+length)

    def get_total_length(self):
        return self.total_length

    def _intervals(self, pos, amount):
        r = []
        stop = pos + amount
        p = max(bisect_right(self.ranges, (pos, 2 ** 500)) - 1, 0)
        for begin, end, filename in self.ranges[p:]:
            if begin >= stop:
                break
            r.append((filename, max(pos, begin) - begin, min(end, stop) - begin))
        return r

    def _read(self, filename, pos, amount):
        begin, end = self.get_byte_range_for_filename(filename)
        length = end - begin
        h = self.filepool.acquire_handle(filename, for_write=False, length=length)
        if h is None:
            return
        try:
            h.seek(pos)
            r = h.read(amount)
        finally:
            self.filepool.release_handle(filename, h)
        return r

    def _batch_read(self, pos, amount):
        dfs = []
        r = []

        # queue all the reads
        for filename, pos, end in self._intervals(pos, amount):
            df = self.filepool.read(self._read, filename, pos, end - pos)
            dfs.append(df)

        # yield on all the reads in order - they complete in any order
        for df in dfs:
            yield df
            r.append(df.getResult())

        r = ''.join(r)

        if len(r) != amount:
            raise BTFailure(_("Short read (%d of %d) - something truncated files?") %
                            (len(r), amount))

        yield r

    def read(self, pos, amount):
        df = launch_coroutine(_wrap_task(self.add_task),
                              self._batch_read, pos, amount)
        return df

    def _write(self, filename, pos, s):
        begin, end = self.get_byte_range_for_filename(filename)
        length = end - begin
        h = self.filepool.acquire_handle(filename, for_write=True, length=length)
        if h is None:
            return
        try:
            h.seek(pos)
            h.write(s)
        finally:
            self.filepool.release_handle(filename, h)
        return len(s)

    def _batch_write(self, pos, s):
        dfs = []

        total = 0
        amount = len(s)

        # queue all the writes
        for filename, begin, end in self._intervals(pos, amount):
            length = end - begin
            d = buffer(s, total, length)
            total += length
            df = self.filepool.write(self._write, filename, begin, d)
            dfs.append(df)

        # yield on all the writes - they complete in any order
        for df in dfs:
            yield df
            df.getResult()

        yield total

    def write(self, pos, s):
        df = launch_coroutine(_wrap_task(self.add_task),
                              self._batch_write, pos, s)
        return df

    def close(self):
        if not self.initialized:
            self.startup_df.addCallback(lambda *a : self.filepool.close_files(self.range_by_name))
            return self.startup_df
        self.filepool.close_files(self.range_by_name)

    def downloaded(self, pos, length):
        for filename, begin, end in self._intervals(pos, length):
            self.undownloaded[filename] -= end - begin
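The pattern that gives this module its name is how FilePool turns blocking disk I/O into asynchronous operations: _create_op wraps each call in a Deferred and pushes it onto self.diskq, and the _disk_thread workers pull jobs off the queue, run them, and hand the result back to the event loop through external_add_task. Below is a minimal, self-contained sketch of that same queue-plus-Deferred pattern, written for modern Python 3 with only the standard library. The Deferred and DiskPool classes here are simplified stand-ins for illustration, not the real BitTorrent.defer or FilePool API, and the names defer and read_head are hypothetical.

import queue
import threading


class Deferred(object):
    # Tiny stand-in for BitTorrent's Deferred: stores one result and runs
    # any callbacks registered against it, whether they were added before
    # or after the result arrived.
    def __init__(self):
        self._lock = threading.Lock()
        self._callbacks = []
        self._fired = False
        self._result = None

    def addCallback(self, fn):
        with self._lock:
            if not self._fired:
                self._callbacks.append(fn)
                return
        fn(self._result)

    def callback(self, result):
        with self._lock:
            self._fired = True
            self._result = result
            callbacks, self._callbacks = self._callbacks, []
        for fn in callbacks:
            fn(result)


class DiskPool(object):
    # Simplified analogue of FilePool's disk queue: callers enqueue work and
    # immediately get a Deferred; worker threads do the blocking I/O.
    def __init__(self, num_disk_threads=2):
        self.doneflag = threading.Event()
        self.diskq = queue.Queue()
        self.threads = []
        for i in range(num_disk_threads):
            t = threading.Thread(target=self._disk_thread,
                                 name="disk_thread-%s" % (i + 1))
            t.start()
            self.threads.append(t)

    def defer(self, _f, *args, **kwargs):
        # Package the call for a worker thread; the caller never blocks.
        df = Deferred()
        self.diskq.put((df, _f, args, kwargs))
        return df

    def _disk_thread(self):
        while not self.doneflag.is_set():
            df, func, args, kwargs = self.diskq.get(True)
            if func is not None:
                df.callback(func(*args, **kwargs))

    def finalize(self):
        # Same trick as FilePool.finalize: set the flag, then push one dummy
        # item per thread so every blocking get() returns and the loop exits.
        self.doneflag.set()
        for t in self.threads:
            self.diskq.put((None, None, (), {}))
        for t in self.threads:
            t.join()


if __name__ == '__main__':
    def read_head(path):
        # A deliberately blocking read, done off the main thread.
        with open(path, 'rb') as f:
            return f.read(16)

    pool = DiskPool(num_disk_threads=2)
    df = pool.defer(read_head, __file__)
    df.addCallback(lambda data: print('read %d bytes' % len(data)))
    pool.finalize()

One difference from this sketch: the real module does not fire the Deferred from the worker thread. Instead _disk_thread calls self.external_add_task(0, df.callback, v), bouncing the result back to the event-loop thread so that callbacks stay single-threaded even though the file I/O runs in the pool.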
