connectionmanager.py
# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License). You may not copy or use this file, in either
# source code or executable form, except in compliance with the License. You
# may obtain a copy of the License at http://www.bittorrent.com/license/.
#
# Software distributed under the License is distributed on an AS IS basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
# for the specific language governing rights and limitations under the
# License.

# Written by Bram Cohen and Greg Hazel

from socket import error as socketerror

from BitTorrent.translation import _

from BitTorrent import BTFailure
from BitTorrent.hash import sha
from BitTorrent.obsoletepythonsupport import *
from BitTorrent.RawServer_twisted import Handler
from BitTorrent.NatTraversal import UPNPError
from BitTorrent.Connector import Connection
from BitTorrent.HTTPConnector import HTTPConnection
from BitTorrent.platform import is_frozen_exe
from BitTorrent.ClientIdentifier import identify_client
from BitTorrent.LocalDiscovery import LocalDiscovery
from BitTorrent.InternetWatcher import InternetSubscriber
from BitTorrent.DictWithLists import OrderedDict
from twisted.internet import task

import random
import logging

#global_logger = logging.getLogger( "BitTorrent.ConnectionManager" )

# header, reserved, download id, my id, [length, message]


class GaurdedInitialConnection(Handler):

    def __init__(self, parent, id, encrypt=False, log_prefix="", lan=False):
        self.parent = parent
        self.id = id
        self.lan = lan
        self.log_prefix = log_prefix
        self.encrypt = encrypt
        self.connector = None

    def _make_connection(self, s):
        return Connection(self.parent, s, self.id, True,
                          obfuscate_outgoing=self.encrypt,
                          log_prefix=self.log_prefix, lan=self.lan)

    def connection_made(self, s):
        del self.parent.pending_connections[(s.ip, s.port)]
        con = self._make_connection(s)
        self.parent._add_connection(con)
        # it might not be obvious why this is here.
        # if the pending queue filled and put the remaining connections
        # into the spare list, this will push more connections in to pending
        self.parent.replace_connection()

    def connection_failed(self, addr, exception):
        del self.parent.pending_connections[addr]
        self.parent.replace_connection()


class HTTPInitialConnection(GaurdedInitialConnection):

    def _make_connection(self, s):
        # ow!
        piece_size = self.parent.downloader.storage.piece_size
        urlage = self.parent.downloader.urlage
        return HTTPConnection(self.parent, piece_size, urlage,
                              s, self.id, True)


class ConnectionManager(InternetSubscriber):

    def __init__(self, make_upload, downloader, choker, numpieces,
                 ratelimiter, rawserver, config, my_id, schedulefunc,
                 download_id, context, addcontactfunc, reported_port,
                 tracker_ips, log_prefix):
        """
        @param my_id: my peer id.
        @param tracker_ips: list of tracker ip addresses.
            ConnectionManager does not drop connections from the tracker.
            This allows trackers to perform NAT checks even when there
            are max_allow_in connections.
        @param log_prefix: string used as the prefix for all log entries
            generated by the ConnectionManager and its created Connectors.
        """
        self.make_upload = make_upload
        self.downloader = downloader
        self.choker = choker
        self.numpieces = numpieces
        self.ratelimiter = ratelimiter
        self.rawserver = rawserver
        self.my_id = my_id
        self.config = config
        self.schedulefunc = schedulefunc
        self.download_id = download_id
        self.context = context
        self.addcontact = addcontactfunc
        self.reported_port = reported_port
        self.everinc = False
        self.tracker_ips = tracker_ips
        self.log_prefix = log_prefix
        self.closed = False

        # submitted
        self.pending_connections = {}
        # transport connected
        self.connections = {}
        # protocol active
        self.complete_connections = set()

        self.spares = {}

        self.cached_peers = OrderedDict()
        self.cache_limit = 100

        self.reopen(reported_port)

        self.banned = set()

        self.schedulefunc(config['keepalive_interval'], self.send_keepalives)

        self.throttled = False
        self.downloader.postpone_func = self.throttle_connections
        self.downloader.resume_func = self.unthrottle_connections
        self.cruise_control = task.LoopingCall(self._check_throttle)
        self.cruise_control.start(1.0)

    def reopen(self, port):
        self.closed = False
        self.reported_port = port
        self.unthrottle_connections()
        self.rawserver.internet_watcher.add_subscriber(self)

    def internet_active(self):
        for dns in self.cached_peers.iterkeys():
            self._fire_cached_connection(dns)

    def remove_dns_from_cache(self, dns):
        # could have been an incoming connection
        # or could have been dropped by the cache limit
        if dns in self.cached_peers:
            del self.cached_peers[dns]

    def try_one_connection(self):
        keys = self.cached_peers.keys()
        if not keys:
            return False
        dns = random.choice(keys)
        self._fire_cached_connection(dns)
        return True

    def _fire_cached_connection(self, dns):
        (id, handler, a, kw) = self.cached_peers[dns]
        return self._start_connection(dns, id, handler, *a, **kw)

    def close_connections(self):
        self.rawserver.internet_watcher.remove_subscriber(self)
        self.closed = True
        pending = self.pending_connections.values()
        # drop connections which could be made after we're not interested
        for h in pending:
            h.connector.close()
        for c in self.connections.itervalues():
            if not c.closed:
                c.connection.close()
                c.closed = True

    def send_keepalives(self):
        self.schedulefunc(self.config['keepalive_interval'],
                          self.send_keepalives)
        for c in self.complete_connections:
            c.send_keepalive()

    def hashcheck_succeeded(self, i):
        for c in self.complete_connections:
            c.send_have(i)

    # returns False if the connection info has been pushed on to self.spares
    # other filters and a successful connection return True
    def start_connection(self, dns, id=None, encrypt=False, lan=False):
        """@param dns: domain name/ip address and port pair.
           @param id: peer id.
        """
        return self._start_connection(dns, id, GaurdedInitialConnection,
                                      encrypt=encrypt, lan=lan)

    def start_http_connection(self, dns, id):
        return self._start_connection(dns, id, HTTPInitialConnection)

    def _start_connection(self, dns, id, handler, *a, **kw):
        """@param dns: domain name/ip address and port pair.
           @param id: peer id.
        """
        if self.closed:
            return True

        if dns[0] in self.banned:
            return True
        if id == self.my_id:
            return True

        # store the first instance only
        if dns not in self.cached_peers:
            # obey the cache size limit
            if len(self.cached_peers) >= self.cache_limit:
                olddns = self.cached_peers.keys()[0]
                self.remove_dns_from_cache(olddns)
            self.cached_peers[dns] = (id, handler, a, kw)

        for v in self.connections.itervalues():
            if id and v.id == id:
                return True
            if self.config['one_connection_per_ip'] and v.ip == dns[0]:
                return True

        #print "start", len(self.pending_connections), len(self.spares), len(self.connections)

        total_outstanding = len(self.connections)
        # it's possible the pending connections could eventually complete,
        # so we have to account for those when enforcing max_initiate
        total_outstanding += len(self.pending_connections)

        if total_outstanding >= self.config['max_initiate']:
            self.spares[(dns, id)] = (handler, a, kw)
            return False

        # if these fail, I'm getting a very weird dns object
        assert isinstance(dns, tuple)
        assert isinstance(dns[0], str)
        assert isinstance(dns[1], int)

        # sometimes we try to connect to a peer we're already trying to
        # connect to
        #assert dns not in self.pending_connections