📄 pepeercontrolimpl.java
字号:
}
public void
processTrackerResponse(
TRTrackerAnnouncerResponse response )
{
// only process new peers if we're still running
if ( is_running ){
analyseTrackerResponse( response );
}
}
private void
addExtendedPeersFromTracker(
Map extensions )
{
final Map protocols = (Map)extensions.get("protocols");
if ( protocols != null ){
System.out.println( "PEPeerControl: tracker response contained protocol extensions");
final Iterator protocol_it = protocols.keySet().iterator();
while( protocol_it.hasNext()){
final String protocol_name = (String)protocol_it.next();
final Map protocol = (Map)protocols.get(protocol_name);
final List transports = PEPeerTransportFactory.createExtendedTransports( this, protocol_name, protocol );
for (int i=0;i<transports.size();i++){
final PEPeer transport = (PEPeer)transports.get(i);
addPeer( transport );
}
}
}
}
public List
getPeers()
{
return( peer_transports_cow );
}
public List
getPeers(
String address )
{
List result = new ArrayList();
Iterator it = peer_transports_cow.iterator();
while( it.hasNext()){
PEPeerTransport peer = (PEPeerTransport)it.next();
if ( peer.getIp().equals( address )){
result.add( peer );
}
}
return( result );
}
public PeerDescriptor[]
getPendingPeers(
String address )
{
return((PeerDescriptor[])peer_database.getDiscoveredPeers());
}
public void
addPeer(
PEPeer _transport )
{
if ( !( _transport instanceof PEPeerTransport )){
throw( new RuntimeException("invalid class"));
}
final PEPeerTransport transport = (PEPeerTransport)_transport;
if (!ip_filter.isInRange(transport.getIp(), adapter.getDisplayName())) {
final ArrayList peer_transports = peer_transports_cow;
if ( !peer_transports.contains(transport)){
addToPeerTransports( transport );
}else{
Debug.out( "addPeer():: peer_transports.contains(transport): SHOULD NEVER HAPPEN !" );
transport.closeConnection( "already connected" );
}
}else{
transport.closeConnection( "IP address blocked by filters" );
}
}
public void
removePeer(
PEPeer _transport )
{
removePeer( _transport, "remove peer" );
}
public void
removePeer(
PEPeer _transport,
String reason )
{
if ( !( _transport instanceof PEPeerTransport )){
throw( new RuntimeException("invalid class"));
}
PEPeerTransport transport = (PEPeerTransport)_transport;
closeAndRemovePeer( transport, reason, true );
}
private void closeAndRemovePeer( PEPeerTransport peer, String reason, boolean log_if_not_found ) {
boolean removed = false;
// copy-on-write semantics
try{
peer_transports_mon.enter();
if ( peer_transports_cow.contains( peer )){
final ArrayList new_peer_transports = new ArrayList( peer_transports_cow );
new_peer_transports.remove( peer );
peer_transports_cow = new_peer_transports;
removed = true;
}
}
finally{
peer_transports_mon.exit();
}
if( removed ) {
peer.closeConnection( reason );
peerRemoved( peer ); //notify listeners
}
else {
if ( log_if_not_found ){
// we know this happens due to timing issues... Debug.out( "closeAndRemovePeer(): peer not removed" );
}
}
}
private void closeAndRemoveAllPeers( String reason, boolean reconnect ) {
ArrayList peer_transports;
try{
peer_transports_mon.enter();
peer_transports = peer_transports_cow;
peer_transports_cow = new ArrayList( 0 );
}
finally{
peer_transports_mon.exit();
}
for( int i=0; i < peer_transports.size(); i++ ) {
final PEPeerTransport peer = (PEPeerTransport)peer_transports.get( i );
try{
peer.closeConnection( reason );
}catch( Throwable e ){
// if something goes wrong with the close process (there's a bug in there somewhere whereby
// we occasionally get NPEs then we want to make sure we carry on and close the rest
Debug.printStackTrace(e);
}
try{
peerRemoved( peer ); //notify listeners
}catch( Throwable e ){
Debug.printStackTrace(e);
}
}
if( reconnect ) {
for( int i=0; i < peer_transports.size(); i++ ) {
final PEPeerTransport peer = (PEPeerTransport)peer_transports.get( i );
PEPeerTransport reconnected_peer = peer.reconnect();
if ( reconnected_peer != null ){
addToPeerTransports( reconnected_peer );
}
}
}
}
public void
addPeer(
String ip_address,
int tcp_port,
int udp_port,
boolean use_crypto )
{
final byte type = use_crypto ? PeerItemFactory.HANDSHAKE_TYPE_CRYPTO : PeerItemFactory.HANDSHAKE_TYPE_PLAIN;
final PeerItem peer_item = PeerItemFactory.createPeerItem( ip_address, tcp_port, PeerItem.convertSourceID( PEPeerSource.PS_PLUGIN ), type, udp_port, PeerItemFactory.CRYPTO_LEVEL_1, 0 );
byte crypto_level = PeerItemFactory.CRYPTO_LEVEL_1;
if( !isAlreadyConnected( peer_item ) ) {
String fail_reason;
if ( TCPNetworkManager.TCP_OUTGOING_ENABLED && tcp_port > 0){
fail_reason = makeNewOutgoingConnection( PEPeerSource.PS_PLUGIN, ip_address, tcp_port, udp_port, true, use_crypto, crypto_level ); //directly inject the the imported peer
}else if ( UDPNetworkManager.UDP_OUTGOING_ENABLED && udp_port > 0 ){
fail_reason = makeNewOutgoingConnection( PEPeerSource.PS_PLUGIN, ip_address, tcp_port, udp_port, false, use_crypto, crypto_level ); //directly inject the the imported peer
}else{
fail_reason = "No usable protocol";
}
if( fail_reason != null ) Debug.out( "injected peer was not added - " + fail_reason );
}
}
private void
addPeersFromTracker(
TRTrackerAnnouncerResponsePeer[] peers )
{
for (int i = 0; i < peers.length; i++){
final TRTrackerAnnouncerResponsePeer peer = peers[i];
final ArrayList peer_transports = peer_transports_cow;
boolean already_connected = false;
for( int x=0; x < peer_transports.size(); x++ ) {
final PEPeerTransport transport = (PEPeerTransport)peer_transports.get( x );
// allow loopback connects for co-located proxy-based connections and testing
if( peer.getAddress().equals( transport.getIp() )){
final boolean same_allowed = COConfigurationManager.getBooleanParameter( "Allow Same IP Peers" ) ||
transport.getIp().equals( "127.0.0.1" );
if( !same_allowed || peer.getPort() == transport.getPort() ) {
already_connected = true;
break;
}
}
}
if( already_connected ) continue;
if( peer_database != null ) {
byte type = peer.getProtocol() == DownloadAnnounceResultPeer.PROTOCOL_CRYPT ? PeerItemFactory.HANDSHAKE_TYPE_CRYPTO : PeerItemFactory.HANDSHAKE_TYPE_PLAIN;
byte crypto_level = peer.getAZVersion() < TRTrackerAnnouncer.AZ_TRACKER_VERSION_3?PeerItemFactory.CRYPTO_LEVEL_1:PeerItemFactory.CRYPTO_LEVEL_2;
PeerItem item = PeerItemFactory.createPeerItem(
peer.getAddress(),
peer.getPort(),
PeerItem.convertSourceID( peer.getSource() ),
type,
peer.getUDPPort(),
crypto_level,
peer.getUploadSpeed());
peer_database.addDiscoveredPeer( item );
}
int http_port = peer.getHTTPPort();
if ( http_port != 0 ){
adapter.addHTTPSeed( peer.getAddress(), http_port );
}
}
}
/**
* Request a new outgoing peer connection.
* @param address ip of remote peer
* @param port remote peer listen port
* @return null if the connection was added to the transport list, reason if rejected
*/
private String
makeNewOutgoingConnection(
String peer_source,
String address,
int tcp_port,
int udp_port,
boolean use_tcp,
boolean require_crypto,
byte crypto_level )
{
//make sure this connection isn't filtered
if( ip_filter.isInRange( address, adapter.getDisplayName() ) ) {
return "IPFilter block";
}
//make sure we need a new connection
final int needed = getMaxNewConnectionsAllowed();
if( needed == 0 ){
if ( peer_source != PEPeerSource.PS_PLUGIN ||
!doOptimisticDisconnect( AddressUtils.isLANLocalAddress( address ) != AddressUtils.LAN_LOCAL_NO)){
return "Too many connections";
}
}
//make sure not already connected to the same IP address; allow loopback connects for co-located proxy-based connections and testing
final boolean same_allowed = COConfigurationManager.getBooleanParameter( "Allow Same IP Peers" ) || address.equals( "127.0.0.1" );
if( !same_allowed && PeerIdentityManager.containsIPAddress( _hash, address ) ){
return "Already connected to IP";
}
if( PeerUtils.ignorePeerPort( tcp_port ) ) {
if (Logger.isEnabled())
Logger.log(new LogEvent(disk_mgr.getTorrent(), LOGID,
"Skipping connect with " + address + ":" + tcp_port
+ " as peer port is in ignore list."));
return "TCP port in ignore list";
}
//start the connection
PEPeerTransport real = PEPeerTransportFactory.createTransport( this, peer_source, address, tcp_port, udp_port, use_tcp, require_crypto, crypto_level );
addToPeerTransports( real );
return null;
}
/**
* A private method that checks if PEPieces being downloaded are finished
* If all blocks from a PEPiece are written to disk, this method will
* queue the piece for hash check.
* Elsewhere, if it passes sha-1 check, it will be marked as downloaded,
* otherwise, it will unmark it as fully downloaded, so blocks can be retreived again.
*/
private void checkCompletedPieces() {
if ((mainloop_loop_count %MAINLOOP_ONE_SECOND_INTERVAL) !=0)
return;
//for every piece
for (int i = 0; i <_nbPieces; i++) {
final DiskManagerPiece dmPiece =dm_pieces[i];
//if piece is completly written, not already checking, and not Done
if (dmPiece.isNeedsCheck())
{
//check the piece from the disk
dmPiece.setChecking();
DiskManagerCheckRequest req =
disk_mgr.createCheckRequest(
i, new Integer(CHECK_REASON_DOWNLOADED));
req.setAdHoc( false );
disk_mgr.enqueueCheckRequest( req, this );
}
}
}
/** Checks given piece to see if it's active but empty, and if so deactivates it.
* @param pieceNumber to check
* @return true if the piece was removed and is no longer active (pePiece ==null)
*/
private boolean checkEmptyPiece(final int pieceNumber)
{
if (piecePicker.isInEndGameMode())
return false; // be sure to not remove pieces in EGM
final PEPiece pePiece =pePieces[pieceNumber];
final DiskManagerPiece dmPiece =dm_pieces[pieceNumber];
if (pePiece == null || pePiece.isRequested())
return false;
if (dmPiece.getNbWritten() >0 ||pePiece.getNbRequests() >0 ||pePiece.getSpeed() >0 ||pePiece.getReservedBy() !=null)
return false;
removePiece(pePiece, pieceNumber);
return true;
}
/**
* Check if a piece's Speed is too fast for it to be getting new data
* and if a reserved pieced failed to get data within 120 seconds
*/
private void checkSpeedAndReserved()
{
if ( (mainloop_loop_count %MAINLOOP_FIVE_SECOND_INTERVAL) != 0 ){
return;
}
final int nbPieces =_nbPieces;
final PEPieceImpl[] pieces =pePieces;
//for every piece
for (int i =0; i <nbPieces; i++)
{
final PEPieceImpl pePiece =pieces[i];
// these checks are only against pieces being downloaded yet needing requests still/again
if (pePiece !=null)
{
final long timeSinceActivity =pePiece.getTimeSinceLastActivity();
if (timeSinceActivity >4 *1000)
{
final int oldSpeed =pePiece.getSpeed();
// maybe piece's speed is too high for it to get new data
if (oldSpeed >0)
{
final DiskManagerPiece dmPiece =dm_pieces[i];
if (pePiece.isRequested() ||timeSinceActivity >29 *1000)
pePiece.setSpeed(0);
else
{
final long calcSpeed =((dmPiece.getNbWritten() *DiskManager.BLOCK_SIZE) /timeSinceActivity) -1;
if (calcSpeed <oldSpeed)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -