⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 storage_iocp.py

📁 bittorrent source by python. please enjoy
💻 PY
📖 第 1 页 / 共 2 页
字号:
# The contents of this file are subject to the BitTorrent Open Source License# Version 1.1 (the License).  You may not copy or use this file, in either# source code or executable form, except in compliance with the License.  You# may obtain a copy of the License at http://www.bittorrent.com/license/.## Software distributed under the License is distributed on an AS IS basis,# WITHOUT WARRANTY OF ANY KIND, either express or implied.  See the License# for the specific language governing rights and limitations under the# License.# Written by Greg Hazelimport osimport win32filefrom bisect import bisect_rightfrom BitTorrent.translation import _from BitTorrent.obsoletepythonsupport import *from BitTorrent import BTFailure, app_namefrom BitTorrent.defer import Deferred, ThreadedDeferredfrom BitTorrent.yielddefer import launch_coroutine, _wrap_taskfrom BitTorrent.platform import get_allocated_regionsfrom BitTorrent.sparse_set import SparseSetfrom BitTorrent.DictWithLists import DictWithLists, DictWithSetsfrom BitTorrent.Storage_base import make_file_sparse, bad_libc_workaround, is_open_for_writefrom BitTorrent.Storage_base import open_sparse_file as open_sparse_file_base# not needed, but it raises errors for platforms that don't support iocpfrom twisted.internet.iocpreactor import _iocpfrom twisted.internet.iocpreactor.proactor import Proactorfrom twisted.internet import reactorassert isinstance(reactor, Proactor), "You imported twisted.internet.reactor before RawServer_twisted!"class OverlappedOp:    def __init__(self):        from twisted.internet import reactor        self.reactor = reactor    def ovDone(self, ret, bytes, (handle, buffer)):        raise NotImplementedError            def initiateOp(self):        raise NotImplementedErrorclass ReadFileOp(OverlappedOp):    def ovDone(self, ret, bytes, (handle, buffer)):        df = self.df        del self.df         if ret or not bytes:            df.errback(ret, bytes)        else:            df.callback(buffer[:bytes])    def initiateOp(self, handle, seekpos, buffer):        df = Deferred()        try:            self.reactor.issueReadFile(handle, seekpos, buffer,                                       self.ovDone, (handle, buffer))        except:            df.errback(sys.exc_info())        else:            self.df = df        return dfclass WriteFileOp(OverlappedOp):    def ovDone(self, ret, bytes, (handle, buffer)):        df = self.df        del self.df         if ret or not bytes:            df.errback(ret, bytes)        else:            df.callback(bytes)    def initiateOp(self, handle, seekpos, buffer):        assert len(buffer) > 0        assert seekpos >= 0        df = Deferred()        try:            self.reactor.issueWriteFile(handle, seekpos, buffer,                                        self.ovDone, (handle, buffer))        except:            df.errback(sys.exc_info())        else:            self.df = df        return dfclass IOCPFile(object):    buffer_size = 16384            def __init__(self, handle):        from twisted.internet import reactor        self.reactor = reactor        self.handle = handle        self.osfhandle = win32file._get_osfhandle(self.handle.fileno())        self.mode = self.handle.mode        # CloseHandle automatically calls CancelIo        self.close = self.handle.close        self.fileno = self.handle.fileno        self.read_op = ReadFileOp()        self.write_op = WriteFileOp()        # standard block size by default        self.readbuf = self.reactor.AllocateReadBuffer(self.buffer_size)    def seek(self, offset):        self.seekpos = offset        def write(self, data):        return self.write_op.initiateOp(self.osfhandle, self.seekpos, data)    def read(self, bytes):        if bytes == self.buffer_size:            readbuf = self.readbuf        else:                        # hmmmm, slow. but, readfile tries to fill the buffer,            # so maybe this is better than reading too much all the time.            readbuf = self.reactor.AllocateReadBuffer(bytes)                return self.read_op.initiateOp(self.osfhandle, self.seekpos, readbuf)def open_sparse_file(path, mode, length=0, overlapped=True):    return IOCPFile(open_sparse_file_base(path, mode, length, overlapped))class FilePool(object):    def __init__(self, doneflag, add_task, external_add_task, max_files_open, num_disk_threads):        self.add_task = add_task        self.file_to_torrent = {}        self.waiting_ops = []        self.active_file_to_handles = DictWithSets()        self.open_file_to_handles = DictWithLists()        self.set_max_files_open(max_files_open)    def close_all(self):        df = Deferred()        self._close_all(df)        return df    def _close_all(self, df):        failures = {}        while len(self.open_file_to_handles) > 0:            filename, handle = self.open_file_to_handles.popitem()            try:                handle.close()            except:                failures[self.file_to_torrent[filename]] = sys.exc_info()        for torrent, e in failures.iteritems():            torrent.got_exception(e)        if self.get_open_file_count() > 0:            # it would be nice to wait on the deferred for the outstanding ops            self.add_task(0.5, self._close_all, df)        else:            df.callback(True)    def close_files(self, file_set):        df = Deferred()        self._close_files(df, file_set)        return df    def _close_files(self, df, file_set):        exc_info = None        done = False        filenames = self.open_file_to_handles.keys()        for filename in filenames:            if filename not in file_set:                continue            handles = self.open_file_to_handles.poprow(filename)            for handle in handles:                try:                    handle.close()                except:                    exc_info = sys.exc_info()        done = True        for filename in file_set.iterkeys():            if filename in self.active_file_to_handles:                done = False                break        if exc_info is not None:            df.errback(exc_info)        if not done:            # it would be nice to wait on the deferred for the outstanding ops            self.add_task(0.5, self._close_files, df, file_set)        else:            df.callback(True)    def set_max_files_open(self, max_files_open):        if max_files_open <= 0:            max_files_open = 1e100        self.max_files_open = max_files_open        self.close_all()    def add_files(self, files, torrent):        for filename in files:            if filename in self.file_to_torrent:                raise BTFailure(_("File %s belongs to another running torrent")                                % filename)        for filename in files:            self.file_to_torrent[filename] = torrent    def remove_files(self, files):        for filename in files:            del self.file_to_torrent[filename]    def _ensure_exists(self, filename, length=0):        if not os.path.exists(filename):            f = os.path.split(filename)[0]            if f != '' and not os.path.exists(f):                os.makedirs(f)            f = file(filename, 'wb')            make_file_sparse(filename, f, length)            f.close()    def get_open_file_count(self):        t = self.open_file_to_handles.total_length()        t += self.active_file_to_handles.total_length()        return t    def free_handle_notify(self):        if self.waiting_ops:            args = self.waiting_ops.pop(0)            self._produce_handle(*args)    def acquire_handle(self, filename, for_write, length=0):

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -