⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 connectionmanager.py

📁 BitTorrent source code in Python. Please enjoy.
💻 PY
📖 第 1 页 / 共 2 页
字号:
        if dns in self.pending_connections:            return True        kw['log_prefix'] = self.log_prefix        h = handler(self, id, *a, **kw)        self.pending_connections[dns] = h        connector = self.rawserver.start_connection(dns, h, self.context)        h.connector = connector        return True    def connection_completed(self, c):        self.complete_connections.add(c)        c.upload = self.make_upload(c)        c.download = self.downloader.make_download(c)        self.choker.connection_made(c)        if c.uses_dht:            c.send_port(self.reported_port)    def got_port(self, c):        if self.addcontact and c.uses_dht and c.dht_port != None:            self.addcontact(c.connection.ip, c.dht_port)    def ever_got_incoming(self):        return self.everinc    def how_many_connections(self):        return len(self.complete_connections)    def replace_connection(self):        while self.spares:            k, v = self.spares.popitem()            dns, id = k            handler, a, kw = v            started = self._start_connection(dns, id, handler, *a, **kw)            if not started:                # start_connection decided to push this connection back on to                # self.spares because a limit was hit. break now or loop                # forever                break    def throttle_connections(self):        self.throttled = True        for c in self.connections.itervalues():            c.connection.pause_reading()    def _check_throttle(self):        if not self.context.is_context_valid():            self.cruise_control.stop()            return                    self.downloader.check_rate()            # TODO: this is a little lazy. it loops over all the connections every        # half second for every torrent, even if there's no need to throttle.        if not self.throttled:            for c in self.connections.itervalues():                c.connection.resume_reading()                # arg. 
resume actually flushes the buffers in iocpreactor, so we                # have to check the state constantly                if self.throttled:                    break    def unthrottle_connections(self):        self.throttled = False            def singleport_connection(self, con):        if con.ip in self.banned:            return False        m = self.config['max_allow_in']        if m and len(self.connections) >= m and con.ip not in self.tracker_ips:            return False        self._add_connection(con)        con.set_parent(self)        con.connection.context = self.context        return True    def _add_connection(self, con):        self.connections[con.connection] = con        if self.closed:            con.connection.close()            return        if self.throttled:            con.connection.pause_reading()    def ban(self, ip):        self.banned.add(ip)class SingleportListener(Handler):    """Manages a server socket common to all torrents.  When a remote       peer opens a connection to the local peer, the SingleportListener       maps that peer on to the appropriate torrent's connection manager       (see SingleportListener.select_torrent).       
See Connector.Connection which upcalls to select_torrent after       the infohash is received in the opening handshake."""    def __init__(self, rawserver, nattraverser, log_prefix):        self.rawserver = rawserver        self.nattraverser = nattraverser        self.port = 0        self.ports = {}        self.port_change_notification = None        self.torrents = {}        self.obfuscated_torrents = {}        self.connections = {}        self.download_id = None        self.local_discovery = None        self._creating_local_discorvery = False        self.log_prefix = log_prefix    def _close(self, port):                serversocket = self.ports[port][0]        self.nattraverser.unregister_port(port, "TCP")        self.rawserver.stop_listening(serversocket)        serversocket.close()        if self.local_discovery:            self.local_discovery.stop()    def _check_close(self, port):        if not port or self.port == port or len(self.ports[port][1]) > 0:            return        self._close(port)        del self.ports[port]    def open_port(self, port, config):        if port in self.ports:            self.port = port            return        serversocket = self.rawserver.create_serversocket(            port, config['bind'], tos=config['peer_socket_tos'])        try:            d = self.nattraverser.register_port(port, port, "TCP", config['bind'])            def change(*a):                self.rawserver.external_add_task(0, self._change_port, *a)            d.addCallback(change)            def silent(*e):                pass            d.addErrback(silent)        except Exception, e:            # blanket, just incase - we don't want to interrupt things            # maybe we should log it, maybe not            #print "UPnP registration error", e            pass        self.rawserver.start_listening(serversocket, self)        oldport = self.port        self.port = port        self.ports[port] = [serversocket, {}]                self._check_close(oldport)        
if self.local_discovery:            self.local_discovery.stop()        self._create_local_discovery()    def _create_local_discovery(self):        self._creating_local_discorvery = True        try:            self.local_discovery = LocalDiscovery(self.rawserver, self.port,                                                  self._start_connection)            self._creating_local_discorvery = False        except:            self.rawserver.add_task(5, self._create_local_discovery)    def _start_connection(self, addr, infohash):        infohash = infohash.decode('hex')        if infohash not in self.torrents:            return        connection_manager = self.torrents[infohash]        # TODO: peer id?        connection_manager.start_connection(addr, None)            def _change_port(self, port):        if self.port == port:            return        [serversocket, callbacks] = self.ports[self.port]        self.ports[port] = [serversocket, callbacks]        del self.ports[self.port]        self.port = port        for callback in callbacks:            if callback:                callback(port)    def get_port(self, callback = None):        if self.port:            callbacks = self.ports[self.port][1]            if callback not in callbacks:                callbacks[callback] = 1            else:                callbacks[callback] += 1        return self.port    def release_port(self, port, callback = None):        callbacks = self.ports[port][1]        callbacks[callback] -= 1        if callbacks[callback] == 0:            del callbacks[callback]        self._check_close(port)    def close_sockets(self):        for port in self.ports.iterkeys():            self._close(port)    def add_torrent(self, infohash, connection_manager):        if infohash in self.torrents:            raise BTFailure(_("Can't start two separate instances of the same "                              "torrent"))        self.torrents[infohash] = connection_manager        
self.obfuscated_torrents[sha('req2' + infohash).digest()] = \            connection_manager        if self.local_discovery:            self.local_discovery.announce(infohash.encode('hex'),                connection_manager.my_id.encode('hex'))    def remove_torrent(self, infohash):        del self.torrents[infohash]        del self.obfuscated_torrents[sha('req2' + infohash).digest()]    def select_torrent(self, conn, infohash):        if infohash in self.torrents:            accepted = self.torrents[infohash].singleport_connection(conn)            # the connection manager may refuse the connection, in which            # case keep the connection in our list until it is dropped            if accepted:                del self.connections[conn.connection]    def select_torrent_obfuscated(self, conn, streamid):        if streamid not in self.obfuscated_torrents:            return        self.obfuscated_torrents[streamid].singleport_connection(conn)    def connection_made(self, connection):        con = Connection(self, connection, None, False,                         log_prefix=self.log_prefix)        self.connections[connection] = con    def replace_connection(self):        pass    def remove_dns_from_cache(self, dns):        # since this was incoming, we don't cache the peer anyway        pass    

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -