📄 pepeercontrolimpl.java
字号:
new_peer_transports.remove( peer );
peer_transports_cow = new_peer_transports;
removed = true;
}
}
finally{
peer_transports_mon.exit();
}
if( removed ) {
peer.closeConnection( reason );
peerRemoved( peer ); //notify listeners
}
else {
if ( log_if_not_found ){
Debug.out( "closeAndRemovePeer(): peer not removed" );
}
}
}
/**
 * Detaches the complete current peer transport list, closes every peer that was on it
 * and reports each one through peerRemoved(); optionally creates replacement transports.
 *
 * @param reason    text passed to each peer's closeConnection()
 * @param reconnect when true, a new transport is created and added for every detached
 *                  peer whose getTCPListenPort() is greater than zero
 */
private void closeAndRemoveAllPeers( String reason, boolean reconnect ) {
    ArrayList detached;

    // swap an empty list in while holding the monitor; the rest works on the detached list
    try{
        peer_transports_mon.enter();

        detached = peer_transports_cow;
        peer_transports_cow = new ArrayList( 0 );
    }
    finally{
        peer_transports_mon.exit();
    }

    for( int idx = 0; idx < detached.size(); idx++ ) {
        final PEPeerTransport closing = (PEPeerTransport)detached.get( idx );

        // a failure while closing one peer (NPEs have been seen here) must not stop
        // the remaining peers from being closed, so each step is guarded separately
        try{
            closing.closeConnection( reason );
        }catch( Throwable e ){
            Debug.printStackTrace(e);
        }

        try{
            peerRemoved( closing ); //notify listeners
        }catch( Throwable e ){
            Debug.printStackTrace(e);
        }
    }

    if( !reconnect ) {
        return;
    }

    // second pass: only after every old connection has been closed
    for( int idx = 0; idx < detached.size(); idx++ ) {
        final PEPeerTransport old_conn = (PEPeerTransport)detached.get( idx );

        if( !( old_conn.getTCPListenPort() > 0 ) ) {
            continue;
        }

        final boolean use_crypto =
            old_conn.getPeerItemIdentity().getHandshakeType() == PeerItemFactory.HANDSHAKE_TYPE_CRYPTO;

        PEPeerTransport replacement =
            PEPeerTransportFactory.createTransport(
                this, old_conn.getPeerSource(), old_conn.getIp(), old_conn.getTCPListenPort(), use_crypto );

        addToPeerTransports( replacement );
    }
}
/**
 * Injects a peer with source PS_PLUGIN: unless isAlreadyConnected() reports the peer as
 * connected, an outgoing connection is requested straight away.
 *
 * @param ip_address address of the peer to add
 * @param port       port of the peer to add
 */
public void addPeer( String ip_address, int port ) { //TODO do plugins need a way to force crypto???
    final PeerItem injected =
        PeerItemFactory.createPeerItem(
            ip_address,
            port,
            PeerItem.convertSourceID( PEPeerSource.PS_PLUGIN ),
            PeerItemFactory.HANDSHAKE_TYPE_PLAIN );

    if( isAlreadyConnected( injected ) ) {
        return;
    }

    final boolean use_crypto = injected.getHandshakeType() == PeerItemFactory.HANDSHAKE_TYPE_CRYPTO;

    // directly inject the imported peer, bypassing the peer database
    final boolean added = makeNewOutgoingConnection( PEPeerSource.PS_PLUGIN, ip_address, port, use_crypto );

    if( !added ) {
        Debug.out( "injected peer was not added" );
    }
}
/**
 * Hands tracker-supplied peers to the peer database, skipping any peer that matches an
 * existing transport.
 * <p>
 * A peer matches a transport when their addresses are equal and either same-IP peers are
 * not allowed, or the ports are equal as well. Same-IP peers count as allowed when the
 * "Allow Same IP Peers" parameter is set or the transport's IP is 127.0.0.1.
 *
 * @param peers peers returned by the tracker
 */
private void
addPeersFromTracker(
    TRTrackerAnnouncerResponsePeer[] peers )
{
    next_candidate:
    for (int i = 0; i < peers.length; i++){
        final TRTrackerAnnouncerResponsePeer candidate = peers[i];

        // the transport list reference is re-read for every candidate
        final ArrayList current = peer_transports_cow;

        for( int x=0; x < current.size(); x++ ) {
            final PEPeerTransport existing = (PEPeerTransport)current.get( x );

            if( !candidate.getAddress().equals( existing.getIp() ) ) {
                continue;
            }

            // allow loopback connects for co-located proxy-based connections and testing
            final boolean same_allowed =
                COConfigurationManager.getBooleanParameter( "Allow Same IP Peers" ) ||
                existing.getIp().equals( "127.0.0.1" );

            if( !same_allowed || candidate.getPort() == existing.getPort() ) {
                // already connected: nothing to record for this candidate
                continue next_candidate;
            }
        }

        if( peer_database == null ) {
            continue;
        }

        final int type;
        if( candidate.getProtocol() == TRTrackerAnnouncerResponsePeer.PROTOCOL_CRYPT ) {
            type = PeerItemFactory.HANDSHAKE_TYPE_CRYPTO;
        }else{
            type = PeerItemFactory.HANDSHAKE_TYPE_PLAIN;
        }

        final PeerItem item =
            PeerItemFactory.createPeerItem(
                candidate.getAddress(),
                candidate.getPort(),
                PeerItem.convertSourceID( candidate.getSource() ),
                type );

        peer_database.addDiscoveredPeer( item );
    }
}
/**
 * Request a new outgoing peer connection.
 * <p>
 * The request is rejected, in this order, when: the address is in the IP filter range;
 * getMaxNewConnectionsAllowed() returns 0; same-IP peers are not allowed and the address
 * is already known to PeerIdentityManager for this hash; or the port is on the ignore list.
 *
 * @param peer_source    source tag passed on to the transport factory
 * @param address        ip of remote peer
 * @param port           remote peer listen port
 * @param require_crypto crypto flag passed on to the transport factory
 * @return true if a transport was created and handed to addToPeerTransports(), false if rejected
 */
private boolean
makeNewOutgoingConnection(
    String  peer_source,
    String  address,
    int     port,
    boolean require_crypto )
{
    // filtered addresses are never connected to
    if( ip_filter.isInRange( address, adapter.getDisplayName() ) ) {
        return false;
    }

    // no room for another connection
    if( getMaxNewConnectionsAllowed() == 0 ) {
        return false;
    }

    // refuse a second connection to the same IP unless explicitly allowed; loopback is
    // always allowed for co-located proxy-based connections and testing
    if( !COConfigurationManager.getBooleanParameter( "Allow Same IP Peers" ) && !address.equals( "127.0.0.1" ) ) {
        if( PeerIdentityManager.containsIPAddress( _hash, address ) ) {
            return false;
        }
    }

    if( PeerUtils.ignorePeerPort( port ) ) {
        if( Logger.isEnabled() ) {
            Logger.log(
                new LogEvent(
                    disk_mgr.getTorrent(), LOGID,
                    "Skipping connect with " + address + ":" + port
                        + " as peer port is in ignore list." ) );
        }
        return false;
    }

    // all checks passed: start the connection
    PEPeerTransport outgoing = PEPeerTransportFactory.createTransport( this, peer_source, address, port, require_crypto );

    addToPeerTransports( outgoing );

    return true;
}
/**
 * Queues a check request for every piece that has been completely written.
 * <p>
 * A piece qualifies when its disk-manager entry reports calcWritten() and is neither
 * already checking nor done. Such a piece is flagged as checking and a non-ad-hoc check
 * request with reason CHECK_REASON_DOWNLOADED is enqueued, passing this object along
 * with the request. The outcome of the check is handled elsewhere.
 */
private void checkCompletedPieces() {
    for (int pieceNumber = 0; pieceNumber < _nbPieces; pieceNumber++) {
        final DiskManagerPiece candidate = dm_pieces[pieceNumber];

        // skip anything not fully written, already being checked, or already done
        if (!candidate.calcWritten() || candidate.isChecking() || candidate.isDone()) {
            continue;
        }

        // flag first, then hand the piece to the disk manager for checking
        candidate.setChecking();

        DiskManagerCheckRequest request =
            disk_mgr.createCheckRequest( pieceNumber, new Integer( CHECK_REASON_DOWNLOADED ) );

        request.setAdHoc( false );

        disk_mgr.enqueueCheckRequest( request, this );
    }
}
/**
 * Deactivates the given piece if it is active but has nothing going on.
 * <p>
 * The piece is left alone when it is not active, when its disk-manager entry is marked
 * requested, or when it has any written blocks, outstanding requests, a non-zero speed
 * or a reserving peer.
 *
 * @param pieceNumber piece to check
 * @return true if removePiece() was called for the piece, false if it was left untouched
 */
private boolean checkEmptyPiece(final int pieceNumber)
{
    final PEPiece active = pePieces[pieceNumber];
    final DiskManagerPiece onDisk = dm_pieces[pieceNumber];

    if (active == null) {
        return false;
    }
    if (onDisk.isRequested()) {
        return false;
    }

    final boolean stillInUse =
        onDisk.getNbWritten() > 0
            || active.getNbRequests() > 0
            || active.getSpeed() > 0
            || active.getReservedBy() != null;

    if (stillInUse) {
        return false;
    }

    removePiece(active, pieceNumber);
    return true;
}
/**
 * Periodic housekeeping for active pieces that have been idle for more than 4 seconds.
 * <p>
 * Runs only on main-loop passes where the loop count is a multiple of
 * MAINLOOP_FIVE_SECOND_INTERVAL. For each idle active piece:
 * <ul>
 * <li>if its speed is above zero, the speed is reset to 0 when the disk-manager piece is
 *     marked requested or the piece has been idle for more than 29 seconds; otherwise it
 *     is lowered to a value recalculated from the written block count and the idle time,
 *     if that value is below the current speed;</li>
 * <li>if its speed is zero and it has been idle for more than 120 seconds, any
 *     reservation is cleared (the reserving peer is reported as bad when the piece needs
 *     an MD5 check on completion, otherwise its transport, if found, is closed) and the
 *     piece is deactivated if it turns out to be empty.</li>
 * </ul>
 */
private void checkSpeedAndReserved()
{
    if ((mainloop_loop_count % MAINLOOP_FIVE_SECOND_INTERVAL) != 0) {
        return;
    }

    // work on local copies of the fields for the duration of the scan
    final int nbPieces = _nbPieces;
    final PEPieceImpl[] pieces = pePieces;

    for (int i = 0; i < nbPieces; i++)
    {
        final PEPieceImpl pePiece = pieces[i];

        // these checks are only against pieces being downloaded yet needing requests still/again
        if (pePiece == null) {
            continue;
        }

        final long timeSinceActivity = pePiece.getTimeSinceLastActivity();
        if (timeSinceActivity <= 4 * 1000) {
            continue;
        }

        final int oldSpeed = pePiece.getSpeed();

        if (oldSpeed > 0)
        {
            // maybe piece's speed is too high for it to get new data
            final DiskManagerPiece dmPiece = dm_pieces[i];

            if (dmPiece.isRequested() || timeSinceActivity > 29 * 1000) {
                pePiece.setSpeed(0);
            } else {
                // timeSinceActivity is > 4000 here, so the division cannot be by zero
                final long calcSpeed = ((dmPiece.getNbWritten() * DiskManager.BLOCK_SIZE) / timeSinceActivity) - 1;
                if (calcSpeed < oldSpeed) {
                    pePiece.setSpeed((int) (calcSpeed > 0 ? calcSpeed : 0));
                }
            }
        }
        else if (timeSinceActivity > (120 * 1000))
        {
            // has reserved piece gone stagnant?
            final String reservingPeer = pePiece.getReservedBy();
            if (reservingPeer != null)
            {
                if (needsMD5CheckOnCompletion(i)) {
                    badPeerDetected(reservingPeer);
                } else {
                    final PEPeerTransport pt = getTransportFromAddress(reservingPeer);
                    if (pt != null) {
                        closeAndRemovePeer(pt, "Reserved piece data timeout; 120 seconds", true);
                    }
                }
                pePiece.setReservedBy(null);
            }

            // return value deliberately ignored: the piece is simply dropped if empty
            checkEmptyPiece(i);
        }
    }
}
/**
 * Calls checkInterested() on every current peer transport, but only on main-loop passes
 * where the loop count is a multiple of MAINLOOP_ONE_SECOND_INTERVAL, and only when the
 * piece picker's needed-undone-piece change value is greater than the one recorded on
 * the previous run.
 */
private void checkInterested()
{
    final boolean onInterval = (mainloop_loop_count % MAINLOOP_ONE_SECOND_INTERVAL) == 0;

    if (!onInterval || lastNeededUndonePieceChange >= piecePicker.getNeededUndonePieceChange()) {
        return;
    }

    // remember the value that triggered this run
    lastNeededUndonePieceChange = piecePicker.getNeededUndonePieceChange();

    final List transports = peer_transports_cow;

    for (int idx = 0; idx < transports.size(); idx++) {
        final PEPeerTransport transport = (PEPeerTransport) transports.get(idx);
        transport.checkInterested();
    }
}
/**
 * Drains the queued piece check results and processes each one.
 * <p>
 * The pending results are copied and the shared list cleared while holding its monitor;
 * the copies are then passed to processPieceCheckResult() outside the monitor. Each entry
 * is an Object[] holding the DiskManagerCheckRequest at index 0 and an Integer at index 1.
 */
private void
processPieceChecks()
{
    // unsynchronized peek: nothing to do when no results are waiting
    if ( piece_check_result_list.size() == 0 ){
        return;
    }

    List drained;

    try{
        piece_check_result_list_mon.enter();

        drained = new ArrayList( piece_check_result_list );

        piece_check_result_list.clear();

    }finally{
        piece_check_result_list_mon.exit();
    }

    // 'drained' is a private copy, so plain indexed access is safe
    for ( int idx = 0; idx < drained.size(); idx++ ){
        final Object[] entry = (Object[])drained.get( idx );

        final DiskManagerCheckRequest request = (DiskManagerCheckRequest)entry[0];
        final int outcome = ((Integer)entry[1]).intValue();

        processPieceCheckResult( request, outcome );
    }
}
/**
 * Drives the periodic piece rescan. Each call enqueues at most one check request
 * (reason CHECK_REASON_SCAN), for the next piece that has no active entry in pePieces
 * and is not marked done by the disk manager.
 * <p>
 * State used: next_rescan_piece is the index of the next piece to consider (-1 when no
 * scan is in progress); rescan_piece_time is the time of the last rescan activity, with
 * 0 meaning a check request is currently outstanding.
 */
protected void
checkRescan()
{
if ( rescan_piece_time == 0 ){
// pending a piece completion
// NOTE(review): the value is set to 0 below just before a request is enqueued;
// presumably it is reset elsewhere once the check result arrives - confirm
return;
}
if ( next_rescan_piece == -1 ){
// no scan in progress: on the five-second tick, start one at piece 0 if enabled
if ( mainloop_loop_count % MAINLOOP_FIVE_SECOND_INTERVAL == 0 ){
if ( adapter.isPeriodicRescanEnabled()){
next_rescan_piece = 0;
}
}
}else{
// scan in progress: on the ten-minute tick, abandon it if rescanning was disabled
if ( mainloop_loop_count % MAINLOOP_TEN_MINUTE_INTERVAL == 0 ){
if ( !adapter.isPeriodicRescanEnabled()){
next_rescan_piece = -1;
}
}
}
if ( next_rescan_piece == -1 ){
return;
}
// delay as required
long now = SystemTime.getCurrentTime();
// never let the recorded time lie in the future (presumably guards against the
// clock having been moved backwards)
if ( rescan_piece_time > now ){
rescan_piece_time = now;
}
// 250K/sec limit
// assuming getPieceLength() is in bytes, piece_size / 250 is the number of
// milliseconds one piece takes at 250 bytes per millisecond
long piece_size = disk_mgr.getPieceLength();
long millis_per_piece = piece_size / 250;
if ( now - rescan_piece_time < millis_per_piece ){
return;
}
// advance through the pieces until one is found that needs checking
while( next_rescan_piece != -1 ){
int this_piece = next_rescan_piece;
next_rescan_piece++;
// past the last piece: the scan ends after this iteration
if ( next_rescan_piece == _nbPieces ){
next_rescan_piece = -1;
}
// only pieces that are neither active nor already done are rescanned
if ( pePieces[this_piece] == null && !dm_pieces[this_piece].isDone()){
DiskManagerCheckRequest req =
disk_mgr.createCheckRequest(
this_piece,
new Integer( CHECK_REASON_SCAN ));
if ( Logger.isEnabled()){
Logger.log(
new LogEvent(
disk_mgr.getTorrent(), LOGID,
"Rescanning piece " + this_piece ));
}
rescan_piece_time = 0; // mark as check piece in process
try{
disk_mgr.enqueueCheckRequest( req, this );
}catch( Throwable e ){
// enqueue failed: clear the pending marker so later calls are not blocked
rescan_piece_time = now;
Debug.printStackTrace(e);
}
// one request per call
break;
}
}
}
/**
* This method checks if the downloading process is finished.
*
*/
private void checkFinished(boolean start_of_day)
{
boolean all_pieces_done =disk_mgr.getRemaining() ==0;
if (all_pieces_done)
{
seeding_mode =true;
piecePicker.clearEndGameChunks();
if (!start_of_day)
adapter.setStateFinishing();
_timeFinished =SystemTime.getCurrentTime();
List peer_transports =peer_transports_cow;
// remove previous snubbing
for (int i =0; i <peer_transports.size(); i++ )
{
PEPeerTransport pc =(PEPeerTransport) peer_transports.get(i);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -