📄 pepeertransportprotocol.java
字号:
}
field.get(DirectByteBuffer.SS_PEER, dataf);
try{
closing_mon.enter();
if (closing)
bitfield.destroy();
else
{
final BitFlags tempHavePieces;
if (peerHavePieces ==null)
{
tempHavePieces =new BitFlags(nbPieces);
} else
{
tempHavePieces =peerHavePieces;
removeAvailability();
}
for (int i =0; i <nbPieces; i++)
{
final int index =i /8;
final int bit =7 -(i %8);
final byte bData =dataf[index];
final byte b =(byte) (bData >>bit);
if ((b &0x01) ==1)
{
tempHavePieces.set(i);
manager.updateSuperSeedPiece(this, i);
}
}
bitfield.destroy();
peerHavePieces =tempHavePieces;
addAvailability();
checkSeed();
checkInterested();
}
}
finally{
closing_mon.exit();
}
}
/**
 * Handles a choke message from the remote peer.
 * The message is released immediately. If we were not already choked by
 * this peer, the choked flag is set, cancelRequests() is invoked, and the
 * time elapsed since {@code unchokedTime} is added to
 * {@code unchokedTimeTotal} (only when positive and the peer is not snubbed).
 *
 * @param choke the received choke message; destroyed by this method
 */
protected void decodeChoke( BTChoke choke ) {
    choke.destroy();

    if (choked_by_other_peer) {
        return; // already choked: nothing changes
    }

    choked_by_other_peer = true;
    cancelRequests();

    final long unchokedDuration = SystemTime.getCurrentTime() - unchokedTime;
    if (unchokedDuration > 0 && !isSnubbed()) {
        unchokedTimeTotal += unchokedDuration;
    }
}
/**
 * Handles an unchoke message from the remote peer.
 * The message is released immediately. If we were choked by this peer, the
 * choked flag is cleared and, unless the peer is snubbed, the current time
 * is stored in {@code unchokedTime}.
 *
 * @param unchoke the received unchoke message; destroyed by this method
 */
protected void decodeUnchoke( BTUnchoke unchoke ) {
    unchoke.destroy();

    if (!choked_by_other_peer) {
        return; // was not choked: nothing changes
    }

    choked_by_other_peer = false;
    if (!isSnubbed()) {
        unchokedTime = SystemTime.getCurrentTime();
    }
}
/**
 * Handles an interested message from the remote peer.
 * The message is released, then the peer is recorded as interested in us
 * only if isSeed() reports false.
 *
 * @param interested the received interested message; destroyed by this method
 */
protected void decodeInterested( BTInterested interested ) {
    interested.destroy();

    // A known seed is never treated as being interested in us.
    final boolean peerIsSeed = isSeed();
    other_peer_interested_in_me = !peerIsSeed;
}
/**
 * Handles an uninterested message from the remote peer.
 * The message is released and the peer is recorded as no longer interested
 * in us. Any pending have messages are then flushed, if an aggregator exists.
 *
 * @param uninterested the received uninterested message; destroyed by this method
 */
protected void decodeUninterested( BTUninterested uninterested ) {
    uninterested.destroy();
    other_peer_interested_in_me = false;

    if (outgoing_have_message_aggregator == null) {
        return;
    }

    // Push out queued haves now: one of them might make the peer interested again.
    outgoing_have_message_aggregator.forceSendOfPending();
}
/**
 * Handles a have message announcing that the remote peer owns a piece.
 * <p>
 * An out-of-range piece number closes the connection. Otherwise, unless the
 * transport is closing or the piece is already flagged for this peer, the
 * piece is flagged in {@code peerHavePieces} (created on demand), the manager
 * and peer stats are notified, and the seed status is re-checked. If we were
 * not yet interested and the disk manager reports the piece as interesting,
 * an interested message is queued first.
 *
 * @param have the received have message; destroyed by this method
 */
protected void decodeHave(BTHave have) {
    final int pieceNumber = have.getPieceNumber();
    have.destroy();

    final boolean outOfRange = (pieceNumber >= nbPieces) || (pieceNumber < 0);
    if (outOfRange) {
        closeConnectionInternally("invalid pieceNumber: " + pieceNumber);
        return;
    }

    if (closing) {
        return;
    }

    if (peerHavePieces == null) {
        peerHavePieces = new BitFlags(nbPieces);
    }

    if (peerHavePieces.flags[pieceNumber]) {
        return; // already flagged for this peer
    }

    // Tell the peer we are interested before recording the new piece.
    if (!interested_in_other_peer && diskManager.isInteresting(pieceNumber)) {
        connection.getOutgoingMessageQueue().addMessage(new BTInterested(), false);
        interested_in_other_peer = true;
    }

    peerHavePieces.set(pieceNumber);

    final int pieceLength = manager.getPieceLength(pieceNumber);
    manager.havePiece(pieceNumber, pieceLength, this);

    // The peer may be a seed using a lazy bitfield, or may have just become a seed.
    checkSeed();

    // Seeds are never considered interested in us.
    other_peer_interested_in_me &= !isSeed();

    peer_stats.hasNewPiece(pieceLength);
}
/**
 * Handles a block request from the remote peer.
 * <p>
 * The request is validated through the manager; an invalid request closes
 * the connection. A valid request is queued on the outgoing piece message
 * handler unless we are currently choking the peer, in which case it is
 * only logged and otherwise ignored.
 *
 * @param request the received request message; destroyed by this method
 */
protected void decodeRequest( BTRequest request ) {
    final int pieceNumber = request.getPieceNumber();
    final int pieceOffset = request.getPieceOffset();
    final int blockLength = request.getLength();
    request.destroy();

    if (!manager.validateReadRequest(pieceNumber, pieceOffset, blockLength)) {
        closeConnectionInternally("request for piece #" + pieceNumber + ":" + pieceOffset
                + "->" + (pieceOffset + blockLength - 1) + " is an invalid request");
        return;
    }

    if (choking_other_peer) {
        // Requests from a peer we are choking are dropped, with a log entry only.
        if (Logger.isEnabled()) {
            Logger.log(new LogEvent(this, LOGID,
                    "decodeRequest(): peer request for piece #"
                    + pieceNumber + ":" + pieceOffset + "->" + (pieceOffset + blockLength - 1)
                    + " ignored as peer is currently choked."));
        }
        return;
    }

    outgoing_piece_message_handler.addPieceRequest(pieceNumber, pieceOffset, blockLength);
}
/**
 * Handles a received piece (block) message.
 * <p>
 * The block is first validated via manager.validatePieceReply(); an invalid
 * block is counted as discarded and the message destroyed. A valid block then
 * takes one of these paths:
 * <ul>
 * <li>it matches an active request: it is written via manager.writeBlock(),
 *     or counted as discarded if the manager reports the block as already
 *     written;</li>
 * <li>it matches no active request but is found in recent_outgoing_requests
 *     and is not yet written: it is written as recovered data;</li>
 * <li>otherwise: it is counted as discarded.</li>
 * </ul>
 * The message is destroyed at the end unless its payload was handed to
 * manager.writeBlock().
 *
 * @param piece the received piece message
 */
protected void decodePiece( BTPiece piece ) {
final int pieceNumber = piece.getPieceNumber();
final int offset = piece.getPieceOffset();
final DirectByteBuffer payload = piece.getPieceData();
final int length = payload.remaining( DirectByteBuffer.SS_PEER );
/*
if ( AEDiagnostics.CHECK_DUMMY_FILE_DATA ){
int pos = payload.position( DirectByteBuffer.SS_PEER );
long off = ((long)number) * getControl().getPieceLength(0) + offset;
for (int i=0;i<length;i++){
byte v = payload.get( DirectByteBuffer.SS_PEER );
if ((byte)off != v ){
System.out.println( "piece: read is bad at " + off + ": expected = " + (byte)off + ", actual = " + v );
break;
}
off++;
}
payload.position( DirectByteBuffer.SS_PEER, pos );
}
*/
// Log-message prefix wrapped in an object so the string is only built when
// toString() is invoked by concatenation into a log/print message.
final Object error_msg =
new Object()
{
public final String
toString()
{
return( "decodePiece(): Peer has sent piece #" + pieceNumber + ":" + offset + "->" + (offset + length -1) + ", " );
}
};
// Reject blocks the manager does not consider a valid reply.
if( !manager.validatePieceReply( pieceNumber, offset, payload ) ) {
peer_stats.bytesDiscarded( length );
manager.discarded( this, length );
requests_discarded++;
printRequestStats();
piece.destroy();
if (Logger.isEnabled())
Logger.log(new LogEvent(this, LOGID, LogEvent.LT_ERROR,
error_msg
+"but piece block discarded as invalid."));
return;
}
final DiskManagerReadRequest request = manager.createDiskManagerRequest( pieceNumber, offset, length );
// While true, the message is destroyed at the end of this method; it is
// cleared only on the paths that pass the payload to manager.writeBlock().
boolean piece_error = true;
if( hasBeenRequested( request ) ) { //from active request
removeRequest( request );
final long now =SystemTime.getCurrentTime();
reSetRequestsTime(now);
if( manager.isWritten( pieceNumber, offset ) ) { //oops, looks like this block has already been written
peer_stats.bytesDiscarded( length );
manager.discarded( this, length );
if( manager.isInEndGameMode() ) { //we're probably in end-game mode then
// Two deliveries within 60 seconds of each other clear the snubbed state.
if (last_good_data_time !=-1 &&now -last_good_data_time <=60 *1000)
setSnubbed(false);
last_good_data_time =now;
requests_discarded_endgame++;
if (Logger.isEnabled())
Logger.log(new LogEvent(this, LogIDs.PIECES, LogEvent.LT_INFORMATION,
error_msg
+"but piece block ignored as already written in end-game mode."));
}
else {
// if they're not snubbed, then most likely this peer got a re-request after some other peer
// snubbed themselves, and the slow peer finally finished the piece, but before this peer did
// so give credit to this peer anyway for having delivered a block at this time
if (!isSnubbed())
last_good_data_time =now;
if (Logger.isEnabled())
Logger.log(new LogEvent(this, LogIDs.PIECES, LogEvent.LT_WARNING,
error_msg
+"but piece block discarded as already written."));
requests_discarded++;
}
printRequestStats();
}
else { //successfully received block!
manager.writeBlock( pieceNumber, offset, payload, this, false);
// Two deliveries within 60 seconds of each other clear the snubbed state.
if (last_good_data_time !=-1 &&now -last_good_data_time <=60 *1000)
setSnubbed(false);
last_good_data_time =now;
requests_completed++;
piece_error = false; //dont destroy message, as we've passed the payload on to the disk manager for writing
}
}
else { //initial request may have already expired, but check if we can use the data anyway
if( !manager.isWritten( pieceNumber, offset ) ) {
final boolean ever_requested;
// The lookup in recent_outgoing_requests is done while holding its monitor.
try{ recent_outgoing_requests_mon.enter();
ever_requested = recent_outgoing_requests.containsKey( request );
}
finally{ recent_outgoing_requests_mon.exit(); }
if( ever_requested ) { //security-measure: we dont want to be accepting any ol' random block
manager.writeBlock( pieceNumber, offset, payload, this, true);
final long now =SystemTime.getCurrentTime();
// Two deliveries within 60 seconds of each other clear the snubbed state.
if (last_good_data_time !=-1 &&now -last_good_data_time <=60 *1000)
setSnubbed(false);
reSetRequestsTime(now);
last_good_data_time =now;
requests_recovered++;
printRequestStats();
piece_error = false; //dont destroy message, as we've passed the payload on to the disk manager for writing
if (Logger.isEnabled())
Logger.log(new LogEvent(this, LogIDs.PIECES, LogEvent.LT_INFORMATION,
error_msg
+"expired piece block data recovered as useful."));
}
else {
// NOTE(review): this prints to stdout in addition to the Logger call below with the
// same text — looks like a debugging leftover; confirm before removing.
System.out.println( "[" +client+ "]" +error_msg + "but expired piece block discarded as never requested." );
peer_stats.bytesDiscarded( length );
manager.discarded( this, length );
requests_discarded++;
printRequestStats();
if (Logger.isEnabled())
Logger.log(new LogEvent(this, LogIDs.PIECES, LogEvent.LT_ERROR,
error_msg
+"but expired piece block discarded as never requested."));
}
}
else {
// Block matches no active request and is already written: discard it.
peer_stats.bytesDiscarded( length );
manager.discarded( this, length );
requests_discarded++;
printRequestStats();
if (Logger.isEnabled())
Logger.log(new LogEvent(this, LogIDs.PIECES, LogEvent.LT_WARNING,
error_msg
+"but expired piece block discarded as already written."));
}
}
// The payload was not handed to writeBlock(), so release the message here.
if( piece_error )
{
piece.destroy();
}
}
/**
 * Handles a cancel message from the remote peer.
 * The piece number, offset and length are read from the message, the message
 * is released, and the matching request is removed from the outgoing piece
 * message handler.
 *
 * @param cancel the received cancel message; destroyed by this method
 */
protected void decodeCancel( BTCancel cancel ) {
    final int pieceNumber = cancel.getPieceNumber();
    final int pieceOffset = cancel.getPieceOffset();
    final int blockLength = cancel.getLength();
    cancel.destroy();

    outgoing_piece_message_handler.removePieceRequest( pieceNumber, pieceOffset, blockLength );
}
private void registerForMessageHandling() {
//INCOMING MESSAGES
connection.getIncomingMessageQueue().registerQueueListener( new IncomingMessageQueue.MessageQueueListener() {
public final boolean messageReceived( Message message ) {
if (Logger.isEnabled())
Logger.log(new LogEvent(PEPeerTransportProtocol.this, LogIDs.NET,
"Received [" + message.getDescription() + "] message"));
final long now =SystemTime.getCurrentTime();
last_message_received_time =now;
if( message.getType() == Message.TYPE_DATA_PAYLOAD ) {
last_data_message_received_time =now;
}
if( message.getID().equals( BTMessage.ID_BT_PIECE ) ) {
decodePiece( (BTPiece)message );
return true;
}
if( closing ) {
message.destroy();
return true;
}
if( message.getID().equals( BTMessage.ID_BT_KEEP_ALIVE ) ) {
message.destroy();
//make sure they're not spamming us
if( !message_limiter.countIncomingMessage( message.getID(), 6, 60*1000 ) ) { //allow max 6 keep-alives per 60sec
System.out.println( "Incoming keep-alive message flood detected, dropping spamming peer connection." +PEPeerTransportProtocol.this );
closeConnectionInternally( "Incoming keep-alive message flood detected, dropping spamming peer connection." );
}
return true;
}
if( message.getID().equals( BTMessage.ID_BT_HANDSHAKE ) ) {
decodeBTHandshake( (BTHandshake)message );
return true;
}
if( message.getID().equals( AZMessage.ID_AZ_HANDSHAKE ) ) {
decodeAZHandshake( (AZHandshake)message );
return true;
}
if( message.getID().equals( BTMessage.ID_BT_BITFIELD ) ) {
decodeBitfield( (BTBitfield)message );
return true;
}
if( message.getID().equals( BTMessage.ID_BT_CHOKE ) ) {
decodeChoke( (BTChoke)message );
if( choking_other_peer ) {
connection.enableEnhancedMessageProcessing( false ); //downgrade back to normal handler
}
return true;
}
if( message.getID().equals( BTMessage.ID_BT_UNCHOKE ) ) {
decodeUnchoke( (BTUnchoke)message );
connection.enableEnhancedMessageProcessing( true ); //make sure we use a fast handler for the resulting download
return true;
}
if( message.getID().equals( BTMessage.ID_BT_INTERESTED ) ) {
decodeInterested( (BTInterested)message );
return true;
}
if( message.getID().equals( BTMessage.ID_BT_UNINTERESTED ) ) {
decodeUninterested( (BTUninterested)message );
return true;
}
if( message.getID().equals( BTMessage.ID_BT_HAVE ) ) {
decodeHave( (BTHave)message );
return true;
}
if( message.getID().equals( BTMessage.ID_BT_REQUEST ) ) {
decodeRequest( (BTRequest)message );
return true;
}
if( message.getID().equals( BTMessage.ID_BT_CANCEL ) ) {
decodeCancel( (BTCancel)message );
return true;
}
if( message.getID().equals( AZMessage.ID_AZ_PEER_EXCHANGE ) ) {
de
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -