📄 torrent.py
字号:
return df # this function is so nasty! def _initialize(self): self._doneflag = threading.Event() # only one torrent object for of a particular infohash at a time. # Note: This must be done after doneflag is created if shutdown() # is to be called from got_exception(). if self.config["one_download_per_torrent"]: self._mutex = NamedMutex(self.infohash.encode("hex")) if not self._mutex.acquire(False): try: raise UserFailure(_("Torrent already being downloaded or " "seeded." )) except UserFailure, e: # perform exception handling including shutting down # the torrent. self.got_exception(*sys.exc_info(), **{'cannot_shutdown':True}) return self.reported_port = self.config['forwarded_port'] if not self.reported_port: self.reported_port = \ self._singleport_listener.get_port(self.change_port) if self.reported_port: self.reserved_ports.append(self.reported_port) # backward compatibility with older 5.0 development versions if self.destination_path == "": try: self.destination_path = self.config['save_as'] except: pass if self.working_path == "": self.working_path = self.destination_path self._myid = self._make_id() random.seed(self._myid) self._build_url_mapping() self._urlage = URLage(self._urls) self._register_files() self.logger.debug("_start_download: self.working_path=%s", self.working_path) self._storage = Storage(self.config, self._filepool, self.working_path, zip(self._myfiles, self.metainfo.sizes), self.add_task, self.external_add_task, self._doneflag) df = self._storage.startup_df yield df if df.getResult() != True: # initialization was aborted return resumefile = None if self.data_dir: filename = os.path.join(self.data_dir, 'resume', self.infohash.encode('hex')) if os.path.exists(filename): try: resumefile = file(filename, 'rb') except Exception, e: self._error(logging.WARNING, _("Could not load fastresume data: %s") % unicode(e.args[0]) + ' ' + _("Will perform full hash check.")) if resumefile is not None: resumefile.close() resumefile = None def data_flunked(amount, index): self._ratemeasure.data_rejected(amount) self._error(logging.INFO, _("piece %d failed hash check, re-downloading it") % index) def errorfunc(level, text): def e(): self._error(level, text) self.external_add_task(0, e) def statusfunc(activity = None, fractionDone = 0): if activity is None: activity = self._activity[0] self._activity = (activity, fractionDone) self._storagewrapper = StorageWrapper(self._storage, self.config, self.metainfo.hashes, self.metainfo.piece_length, statusfunc, self._doneflag, data_flunked, self.infohash, errorfunc, self.working_path, self.destination_path, resumefile, self.add_task, self.external_add_task) df = self._storagewrapper.done_checking_df yield df if df.getResult() != True: # initialization was aborted return if resumefile is not None: resumefile.close() numpieces = len(self.metainfo.hashes) self._upmeasure = Measure(self.config['max_rate_period']) self._downmeasure = Measure(self.config['max_rate_period']) self._ratemeasure = RateMeasure(self._storagewrapper.amount_left_with_partials) self._picker = PiecePicker(self.config, numpieces, self._storagewrapper.have_set.iterneg(0, numpieces)) self._periodic_save_fastresume() while self._pending_file_priorities: self.set_file_priority(*self._pending_file_priorities.pop()) def kickpeer(connection): def kick(): connection.close() self.add_task(0, kick) def banpeer(ip): self._connection_manager.ban(ip) self.multidownload = MultiDownload(self.config, self._storagewrapper, self._urlage, self._picker, numpieces, self.finished, self._total_downmeasure, self._downmeasure, self._ratemeasure.data_came_in, kickpeer, banpeer) # HERE. Yipee! Uploads are created by callback while Download # objects are created by MultiDownload. --Dave def make_upload(connection): return Upload(connection, self._ratelimiter, self._upmeasure, self._choker, self._storagewrapper, self.config['max_slice_length'], self.config['max_rate_period'], self.config['num_fast'], self) if self._dht: addContact = self._dht.addContact else: addContact = None df = self.metainfo.get_tracker_ips(_wrap_task(self.external_add_task)) yield df tracker_ips = df.getResult() self._connection_manager = \ ConnectionManager(make_upload, self.multidownload, self._choker, numpieces, self._ratelimiter, self._rawserver, self.config, self._myid, self.add_task, self.infohash, self, addContact, 0, tracker_ips, self.log_root) self.multidownload.attach_connection_manager(self._connection_manager) self._statuscollector = TorrentStats(self.logger, self._choker, self.get_uprate, self.get_downrate, self._upmeasure.get_total, self._downmeasure.get_total, self._ratemeasure.get_time_left, self.get_percent_complete, self.multidownload.aggregate_piece_states, self.finflag, self._connection_manager, self.multidownload, self.get_file_priorities, self._myfiles, self._connection_manager.ever_got_incoming, None) self.state = "initialized" def start_download(self): assert self.state == "initialized" self.time_started = bttime() self._connection_manager.reopen(self.reported_port) self._singleport_listener.add_torrent(self.infohash, self._connection_manager) self._listening = True if self.metainfo.is_trackerless: if not self._dht: self._error(self, logging.CRITICAL, _("Attempt to download a trackerless torrent " "with trackerless client turned off.")) return else: if len(self._dht.table.findNodes(self.metainfo.infohash, invalid=False)) < const.K: for ip, port in self.metainfo.nodes: self._dht.addContact(ip, port) self._rerequest = DHTRerequester(self.config, self.add_task, self._connection_manager.how_many_connections, self._connection_manager.start_connection, self.external_add_task, self._rawserver, self._storagewrapper.get_amount_left, self._upmeasure.get_total, self._downmeasure.get_total, self.reported_port, self._myid, self.infohash, self._error, self.finflag, self._upmeasure.get_rate, self._downmeasure.get_rate, self._connection_manager.ever_got_incoming, self._no_announce_shutdown, self._announce_done, self._dht) else: self._rerequest = Rerequester(self.metainfo.announce, self.metainfo.announce_list, self.config, self.add_task, self.external_add_task, self._rawserver, self._connection_manager.how_many_connections, self._connection_manager.start_connection, self._storagewrapper.get_amount_left, self._upmeasure.get_total, self._downmeasure.get_total, self.reported_port, self._myid, self.infohash, self._error, self.finflag, self._upmeasure.get_rate, self._downmeasure.get_rate, self._connection_manager.ever_got_incoming, self._no_announce_shutdown, self._announce_done) self._statuscollector.rerequester = self._rerequest self.multidownload.rerequester = self._rerequest self._announced = True if self._dht and len(self._dht.table.findNodes(self.infohash)) == 0: self.add_task(5, self._dht.findCloseNodes) self._rerequest.begin() for url_prefix in self.metainfo.url_list: r = urlparse.urlparse(url_prefix) host = r[1] if ':' in host: host, port = host.split(':') port = int(port) else: port = 80 # TODO: async hostname resolution ip = socket.gethostbyname(host) self._connection_manager.start_http_connection((ip, port), url_prefix) self.state = "running" if not self.finflag.isSet(): self._activity = (_("downloading"), 0) self.feedback.started(self) if self._storagewrapper.amount_left == 0 and not self.completed: # By default, self.finished() resets the policy to "auto", # but if we discover on startup that we are already finished, # we don't want to reset it. # Also, if we discover on startup that we are already finished, # don't set finished_this_session. self.finished(policy=self.policy, finished_this_session=False) def stop_download(self, pause=False): assert self.state == "running" self.state = "initialized" if not self.finflag.isSet(): self._activity = (_("stopped"), 0) if self._announced: self._rerequest.announce_stop() self._announced = False self._statuscollector.rerequester = None self.multidownload.rerequester = None if self._listening: self._singleport_listener.remove_torrent(self.infohash) self._listening = False for port in self.reserved_ports: self._singleport_listener.release_port(port, self.change_port) del self.reserved_ports[:] if self._connection_manager is not None: if pause: self._connection_manager.throttle_connections() else: self._connection_manager.close_connections() if self.config['check_hashes']: self._save_fastresume() def shutdown(self): # use _rawserver.add_task directly here, because we want the callbacks # to happen even though _shutdown is about to invalidate this torrent's # context df = launch_coroutine(_wrap_task(self._rawserver.add_task), self._shutdown) df.addErrback(lambda e : self.got_exception(*e, **{'cannot_shutdown': True})) return df def _shutdown(self): self._doneflag.set() if self.state == "running": self.stop_download() # above is the last thing to set. if self._storagewrapper is not None: df = self._storagewrapper.done_checking_df yield df df.getResult() if self._storage is not None: df = self._storage.close() if df is not None: yield df df.getResult() self._unregister_files() self.context_valid = False self.state = "created" # release mutex on this torrent. if self.config["one_download_per_torrent"]: if self._mutex is not None and self._mutex.owner(): self._mutex.release() self._rawserver.add_task(0, gc.collect) def _no_announce_shutdown(self, level, text): # This is only called when announce fails with no peers, # don't try to announce again telling we're leaving the torrent self._announced = False self._error(level, text) self.failed() def set_file_priority(self, filename, priority): if self._storagewrapper is None or self._picker is None: self._pending_file_priorities.append((filename, priority)) else: begin, end = self._storagewrapper.get_piece_range_for_filename(filename) self._picker.set_priority(xrange(begin, end + 1), priority) self.config.setdefault('file_priorities', {}) self.config['file_priorities'][filename] = priority self._dump_torrent_config() def get_file_priorities(self): return self.config.get('file_priorities', {}) def get_file_priority(self, filename): fp = self.get_file_priorities() return fp.get(filename, 0) def add_feedback(self, feedback): self.feedback.chain.append(feedback) def remove_feedback(self, feedback): self.feedback.chain.remove(feedback) def got_exception(self, type, e, stack, cannot_shutdown=False): severity = logging.CRITICAL msg = "Torrent got exception: %s" % type try: e_str = unicode(e.args[0])
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -