📄 pepeertransportprotocol.java
字号:
}finally{
closing_mon.exit();
}
if( outgoing_have_message_aggregator != null ) {
outgoing_have_message_aggregator.destroy();
}
if( peer_exchange_item != null ) {
peer_exchange_item.destroy();
}
if( outgoing_piece_message_handler != null ) {
outgoing_piece_message_handler.destroy();
}
if( connection != null ) { //can be null if close is called within ::<init>::, like when the given port is invalid
connection.close();
}
//cancel any pending requests (on the manager side)
cancelRequests();
if ( ip_resolver_request != null ){
ip_resolver_request.cancel();
}
removeAvailability();
changePeerState( PEPeer.DISCONNECTED );
if (Logger.isEnabled())
Logger.log(new LogEvent(this, LOGID, "Peer connection closed: " + reason));
if( !externally_closed ) { //if closed internally, notify manager, otherwise we assume it already knows
manager.peerConnectionClosed( this );
}
}
private void addAvailability()
{
if (!availabilityAdded &&!closing &&peerHavePieces !=null &¤t_peer_state ==PEPeer.TRANSFERING)
{
final List peer_listeners_ref =peer_listeners_cow;
if (peer_listeners_ref !=null)
{
for (int i =0; i <peer_listeners_ref.size(); i++)
{
final PEPeerListener peerListener =(PEPeerListener) peer_listeners_ref.get(i);
peerListener.addAvailability(this, peerHavePieces);
}
availabilityAdded =true;
}
}
}
private void removeAvailability()
{
if (availabilityAdded &&peerHavePieces !=null)
{
final List peer_listeners_ref =peer_listeners_cow;
if (peer_listeners_ref !=null)
{
for (int i =0; i <peer_listeners_ref.size(); i++)
{
final PEPeerListener peerListener =(PEPeerListener) peer_listeners_ref.get(i);
peerListener.removeAvailability(this, peerHavePieces);
}
}
availabilityAdded =false;
}
peerHavePieces =null;
}
protected void sendBTHandshake() {
connection.getOutgoingMessageQueue().addMessage(
new BTHandshake( manager.getHash(),
manager.getPeerId(),
manager.isAZMessagingEnabled() ), false );
}
private void sendAZHandshake() {
Message[] avail_msgs = MessageManager.getSingleton().getRegisteredMessages();
String[] avail_ids = new String[ avail_msgs.length ];
byte[] avail_vers = new byte[ avail_msgs.length ];
for( int i=0; i < avail_msgs.length; i++ ) {
avail_ids[i] = avail_msgs[i].getID();
avail_vers[i] = (byte)1; //NOTE: hack for ADV messaging transition
}
int local_udp_port = 0;
try{ //TODO udp port value should be in the core someday
PluginInterface dht_pi = AzureusCoreFactory.getSingleton().getPluginManager().getPluginInterfaceByClass( DHTPlugin.class );
// may not be present
if ( dht_pi != null ){
DHTPlugin dht = (DHTPlugin)dht_pi.getPlugin();
local_udp_port = dht.getPort();
}
}
catch( Throwable t ) {
Debug.out( "Exception while obtaining local udp listen port from DHTPlugin:", t );
}
AZHandshake az_handshake = new AZHandshake(
AZPeerIdentityManager.getAZPeerIdentity(),
Constants.AZUREUS_NAME,
Constants.AZUREUS_VERSION,
COConfigurationManager.getIntParameter( "TCP.Listen.Port" ),
local_udp_port,
avail_ids,
avail_vers,
NetworkManager.REQUIRE_CRYPTO_HANDSHAKE ? AZHandshake.HANDSHAKE_TYPE_CRYPTO : AZHandshake.HANDSHAKE_TYPE_PLAIN );
connection.getOutgoingMessageQueue().addMessage( az_handshake, false );
}
public int getPeerState() { return current_peer_state; }
public boolean isDownloadPossible()
{
if (!closing)
{
if (lastNeededUndonePieceChange <piecePicker.getNeededUndonePieceChange())
{
checkInterested();
lastNeededUndonePieceChange =piecePicker.getNeededUndonePieceChange();
}
if (interested_in_other_peer &&!choked_by_other_peer &¤t_peer_state ==PEPeer.TRANSFERING)
return true;
}
return false;
}
public int getPercentDoneInThousandNotation()
{
if (peerHavePieces ==null ||peerHavePieces.length <1)
return 0;
return (peerHavePieces.nbSet *1000) /peerHavePieces.length;
}
public boolean transferAvailable() {
return (!choked_by_other_peer && interested_in_other_peer);
}
private void printRequestStats() {
if( SHOW_DISCARD_RATE_STATS ) {
float discard_perc = (requests_discarded * 100F) / ((requests_completed + requests_recovered + requests_discarded) * 1F);
float discard_perc_end = (requests_discarded_endgame * 100F) / ((requests_completed + requests_recovered + requests_discarded_endgame) * 1F);
float recover_perc = (requests_recovered * 100F) / ((requests_recovered + requests_discarded) * 1F);
System.out.println( "c="+requests_completed+ " d="+requests_discarded+ " de="+requests_discarded_endgame+ " r="+requests_recovered+ " dp="+discard_perc+ "% dpe="+discard_perc_end+ "% rp="+recover_perc+ "%" );
}
}
/**
* Checks if this peer is a seed or not.
*/
private void checkSeed()
{
// seed implicitly means *something* to send (right?)
if (peerHavePieces !=null &&peerHavePieces.nbSet >0)
seed =(peerHavePieces.nbSet ==peerHavePieces.length);
else
seed =false;
}
public boolean request( int pieceNumber, int pieceOffset, int pieceLength) {
if (current_peer_state != TRANSFERING) {
manager.requestCanceled( manager.createDiskManagerRequest( pieceNumber, pieceOffset, pieceLength ) );
return false;
}
DiskManagerReadRequest request = manager.createDiskManagerRequest( pieceNumber, pieceOffset, pieceLength );
if( !hasBeenRequested( request ) ) {
addRequest( request );
try{
recent_outgoing_requests_mon.enter();
recent_outgoing_requests.put( request, null );
}finally{
recent_outgoing_requests_mon.exit();
}
connection.getOutgoingMessageQueue().addMessage( new BTRequest( pieceNumber, pieceOffset, pieceLength ), false );
return true;
}
return false;
}
public void sendCancel( DiskManagerReadRequest request ) {
if ( current_peer_state != TRANSFERING ) return;
if ( hasBeenRequested( request ) ) {
removeRequest( request );
connection.getOutgoingMessageQueue().addMessage( new BTCancel( request.getPieceNumber(), request.getOffset(), request.getLength() ), false );
}
}
public void sendHave( int pieceNumber ) {
if ( current_peer_state != TRANSFERING ) return;
//only force if the other peer doesn't have this piece and is not yet interested
final boolean force =!other_peer_interested_in_me &&peerHavePieces !=null &&!peerHavePieces.flags[pieceNumber];
outgoing_have_message_aggregator.queueHaveMessage( pieceNumber, force );
checkInterested();
}
public void sendChoke() {
if ( current_peer_state != TRANSFERING ) return;
//System.out.println( "["+(System.currentTimeMillis()/1000)+"] " +connection + " choked");
outgoing_piece_message_handler.removeAllPieceRequests();
connection.getOutgoingMessageQueue().addMessage( new BTChoke(), false );
choking_other_peer = true;
is_optimistic_unchoke = false;
}
public void sendUnChoke() {
if ( current_peer_state != TRANSFERING ) return;
//System.out.println( "["+(System.currentTimeMillis()/1000)+"] " +connection + " unchoked");
connection.getOutgoingMessageQueue().addMessage( new BTUnchoke(), false );
choking_other_peer = false;
}
private void sendKeepAlive() {
if ( current_peer_state != TRANSFERING ) return;
if( outgoing_have_message_aggregator.hasPending() ) {
outgoing_have_message_aggregator.forceSendOfPending();
}
else {
connection.getOutgoingMessageQueue().addMessage( new BTKeepAlive(), false );
}
}
/**
* Global checkInterested method.
* Early-out scan of pieces to determine if the peer is interesting or not.
* They're interesting if they have a piece that we need and isn't Done
*/
public void checkInterested()
{
if (closing ||peerHavePieces ==null ||peerHavePieces.nbSet ==0)
return;
boolean is_interesting =false;
if (piecePicker.hasDownloadablePiece())
{
if (!seed)
{
// there is a piece worth being interested in
for (int i =peerHavePieces.start; i <=peerHavePieces.end; i++ )
{
if (peerHavePieces.flags[i] && diskManager.isInteresting(i))
{
is_interesting =true;
break;
}
}
} else
is_interesting =true;
}
if (is_interesting &&!interested_in_other_peer)
connection.getOutgoingMessageQueue().addMessage(new BTInterested(), false);
else if (!is_interesting &&interested_in_other_peer)
connection.getOutgoingMessageQueue().addMessage(new BTUninterested(), false);
interested_in_other_peer =is_interesting;
}
/**
* Checks if a particular piece makes us interested in the peer
*
* @param pieceNumber
* the piece number that has been received
*/
private void checkInterested(int pieceNumber)
{
if (closing)
return;
// Do we need this piece and it's not Done?
boolean is_interesting =diskManager.isInteresting(pieceNumber);
if (is_interesting &&!interested_in_other_peer)
connection.getOutgoingMessageQueue().addMessage(new BTInterested(), false);
else if (!is_interesting &&interested_in_other_peer)
connection.getOutgoingMessageQueue().addMessage(new BTUninterested(), false);
interested_in_other_peer =is_interesting;
}
/**
* Private method to send the bitfield.
*/
private void sendBitField()
{
if (closing)
return;
// In case we're in super seed mode, we don't send our bitfield
if (manager.isSuperSeedMode())
return;
ArrayList lazies =null;
// create bitfield
DirectByteBuffer buffer =DirectByteBufferPool.getBuffer(DirectByteBuffer.AL_MSG, (nbPieces +7) /8);
DiskManagerPiece[] pieces =diskManager.getPieces();
int bToSend =0;
int i =0;
for (; i <pieces.length; i++ )
{
if ((i %8) ==0)
bToSend =0;
bToSend =bToSend <<1;
if (pieces[i].isDone())
{
if (ENABLE_LAZY_BITFIELD)
{
if (i <8 ||i >=(pieces.length -(pieces.length %8)))
{ // first and last bytes
if (lazies ==null)
lazies =new ArrayList();
lazies.add(new Integer(i)); // send as a Have message instead
} else
bToSend +=1;
} else
bToSend +=1;
}
if ((i %8) ==7)
buffer.put(DirectByteBuffer.SS_BT, (byte) bToSend);
}
if ((i %8) !=0)
{
bToSend =bToSend <<(8 -(i %8));
buffer.put(DirectByteBuffer.SS_BT, (byte) bToSend);
}
buffer.flip(DirectByteBuffer.SS_BT);
connection.getOutgoingMessageQueue().addMessage(new BTBitfield(buffer), false);
if (lazies !=null)
{
for (int x =0; x <lazies.size(); x++ )
{
Integer num =(Integer) lazies.get(x);
connection.getOutgoingMessageQueue().addMessage(new BTHave(num.intValue()), false);
}
}
}
public byte[] getId() { return peer_id; }
public String getIp() { return ip; }
public int getPort() { return port; }
public int getTCPListenPort() { return tcp_listen_port; }
public int getUDPListenPort() { return udp_listen_port; }
public String getClient() { return client; }
public boolean isIncoming() { return incoming; }
public boolean isOptimisticUnchoke() { return is_optimistic_unchoke && !isChokedByMe(); }
public void setOptimisticUnchoke( boolean is_optimistic ) { is_optimistic_unchoke = is_optimistic; }
public PEPeerControl getControl() { return manager; }
public PEPeerManager getManager() { return manager; }
public PEPeerStats getStats() { return peer_stats; }
/**
* @return null if no bitfield has been recieved yet
* else returns BitFlags indicating what pieces the peer has
*/
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -