📄 connectionmanager.py
字号:
if dns in self.pending_connections: return True kw['log_prefix'] = self.log_prefix h = handler(self, id, *a, **kw) self.pending_connections[dns] = h connector = self.rawserver.start_connection(dns, h, self.context) h.connector = connector return True def connection_completed(self, c): self.complete_connections.add(c) c.upload = self.make_upload(c) c.download = self.downloader.make_download(c) self.choker.connection_made(c) if c.uses_dht: c.send_port(self.reported_port) def got_port(self, c): if self.addcontact and c.uses_dht and c.dht_port != None: self.addcontact(c.connection.ip, c.dht_port) def ever_got_incoming(self): return self.everinc def how_many_connections(self): return len(self.complete_connections) def replace_connection(self): while self.spares: k, v = self.spares.popitem() dns, id = k handler, a, kw = v started = self._start_connection(dns, id, handler, *a, **kw) if not started: # start_connection decided to push this connection back on to # self.spares because a limit was hit. break now or loop # forever break def throttle_connections(self): self.throttled = True for c in self.connections.itervalues(): c.connection.pause_reading() def _check_throttle(self): if not self.context.is_context_valid(): self.cruise_control.stop() return self.downloader.check_rate() # TODO: this is a little lazy. it loops over all the connections every # half second for every torrent, even if there's no need to throttle. if not self.throttled: for c in self.connections.itervalues(): c.connection.resume_reading() # arg. resume actually flushes the buffers in iocpreactor, so we # have to check the state constantly if self.throttled: break def unthrottle_connections(self): self.throttled = False def singleport_connection(self, con): if con.ip in self.banned: return False m = self.config['max_allow_in'] if m and len(self.connections) >= m and con.ip not in self.tracker_ips: return False self._add_connection(con) con.set_parent(self) con.connection.context = self.context return True def _add_connection(self, con): self.connections[con.connection] = con if self.closed: con.connection.close() return if self.throttled: con.connection.pause_reading() def ban(self, ip): self.banned.add(ip)class SingleportListener(Handler): """Manages a server socket common to all torrents. When a remote peer opens a connection to the local peer, the SingleportListener maps that peer on to the appropriate torrent's connection manager (see SingleportListener.select_torrent). See Connector.Connection which upcalls to select_torrent after the infohash is received in the opening handshake.""" def __init__(self, rawserver, nattraverser, log_prefix): self.rawserver = rawserver self.nattraverser = nattraverser self.port = 0 self.ports = {} self.port_change_notification = None self.torrents = {} self.obfuscated_torrents = {} self.connections = {} self.download_id = None self.local_discovery = None self._creating_local_discorvery = False self.log_prefix = log_prefix def _close(self, port): serversocket = self.ports[port][0] self.nattraverser.unregister_port(port, "TCP") self.rawserver.stop_listening(serversocket) serversocket.close() if self.local_discovery: self.local_discovery.stop() def _check_close(self, port): if not port or self.port == port or len(self.ports[port][1]) > 0: return self._close(port) del self.ports[port] def open_port(self, port, config): if port in self.ports: self.port = port return serversocket = self.rawserver.create_serversocket( port, config['bind'], tos=config['peer_socket_tos']) try: d = self.nattraverser.register_port(port, port, "TCP", config['bind']) def change(*a): self.rawserver.external_add_task(0, self._change_port, *a) d.addCallback(change) def silent(*e): pass d.addErrback(silent) except Exception, e: # blanket, just incase - we don't want to interrupt things # maybe we should log it, maybe not #print "UPnP registration error", e pass self.rawserver.start_listening(serversocket, self) oldport = self.port self.port = port self.ports[port] = [serversocket, {}] self._check_close(oldport) if self.local_discovery: self.local_discovery.stop() self._create_local_discovery() def _create_local_discovery(self): self._creating_local_discorvery = True try: self.local_discovery = LocalDiscovery(self.rawserver, self.port, self._start_connection) self._creating_local_discorvery = False except: self.rawserver.add_task(5, self._create_local_discovery) def _start_connection(self, addr, infohash): infohash = infohash.decode('hex') if infohash not in self.torrents: return connection_manager = self.torrents[infohash] # TODO: peer id? connection_manager.start_connection(addr, None) def _change_port(self, port): if self.port == port: return [serversocket, callbacks] = self.ports[self.port] self.ports[port] = [serversocket, callbacks] del self.ports[self.port] self.port = port for callback in callbacks: if callback: callback(port) def get_port(self, callback = None): if self.port: callbacks = self.ports[self.port][1] if callback not in callbacks: callbacks[callback] = 1 else: callbacks[callback] += 1 return self.port def release_port(self, port, callback = None): callbacks = self.ports[port][1] callbacks[callback] -= 1 if callbacks[callback] == 0: del callbacks[callback] self._check_close(port) def close_sockets(self): for port in self.ports.iterkeys(): self._close(port) def add_torrent(self, infohash, connection_manager): if infohash in self.torrents: raise BTFailure(_("Can't start two separate instances of the same " "torrent")) self.torrents[infohash] = connection_manager self.obfuscated_torrents[sha('req2' + infohash).digest()] = \ connection_manager if self.local_discovery: self.local_discovery.announce(infohash.encode('hex'), connection_manager.my_id.encode('hex')) def remove_torrent(self, infohash): del self.torrents[infohash] del self.obfuscated_torrents[sha('req2' + infohash).digest()] def select_torrent(self, conn, infohash): if infohash in self.torrents: accepted = self.torrents[infohash].singleport_connection(conn) # the connection manager may refuse the connection, in which # case keep the connection in our list until it is dropped if accepted: del self.connections[conn.connection] def select_torrent_obfuscated(self, conn, streamid): if streamid not in self.obfuscated_torrents: return self.obfuscated_torrents[streamid].singleport_connection(conn) def connection_made(self, connection): con = Connection(self, connection, None, False, log_prefix=self.log_prefix) self.connections[connection] = con def replace_connection(self): pass def remove_dns_from_cache(self, dns): # since this was incoming, we don't cache the peer anyway pass
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -