📄 pepeertransportprotocol.java
字号:
message_limiter = new PeerMessageLimiter();
//link in outgoing piece handler
outgoing_piece_message_handler = new OutgoingBTPieceMessageHandler(diskManager, connection.getOutgoingMessageQueue() );
//link in outgoing have message aggregator
outgoing_have_message_aggregator = new OutgoingBTHaveMessageAggregator( connection.getOutgoingMessageQueue() );
connection_established_time = SystemTime.getCurrentTime();
connection_state = PEPeerTransport.CONNECTION_WAITING_FOR_HANDSHAKE;
changePeerState( PEPeer.HANDSHAKING );
registerForMessageHandling();
}
/**
 * Returns the peer source string held by this transport.
 *
 * @return the current value of the peer_source field
 */
public String getPeerSource() {
    return peer_source;
}
/**
 * Close the peer connection from within the PEPeerTransport object.
 * Delegates to performClose() with the "externally closed" flag off, so
 * the manager is notified of the close.
 *
 * @param reason         text included in the close log entry
 * @param connect_failed forwarded unchanged to performClose()
 */
protected void closeConnectionInternally( String reason, boolean connect_failed )
{
    performClose( reason, connect_failed, false );
}
/**
 * Close the peer connection from within the PEPeerTransport object,
 * reporting the close as not being a connect failure.
 *
 * @param reason text included in the close log entry
 */
protected void closeConnectionInternally( String reason )
{
    performClose( reason, false, false );
}
/**
 * Close the peer connection from the PEPeerControl manager side.
 * NOTE: This method assumes PEPeerControl already knows about the close,
 * so the manager is not called back.
 * This method is intended to be invoked only by select administrative methods.
 * You probably should not invoke this directly.
 *
 * @param reason text included in the close log entry
 */
public void closeConnection( String reason )
{
    performClose( reason, false, true );
}
/**
 * Shared close routine behind closeConnectionInternally() and closeConnection().
 * Only the first call does any work: later calls see the closing flag set and return.
 *
 * @param reason            text appended to the "Peer connection closed: " log entry
 * @param connect_failed    passed through to manager.peerConnectionClosed()
 * @param externally_closed when true the manager is NOT notified, as it is assumed to know already
 */
private void performClose( String reason, boolean connect_failed, boolean externally_closed ) {
// Phase 1 (under closing_mon): flip the closing flag and undo per-peer bookkeeping.
try{
closing_mon.enter();
if( closing ){
// a close is already in progress or done; the finally block still releases the monitor
return;
}
closing = true;
// immediately lose interest in peer
interested_in_other_peer =false;
// MAX_VALUE keeps the "<" comparison in isDownloadPossible() from triggering another checkInterested()
lastNeededUndonePieceChange =Long.MAX_VALUE;
if (isSnubbed())
manager.decNbPeersSnubbed();
if( identityAdded ) { //remove identity
if( peer_id != null ) {
PeerIdentityManager.removeIdentity( manager.getPeerIdentityDataID(), peer_id, getPort());
}
else {
Debug.out( "PeerIdentity added but peer_id == null !!!" );
}
identityAdded = false;
}
changePeerState( PEPeer.CLOSING );
}finally{
closing_mon.exit();
}
// Phase 2 (outside the monitor): tear down helpers and the connection itself.
//cancel any pending requests (on the manager side)
cancelRequests();
if( outgoing_have_message_aggregator != null ) {
outgoing_have_message_aggregator.destroy();
}
if( peer_exchange_item != null ) {
peer_exchange_item.destroy();
}
if( outgoing_piece_message_handler != null ) {
outgoing_piece_message_handler.destroy();
}
if( connection != null ) { //can be null if close is called within ::<init>::, like when the given port is invalid
connection.close();
}
if ( ip_resolver_request != null ){
ip_resolver_request.cancel();
}
// withdraw this peer's pieces from the listeners (also nulls peerHavePieces)
removeAvailability();
changePeerState( PEPeer.DISCONNECTED );
if (Logger.isEnabled())
Logger.log(new LogEvent(this, LOGID, "Peer connection closed: " + reason));
if( !externally_closed ) { //if closed internally, notify manager, otherwise we assume it already knows
manager.peerConnectionClosed( this, connect_failed );
}
}
/**
 * Reports this peer's have-pieces to every registered peer listener.
 * Does nothing unless availability has not been added yet, the peer is not
 * closing, peerHavePieces is set and the peer state is PEPeer.TRANSFERING.
 * availabilityAdded is set only when a listener list exists, so a later
 * call can still report once listeners are present.
 */
private void addAvailability()
{
    if (!availabilityAdded && !closing && peerHavePieces != null && current_peer_state == PEPeer.TRANSFERING)
    {
        // work on a local reference so a concurrent swap of the field is not observed mid-loop
        final List peer_listeners_ref = peer_listeners_cow;
        if (peer_listeners_ref != null)
        {
            for (int i = 0; i < peer_listeners_ref.size(); i++)
            {
                final PEPeerListener peerListener = (PEPeerListener) peer_listeners_ref.get(i);
                peerListener.addAvailability(this, peerHavePieces);
            }
            availabilityAdded = true;
        }
    }
}
/**
 * Withdraws this peer's have-pieces from every registered peer listener,
 * if they were previously reported, and then always drops the
 * peerHavePieces reference.
 */
private void removeAvailability()
{
    final boolean must_withdraw = availabilityAdded && peerHavePieces != null;
    if (must_withdraw)
    {
        final List listeners = peer_listeners_cow;
        if (listeners != null)
        {
            int idx = 0;
            while (idx < listeners.size())
            {
                final PEPeerListener listener = (PEPeerListener) listeners.get(idx);
                listener.removeAvailability(this, peerHavePieces);
                idx++;
            }
        }
        // cleared even when no listener list exists
        availabilityAdded = false;
    }
    peerHavePieces = null;
}
/**
 * Queues a BTHandshake on the connection's outgoing message queue, built
 * from the manager's hash, peer id and AZ-messaging-enabled flag.
 */
protected void sendBTHandshake()
{
    connection.getOutgoingMessageQueue().addMessage(
        new BTHandshake(
            manager.getHash(),
            manager.getPeerId(),
            manager.isAZMessagingEnabled() ),
        false );
}
/**
 * Builds an AZHandshake and queues it on the connection's outgoing
 * message queue. The handshake carries the id of every message registered
 * with the MessageManager, each with version 1, plus the local TCP, UDP
 * and non-data UDP listening ports.
 */
private void sendAZHandshake()
{
    final Message[] registered = MessageManager.getSingleton().getRegisteredMessages();
    final int count = registered.length;

    final String[] message_ids = new String[ count ];
    final byte[] message_versions = new byte[ count ];

    for( int pos = 0; pos < count; pos++ ) {
        message_ids[pos] = registered[pos].getID();
        message_versions[pos] = (byte)1;  //NOTE: hack for ADV messaging transition
    }

    int tcp_port = TCPNetworkManager.getSingleton().getTCPListeningPortNumber();
    int udp_port = UDPNetworkManager.getSingleton().getUDPListeningPortNumber();
    int udp_non_data_port = UDPNetworkManager.getSingleton().getUDPNonDataListeningPortNumber();

    AZHandshake handshake = new AZHandshake(
        AZPeerIdentityManager.getAZPeerIdentity(),
        Constants.AZUREUS_NAME,
        Constants.AZUREUS_VERSION,
        tcp_port,
        udp_port,
        udp_non_data_port,
        message_ids,
        message_versions,
        NetworkManager.REQUIRE_CRYPTO_HANDSHAKE ? AZHandshake.HANDSHAKE_TYPE_CRYPTO : AZHandshake.HANDSHAKE_TYPE_PLAIN );

    connection.getOutgoingMessageQueue().addMessage( handshake, false );
}
/**
 * Returns the current peer state.
 *
 * @return the value of current_peer_state
 */
public int getPeerState()
{
    return current_peer_state;
}
/**
 * Tells whether pieces can currently be downloaded from this peer.
 * While the peer is neither closing nor choking us, interest is first
 * re-evaluated via checkInterested() if the piece picker reports a newer
 * needed-undone-piece change than the one last seen.
 *
 * @return true only if not closing, not choked by the peer, interested in
 *         the peer and in the PEPeer.TRANSFERING state
 */
public boolean isDownloadPossible()
{
    if (!closing && !choked_by_other_peer)
    {
        if (lastNeededUndonePieceChange < piecePicker.getNeededUndonePieceChange())
        {
            checkInterested();
            lastNeededUndonePieceChange = piecePicker.getNeededUndonePieceChange();
        }
        if (interested_in_other_peer && current_peer_state == PEPeer.TRANSFERING)
            return true;
    }
    return false;
}
/**
 * Computes how much of the torrent the peer has, in thousandths
 * (0..1000), from its have-pieces flags.
 * If the final piece is flagged it is counted with the disk manager's
 * last-piece length instead of the regular piece length.
 *
 * @return 0 when peerHavePieces is null or has no flags, otherwise
 *         (bytes the peer has * 1000) / total length
 */
public int getPercentDoneInThousandNotation()
{
    if (peerHavePieces == null || peerHavePieces.flags.length == 0)
    {
        return 0;
    }

    final boolean has_final_piece = peerHavePieces.flags[nbPieces - 1];

    final long bytes_held = has_final_piece
        ? ((long)(peerHavePieces.nbSet - 1) * diskManager.getPieceLength()) + diskManager.getLastPieceLength()
        : (long)peerHavePieces.nbSet * diskManager.getPieceLength();

    return (int)((bytes_held * 1000) / diskManager.getTotalLength());
}
/**
 * Tells whether a transfer from the peer is available.
 *
 * @return true when the peer is not choking us and we are interested in it
 */
public boolean transferAvailable()
{
    if (choked_by_other_peer)
    {
        return false;
    }
    return interested_in_other_peer;
}
/**
 * Prints request counters and derived discard/recover percentages to
 * standard output. Does nothing unless SHOW_DISCARD_RATE_STATS is set.
 */
private void printRequestStats()
{
    if( !SHOW_DISCARD_RATE_STATS ) {
        return;
    }

    // denominators kept in float, exactly as the percentages are computed
    final float total_normal = (requests_completed + requests_recovered + requests_discarded) * 1F;
    final float total_endgame = (requests_completed + requests_recovered + requests_discarded_endgame) * 1F;
    final float total_recoverable = (requests_recovered + requests_discarded) * 1F;

    final float discard_perc = (requests_discarded * 100F) / total_normal;
    final float discard_perc_end = (requests_discarded_endgame * 100F) / total_endgame;
    final float recover_perc = (requests_recovered * 100F) / total_recoverable;

    System.out.println( "c="+requests_completed+ " d="+requests_discarded+ " de="+requests_discarded_endgame+ " r="+requests_recovered+ " dp="+discard_perc+ "% dpe="+discard_perc_end+ "% rp="+recover_perc+ "%" );
}
/**
 * Checks whether this peer is a seed by testing that its have-flags
 * exist and that the number of set flags equals the torrent's number of
 * pieces (which must be greater than zero), then stores the outcome via
 * setSeed().
 */
private void checkSeed()
{
    // seed implicitly means *something* to send (right?)
    final boolean has_everything =
        peerHavePieces != null && nbPieces > 0 && peerHavePieces.nbSet == nbPieces;
    setSeed(has_everything);
}
/**
 * Requests a block of a piece from the peer.
 * A request object is always created through the manager. If the peer is
 * not in the TRANSFERING state the manager is told the request was
 * cancelled. Otherwise the request is added to the requested list (under
 * requested_mon) unless an equal one is already present, and for a newly
 * added request a BTRequest is queued and the request is recorded in
 * recent_outgoing_requests.
 *
 * @param pieceNumber piece to request from
 * @param pieceOffset offset of the block inside the piece
 * @param pieceLength length of the block
 * @return the new request, or null if not transferring or already requested
 */
public DiskManagerReadRequest request(final int pieceNumber, final int pieceOffset, final int pieceLength)
{
    final DiskManagerReadRequest request = manager.createDiskManagerRequest(pieceNumber, pieceOffset, pieceLength);

    if (current_peer_state != TRANSFERING)
    {
        manager.requestCanceled(request);
        return null;
    }

    boolean newly_tracked = false;
    try {
        requested_mon.enter();
        newly_tracked = !requested.contains(request);
        if (newly_tracked)
        {
            requested.add(request);
        }
    } finally {
        requested_mon.exit();
    }

    if (!newly_tracked)
    {
        // an equal request is already outstanding
        return null;
    }

    connection.getOutgoingMessageQueue().addMessage( new BTRequest( pieceNumber, pieceOffset, pieceLength ), false );
    _lastPiece = pieceNumber;

    try {
        recent_outgoing_requests_mon.enter();
        recent_outgoing_requests.put( request, null );
    } finally {
        recent_outgoing_requests_mon.exit();
    }
    return request;
}
/**
 * Looks up the position of a request in the requested list while holding
 * requested_mon.
 *
 * @param request the request to look for
 * @return the result of requested.indexOf( request )
 */
public int getRequestIndex( DiskManagerReadRequest request )
{
    try {
        requested_mon.enter();
        final int position = requested.indexOf( request );
        return position;
    } finally {
        requested_mon.exit();
    }
}
/**
 * Cancels an outstanding request: removes it locally and queues a
 * BTCancel for the same piece number, offset and length.
 * Does nothing if the peer is not in the TRANSFERING state or if
 * hasBeenRequested() reports false for the request.
 *
 * @param request the request to cancel
 */
public void sendCancel( DiskManagerReadRequest request )
{
    if ( current_peer_state != TRANSFERING || !hasBeenRequested( request ) ) {
        return;
    }
    removeRequest( request );
    connection.getOutgoingMessageQueue().addMessage(
        new BTCancel( request.getPieceNumber(), request.getOffset(), request.getLength() ),
        false );
}
/**
 * Queues a have message for the given piece on the have-message
 * aggregator and then re-evaluates interest in the peer.
 * Does nothing if the peer is not in the TRANSFERING state.
 *
 * @param pieceNumber the piece being announced
 */
public void sendHave( int pieceNumber )
{
    if ( current_peer_state == TRANSFERING ) {
        //only force if the other peer doesn't have this piece and is not yet interested
        boolean force = false;
        if ( !other_peer_interested_in_me && peerHavePieces != null ) {
            force = !peerHavePieces.flags[pieceNumber];
        }
        outgoing_have_message_aggregator.queueHaveMessage( pieceNumber, force );
        checkInterested();
    }
}
/**
 * Chokes the peer: drops all piece requests held by the outgoing piece
 * message handler, queues a BTChoke, and records that we are choking the
 * peer and that it is no longer an optimistic unchoke.
 * Does nothing if the peer is not in the TRANSFERING state.
 */
public void sendChoke()
{
    if ( current_peer_state == TRANSFERING ) {
        outgoing_piece_message_handler.removeAllPieceRequests();
        connection.getOutgoingMessageQueue().addMessage( new BTChoke(), false );
        choking_other_peer = true;
        is_optimistic_unchoke = false;
    }
}
/**
 * Unchokes the peer by clearing the choking flag and queueing a BTUnchoke.
 * Does nothing if the peer is not in the TRANSFERING state.
 */
public void sendUnChoke()
{
    if ( current_peer_state == TRANSFERING ) {
        // The flag is cleared before the message is queued: with pseudo peers
        // the unchoke can effectively be acted on synchronously, and that must
        // not happen while the peer still appears choked.
        choking_other_peer = false;
        connection.getOutgoingMessageQueue().addMessage( new BTUnchoke(), false );
    }
}
/**
 * Keeps the connection alive. If the have-message aggregator has pending
 * messages they are forced out; otherwise a BTKeepAlive is queued.
 * Does nothing if the peer is not in the TRANSFERING state.
 */
private void sendKeepAlive()
{
    if ( current_peer_state != TRANSFERING ) {
        return;
    }
    if( !outgoing_have_message_aggregator.hasPending() ) {
        connection.getOutgoingMessageQueue().addMessage( new BTKeepAlive(), false );
    }
    else {
        outgoing_have_message_aggregator.forceSendOfPending();
    }
}
/**
 * Global checkInterested method.
 * Decides whether the peer is interesting and tells the peer when that
 * decision changes. The peer is interesting when the piece picker has a
 * downloadable piece and the peer either is a seed or has at least one
 * piece the disk manager reports as interesting (the scan stops at the
 * first such piece).
 * Returns without doing anything while closing, or when the peer's
 * have-flags are missing or have no bit set.
 */
public void checkInterested()
{
    if (closing || peerHavePieces == null || peerHavePieces.nbSet == 0)
    {
        return;
    }

    boolean now_interesting = false;
    if (piecePicker.hasDownloadablePiece())
    {   // there is a piece worth being interested in
        if (isSeed())
        {
            now_interesting = true;
        }
        else
        {   // peer doesn't have everything: look at its pieces one by one
            for (int piece = peerHavePieces.start; !now_interesting && piece <= peerHavePieces.end; piece++)
            {
                now_interesting = peerHavePieces.flags[piece] && diskManager.isInteresting(piece);
            }
        }
    }

    // only a change of interest is sent to the peer
    if (now_interesting != interested_in_other_peer)
    {
        if (now_interesting)
        {
            connection.getOutgoingMessageQueue().addMessage(new BTInterested(), false);
        }
        else
        {
            connection.getOutgoingMessageQueue().addMessage(new BTUninterested(), false);
        }
    }
    interested_in_other_peer = now_interesting;
}
/** @deprecated no longer used by CVS code
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -