⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 connectionmanager.py

📁 BitTorrent source code written in Python. Please enjoy.
💻 PY
📖 第 1 页 / 共 2 页
字号:
# The contents of this file are subject to the BitTorrent Open Source License
# Version 1.1 (the License).  You may not copy or use this file, in either
# source code or executable form, except in compliance with the License.  You
# may obtain a copy of the License at http://www.bittorrent.com/license/.
#
# Software distributed under the License is distributed on an AS IS basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied.  See the License
# for the specific language governing rights and limitations under the
# License.

# Written by Bram Cohen and Greg Hazel

from socket import error as socketerror

from BitTorrent.translation import _

from BitTorrent import BTFailure
from BitTorrent.hash import sha
from BitTorrent.obsoletepythonsupport import *
from BitTorrent.RawServer_twisted import Handler
from BitTorrent.NatTraversal import UPNPError
from BitTorrent.Connector import Connection
from BitTorrent.HTTPConnector import HTTPConnection
from BitTorrent.platform import is_frozen_exe
from BitTorrent.ClientIdentifier import identify_client
from BitTorrent.LocalDiscovery import LocalDiscovery
from BitTorrent.InternetWatcher import InternetSubscriber
from BitTorrent.DictWithLists import OrderedDict
from twisted.internet import task
import random
import logging

#global_logger = logging.getLogger( "BitTorrent.ConnectionManager" )

# header, reserved, download id, my id, [length, message]


class GaurdedInitialConnection(Handler):
    """Handler for a single pending connection attempt of a ConnectionManager.

    When the transport connects, the pending entry is removed from the
    parent, the socket is wrapped by `_make_connection` and handed to
    `parent._add_connection`.  When the attempt fails, only the pending
    entry is removed.  In both cases `parent.replace_connection()` is
    called afterwards.

    NOTE(review): presumably instantiated by
    ConnectionManager._start_connection; that part of the method is not
    visible in this excerpt -- confirm.
    """

    def __init__(self, parent, id, encrypt=False, log_prefix="", lan=False):
        """Store the settings used later to build the connection.

        @param parent: owner object; this class uses its
           `pending_connections`, `_add_connection` and
           `replace_connection` members.
        @param id: peer id forwarded to the created connection.
        @param encrypt: forwarded to Connection as `obfuscate_outgoing`.
        @param log_prefix: forwarded to Connection as `log_prefix`.
        @param lan: forwarded to Connection as `lan`.
        """
        self.parent = parent
        self.id = id
        self.lan = lan
        self.log_prefix = log_prefix
        self.encrypt = encrypt
        # Not assigned anywhere in this class; ConnectionManager's
        # close_connections() calls self.connector.close() on pending
        # handlers, so it is expected to be set by whoever starts the attempt.
        self.connector = None

    def _make_connection(self, s):
        """Wrap the connected socket `s` in a Connection object.

        Subclasses override this to create a different connection type.
        """
        # NOTE(review): the positional True is presumably a
        # "locally initiated" flag -- confirm against Connector.Connection.
        return Connection(self.parent, s, self.id, True,
                          obfuscate_outgoing=self.encrypt,
                          log_prefix=self.log_prefix,
                          lan=self.lan)

    def connection_made(self, s):
        """Promote the pending attempt for `s` to a real connection.

        The pending entry is keyed by the (s.ip, s.port) tuple; a missing
        key raises KeyError.
        """
        del self.parent.pending_connections[(s.ip, s.port)]
        con = self._make_connection(s)
        self.parent._add_connection(con)

        # it might not be obvious why this is here.
        # if the pending queue filled and put the remaining connections
        # into the spare list, this will push more connections in to pending
        self.parent.replace_connection()

    def connection_failed(self, addr, exception):
        """Drop the pending entry for `addr` and let the parent replace it.

        `exception` is accepted but not used here.
        """
        del self.parent.pending_connections[addr]
        self.parent.replace_connection()


class HTTPInitialConnection(GaurdedInitialConnection):
    """Variant of GaurdedInitialConnection that builds an HTTPConnection."""

    def _make_connection(self, s):
        """Wrap the connected socket `s` in an HTTPConnection.

        The piece size and `urlage` are read from the parent's downloader.
        """
        # ow!
        piece_size = self.parent.downloader.storage.piece_size
        urlage = self.parent.downloader.urlage
        return HTTPConnection(self.parent, piece_size, urlage, s, self.id, True)


class ConnectionManager(InternetSubscriber):
    """Tracks pending, connected and cached peers for one download.

    NOTE(review): only part of this class is visible in this excerpt (the
    listing is page 1 of 2); the comments below describe the visible code
    only.
    """

    def __init__(self, make_upload, downloader, choker,
                 numpieces, ratelimiter,
                 rawserver, config, my_id, schedulefunc, download_id, context,
                 addcontactfunc, reported_port, tracker_ips, log_prefix):
        """
            @param downloader: its `postpone_func` and `resume_func`
               attributes are set to this object's throttle/unthrottle
               methods.
            @param rawserver: its `internet_watcher` is used to subscribe
               and unsubscribe this object.
            @param config: mapping; the visible code reads
               'keepalive_interval', 'one_connection_per_ip' and
               'max_initiate'.
            @param my_id: my peer id.
            @param schedulefunc: called as schedulefunc(delay, callable) to
               schedule the keepalive timer.
            @param reported_port: passed to reopen(), which stores it.
            @param tracker_ips: list of tracker ip addresses.
               ConnectionManager does not drop connections from the tracker.
               This allows trackers to perform NAT checks even when there
               are max_allow_in connections.
            @param log_prefix: string used as the prefix for all
               log entries generated by the ConnectionManager and its
               created Connectors.
        """
        self.make_upload = make_upload
        self.downloader = downloader
        self.choker = choker
        self.numpieces = numpieces
        self.ratelimiter = ratelimiter
        self.rawserver = rawserver
        self.my_id = my_id
        self.config = config
        self.schedulefunc = schedulefunc
        self.download_id = download_id
        self.context = context
        self.addcontact = addcontactfunc
        self.reported_port = reported_port
        self.everinc = False
        self.tracker_ips = tracker_ips
        self.log_prefix = log_prefix
        self.closed = False

        # submitted
        self.pending_connections = {}
        # transport connected
        self.connections = {}
        # protocol active
        self.complete_connections = set()

        # (dns, id) -> (handler, a, kw): attempts deferred by
        # _start_connection because max_initiate was reached.
        self.spares = {}

        # dns -> (id, handler, a, kw): first-seen arguments for each peer,
        # capped at cache_limit entries by _start_connection.
        self.cached_peers = OrderedDict()
        self.cache_limit = 100

        # NOTE(review): reopen() calls unthrottle_connections() before
        # self.banned and self.throttled are assigned below;
        # unthrottle_connections is outside this excerpt -- confirm it does
        # not read them.
        self.reopen(reported_port)

        # Entries are compared against dns[0] in _start_connection.
        self.banned = set()
        self.schedulefunc(config['keepalive_interval'],
                          self.send_keepalives)

        self.throttled = False
        self.downloader.postpone_func = self.throttle_connections
        self.downloader.resume_func = self.unthrottle_connections
        # Run _check_throttle repeatedly through Twisted's LoopingCall with
        # an interval of 1.0.
        self.cruise_control = task.LoopingCall(self._check_throttle)
        self.cruise_control.start(1.0)

    def reopen(self, port):
        """Mark the manager open, record `port`, unthrottle and subscribe
        to the rawserver's internet watcher."""
        self.closed = False
        self.reported_port = port
        self.unthrottle_connections()
        self.rawserver.internet_watcher.add_subscriber(self)

    def internet_active(self):
        """Attempt a connection to every peer in the cache.

        NOTE(review): presumably invoked by the internet watcher subscribed
        to in reopen() -- confirm against InternetSubscriber.
        """
        for dns in self.cached_peers.iterkeys():
            self._fire_cached_connection(dns)

    def remove_dns_from_cache(self, dns):
        """Remove `dns` from the peer cache if it is present."""
        # could have been an incoming connection
        # or could have been dropped by the cache limit
        if dns in self.cached_peers:
            del self.cached_peers[dns]

    def try_one_connection(self):
        """Attempt a connection to one randomly chosen cached peer.

        Returns False when the cache is empty, True otherwise (regardless of
        what the connection attempt itself returned).
        """
        keys = self.cached_peers.keys()
        if not keys:
            return False
        dns = random.choice(keys)
        self._fire_cached_connection(dns)
        return True

    def _fire_cached_connection(self, dns):
        """Replay the cached _start_connection arguments stored for `dns`.

        Raises KeyError if `dns` is not cached.  Returns whatever
        _start_connection returns.
        """
        (id, handler, a, kw) = self.cached_peers[dns]
        return self._start_connection(dns, id, handler, *a, **kw)

    def close_connections(self):
        """Unsubscribe from the internet watcher, mark the manager closed,
        and close all pending connectors and open connections."""
        self.rawserver.internet_watcher.remove_subscriber(self)
        self.closed = True

        pending = self.pending_connections.values()
        # drop connections which could be made after we're not interested
        for h in pending:
            h.connector.close()

        for c in self.connections.itervalues():
            if not c.closed:
                c.connection.close()
                c.closed = True

    def send_keepalives(self):
        """Reschedule itself after config['keepalive_interval'], then send a
        keepalive on every complete connection."""
        self.schedulefunc(self.config['keepalive_interval'],
                          self.send_keepalives)
        for c in self.complete_connections:
            c.send_keepalive()

    def hashcheck_succeeded(self, i):
        """Send a "have" for index `i` on every complete connection."""
        for c in self.complete_connections:
            c.send_have(i)

    # returns False if the connection info has been pushed on to self.spares
    # other filters and a successful connection return True
    def start_connection(self, dns, id=None, encrypt=False, lan=False):
        """Start a connection using GaurdedInitialConnection as handler.

           @param dns: domain name/ip address and port pair.
           @param id: peer id.
           @param encrypt: passed through to the handler as a keyword.
           @param lan: passed through to the handler as a keyword.
           """
        return self._start_connection(dns, id, GaurdedInitialConnection,
                                      encrypt=encrypt,
                                      lan=lan)

    def start_http_connection(self, dns, id):
        """Start a connection using HTTPInitialConnection as handler."""
        return self._start_connection(dns, id, HTTPInitialConnection)

    def _start_connection(self, dns, id, handler, *a, **kw):
        """Filter, cache and (possibly) defer a connection request.

           @param dns: domain name/ip address and port pair.
           @param id: peer id.
           @param handler: handler class; stored together with `a` and `kw`
              in the peer cache and the spares table.

           In the visible part, returns True without connecting when the
           manager is closed, dns[0] is banned, `id` is our own id, or an
           existing connection already matches; returns False when the
           request was stored in self.spares.
           """
        if self.closed:
            return True
        if dns[0] in self.banned:
            return True
        if id == self.my_id:
            return True

        # store the first instance only
        # (this happens before the duplicate-connection checks below, so
        # already-connected peers are cached as well)
        if dns not in self.cached_peers:
            # obey the cache size limit
            if len(self.cached_peers) >= self.cache_limit:
                # NOTE(review): relies on keys()[0] being the oldest entry,
                # i.e. on DictWithLists.OrderedDict keeping insertion order
                # -- confirm.
                olddns = self.cached_peers.keys()[0]
                self.remove_dns_from_cache(olddns)
            self.cached_peers[dns] = (id, handler, a, kw)

        for v in self.connections.itervalues():
            if id and v.id == id:
                return True
            if self.config['one_connection_per_ip'] and v.ip == dns[0]:
                return True

        #print "start", len(self.pending_connections), len(self.spares), len(self.connections)

        total_outstanding = len(self.connections)
        # it's possible the pending connections could eventually complete,
        # so we have to account for those when enforcing max_initiate
        total_outstanding += len(self.pending_connections)

        if total_outstanding >= self.config['max_initiate']:
            self.spares[(dns, id)] = (handler, a, kw)
            return False

        # if these fail, I'm getting a very weird dns object
        assert isinstance(dns, tuple)
        assert isinstance(dns[0], str)
        assert isinstance(dns[1], int)

        # sometimes we try to connect to a peer we're already trying to
        # connect to
        #assert dns not in self.pending_connections

        # NOTE(review): this excerpt ends here; the remainder of the method
        # is not visible -- do not treat the method as complete.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -