📄 storage_iocp.py
字号:
# The contents of this file are subject to the BitTorrent Open Source License# Version 1.1 (the License). You may not copy or use this file, in either# source code or executable form, except in compliance with the License. You# may obtain a copy of the License at http://www.bittorrent.com/license/.## Software distributed under the License is distributed on an AS IS basis,# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License# for the specific language governing rights and limitations under the# License.# Written by Greg Hazelimport osimport win32filefrom bisect import bisect_rightfrom BitTorrent.translation import _from BitTorrent.obsoletepythonsupport import *from BitTorrent import BTFailure, app_namefrom BitTorrent.defer import Deferred, ThreadedDeferredfrom BitTorrent.yielddefer import launch_coroutine, _wrap_taskfrom BitTorrent.platform import get_allocated_regionsfrom BitTorrent.sparse_set import SparseSetfrom BitTorrent.DictWithLists import DictWithLists, DictWithSetsfrom BitTorrent.Storage_base import make_file_sparse, bad_libc_workaround, is_open_for_writefrom BitTorrent.Storage_base import open_sparse_file as open_sparse_file_base# not needed, but it raises errors for platforms that don't support iocpfrom twisted.internet.iocpreactor import _iocpfrom twisted.internet.iocpreactor.proactor import Proactorfrom twisted.internet import reactorassert isinstance(reactor, Proactor), "You imported twisted.internet.reactor before RawServer_twisted!"class OverlappedOp: def __init__(self): from twisted.internet import reactor self.reactor = reactor def ovDone(self, ret, bytes, (handle, buffer)): raise NotImplementedError def initiateOp(self): raise NotImplementedErrorclass ReadFileOp(OverlappedOp): def ovDone(self, ret, bytes, (handle, buffer)): df = self.df del self.df if ret or not bytes: df.errback(ret, bytes) else: df.callback(buffer[:bytes]) def initiateOp(self, handle, seekpos, buffer): df = Deferred() try: self.reactor.issueReadFile(handle, seekpos, buffer, self.ovDone, (handle, buffer)) except: df.errback(sys.exc_info()) else: self.df = df return dfclass WriteFileOp(OverlappedOp): def ovDone(self, ret, bytes, (handle, buffer)): df = self.df del self.df if ret or not bytes: df.errback(ret, bytes) else: df.callback(bytes) def initiateOp(self, handle, seekpos, buffer): assert len(buffer) > 0 assert seekpos >= 0 df = Deferred() try: self.reactor.issueWriteFile(handle, seekpos, buffer, self.ovDone, (handle, buffer)) except: df.errback(sys.exc_info()) else: self.df = df return dfclass IOCPFile(object): buffer_size = 16384 def __init__(self, handle): from twisted.internet import reactor self.reactor = reactor self.handle = handle self.osfhandle = win32file._get_osfhandle(self.handle.fileno()) self.mode = self.handle.mode # CloseHandle automatically calls CancelIo self.close = self.handle.close self.fileno = self.handle.fileno self.read_op = ReadFileOp() self.write_op = WriteFileOp() # standard block size by default self.readbuf = self.reactor.AllocateReadBuffer(self.buffer_size) def seek(self, offset): self.seekpos = offset def write(self, data): return self.write_op.initiateOp(self.osfhandle, self.seekpos, data) def read(self, bytes): if bytes == self.buffer_size: readbuf = self.readbuf else: # hmmmm, slow. but, readfile tries to fill the buffer, # so maybe this is better than reading too much all the time. readbuf = self.reactor.AllocateReadBuffer(bytes) return self.read_op.initiateOp(self.osfhandle, self.seekpos, readbuf)def open_sparse_file(path, mode, length=0, overlapped=True): return IOCPFile(open_sparse_file_base(path, mode, length, overlapped))class FilePool(object): def __init__(self, doneflag, add_task, external_add_task, max_files_open, num_disk_threads): self.add_task = add_task self.file_to_torrent = {} self.waiting_ops = [] self.active_file_to_handles = DictWithSets() self.open_file_to_handles = DictWithLists() self.set_max_files_open(max_files_open) def close_all(self): df = Deferred() self._close_all(df) return df def _close_all(self, df): failures = {} while len(self.open_file_to_handles) > 0: filename, handle = self.open_file_to_handles.popitem() try: handle.close() except: failures[self.file_to_torrent[filename]] = sys.exc_info() for torrent, e in failures.iteritems(): torrent.got_exception(e) if self.get_open_file_count() > 0: # it would be nice to wait on the deferred for the outstanding ops self.add_task(0.5, self._close_all, df) else: df.callback(True) def close_files(self, file_set): df = Deferred() self._close_files(df, file_set) return df def _close_files(self, df, file_set): exc_info = None done = False filenames = self.open_file_to_handles.keys() for filename in filenames: if filename not in file_set: continue handles = self.open_file_to_handles.poprow(filename) for handle in handles: try: handle.close() except: exc_info = sys.exc_info() done = True for filename in file_set.iterkeys(): if filename in self.active_file_to_handles: done = False break if exc_info is not None: df.errback(exc_info) if not done: # it would be nice to wait on the deferred for the outstanding ops self.add_task(0.5, self._close_files, df, file_set) else: df.callback(True) def set_max_files_open(self, max_files_open): if max_files_open <= 0: max_files_open = 1e100 self.max_files_open = max_files_open self.close_all() def add_files(self, files, torrent): for filename in files: if filename in self.file_to_torrent: raise BTFailure(_("File %s belongs to another running torrent") % filename) for filename in files: self.file_to_torrent[filename] = torrent def remove_files(self, files): for filename in files: del self.file_to_torrent[filename] def _ensure_exists(self, filename, length=0): if not os.path.exists(filename): f = os.path.split(filename)[0] if f != '' and not os.path.exists(f): os.makedirs(f) f = file(filename, 'wb') make_file_sparse(filename, f, length) f.close() def get_open_file_count(self): t = self.open_file_to_handles.total_length() t += self.active_file_to_handles.total_length() return t def free_handle_notify(self): if self.waiting_ops: args = self.waiting_ops.pop(0) self._produce_handle(*args) def acquire_handle(self, filename, for_write, length=0):
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -