📄 pepeertransportprotocol.java
字号:
requests_discarded++;
printRequestStats();
piece.destroy();
if (Logger.isEnabled())
Logger.log(new LogEvent(this, LOGID, LogEvent.LT_ERROR, "Protocol:In: "
+ error_msg + "but piece block discarded as invalid."));
return;
}
final PEPiece pePiece =manager.getPiece(pieceNumber);
if (pePiece !=null)
pePiece.setDownloaded(offset);
DiskManagerReadRequest request = manager.createDiskManagerRequest( pieceNumber, offset, length );
boolean piece_error = true;
if( hasBeenRequested( request ) ) { //from active request
removeRequest( request );
reSetRequestsTime();
if( manager.isWritten( pieceNumber, offset ) ) { //oops, looks like this block has already been written
peer_stats.bytesDiscarded( length );
manager.discarded( length );
if( manager.isInEndGameMode() ) { //we're probably in end-game mode then
if (Logger.isEnabled())
Logger.log(new LogEvent(this, LogIDs.PIECES, LogEvent.LT_INFORMATION,
"Protocol:In: " + error_msg + "but piece block ignored as "
+ "already written in end-game mode."));
requests_discarded_endgame++;
}
else {
if (Logger.isEnabled())
Logger.log(new LogEvent(this, LogIDs.PIECES, LogEvent.LT_WARNING,
"Protocol:In: " + error_msg + "but piece block discarded as "
+ "already written."));
requests_discarded++;
}
printRequestStats();
}
else { //successfully received block!
manager.writeBlock( pieceNumber, offset, payload, this, false);
final long now =SystemTime.getCurrentTime();
if (last_good_data_time !=-1 &&now -last_good_data_time <60 *1000)
setSnubbed(false);
requests_completed++;
piece_error = false; //dont destroy message, as we've passed the payload on to the disk manager for writing
last_good_data_time =now;
}
}
else { //initial request may have already expired, but check if we can use the data anyway
if( !manager.isWritten( pieceNumber, offset ) ) {
boolean ever_requested;
try{ recent_outgoing_requests_mon.enter();
ever_requested = recent_outgoing_requests.containsKey( request );
}
finally{ recent_outgoing_requests_mon.exit(); }
if( ever_requested ) { //security-measure: we dont want to be accepting any ol' random block
manager.writeBlock( pieceNumber, offset, payload, this, true);
final long now =SystemTime.getCurrentTime();
if (last_good_data_time !=-1 &&now -last_good_data_time <60 *1000)
setSnubbed(false);
reSetRequestsTime();
requests_recovered++;
printRequestStats();
piece_error = false; //dont destroy message, as we've passed the payload on to the disk manager for writing
last_good_data_time =now;
if (Logger.isEnabled())
Logger.log(new LogEvent(this, LogIDs.PIECES, "Protocol:In: " + error_msg
+ "expired piece block data " + "recovered as useful."));
}
else {
System.out.println( "[" +client+ "]" +error_msg + "but expired piece block discarded as never requested." );
peer_stats.bytesDiscarded( length );
manager.discarded( length );
requests_discarded++;
printRequestStats();
if (Logger.isEnabled())
Logger.log(new LogEvent(this, LogIDs.PIECES, LogEvent.LT_ERROR,
"Protocol:In: " + error_msg + "but expired piece block "
+ "discarded as never requested."));
}
}
else {
peer_stats.bytesDiscarded( length );
manager.discarded( length );
requests_discarded++;
printRequestStats();
if (Logger.isEnabled())
Logger.log(new LogEvent(this, LogIDs.PIECES, LogEvent.LT_ERROR,
"Protocol:In: " + error_msg
+ "but expired piece block discarded "
+ "as already written."));
}
}
if( piece_error )
{
piece.destroy();
if (!manager.isWritten(pieceNumber, offset) &&pePiece !=null)
pePiece.clearDownloaded(offset);
}
}
/**
 * Handles an incoming cancel message by withdrawing the matching piece
 * request from the outgoing piece message handler.
 *
 * @param cancel the received cancel message; it is destroyed here once its
 *               piece number, offset and length have been read
 */
protected void decodeCancel( BTCancel cancel ) {
  final int piece_number = cancel.getPieceNumber();
  final int piece_offset = cancel.getPieceOffset();
  final int block_length = cancel.getLength();

  //every field we need has been copied out, so release the message first
  cancel.destroy();

  outgoing_piece_message_handler.removePieceRequest( piece_number, piece_offset, block_length );
}
/**
 * Registers one listener on the connection's incoming message queue and one
 * on its outgoing message queue, then starts message processing using the
 * manager's upload and download rate groups.
 *
 * The incoming listener records receive timestamps and dispatches each
 * message to the matching decode method based on its ID; it returns true
 * when the message was handled here and false otherwise. The outgoing
 * listener records send timestamps and toggles enhanced message processing
 * around choke/unchoke. Both listeners forward byte counts to the peer and
 * manager statistics.
 */
private void registerForMessageHandling() {
  //INCOMING MESSAGES
  final IncomingMessageQueue.MessageQueueListener incoming_listener = new IncomingMessageQueue.MessageQueueListener() {

    public boolean messageReceived( Message message ) {
      if( Logger.isEnabled() ) {
        Logger.log( new LogEvent( PEPeerTransportProtocol.this, LogIDs.NET,
            "Received [" + message.getDescription() + "] message" ) );
      }

      last_message_received_time = SystemTime.getCurrentTime();
      if( message.getType() == Message.TYPE_DATA_PAYLOAD ) {
        last_data_message_received_time = SystemTime.getCurrentTime();
      }

      final String message_id = message.getID();

      //piece messages are dispatched before the closing check below
      if( message_id.equals( BTMessage.ID_BT_PIECE ) ) {
        decodePiece( (BTPiece)message );
        return true;
      }

      //while closing, every other message is simply released
      if( closing ) {
        message.destroy();
        return true;
      }

      if( message_id.equals( BTMessage.ID_BT_KEEP_ALIVE ) ) {
        message.destroy();

        //make sure they're not spamming us: allow max 6 keep-alives per 60sec
        //NOTE(review): the ID is re-read from the message after destroy(); presumably IDs are constants so this is safe - confirm
        final boolean within_limit = message_limiter.countIncomingMessage( message.getID(), 6, 60*1000 );
        if( !within_limit ) {
          final String flood_reason = "Incoming keep-alive message flood detected, dropping spamming peer connection.";
          System.out.println( flood_reason + PEPeerTransportProtocol.this );
          closeConnectionInternally( flood_reason );
        }
        return true;
      }

      if( message_id.equals( BTMessage.ID_BT_HANDSHAKE ) ) {
        decodeBTHandshake( (BTHandshake)message );
        return true;
      }

      if( message_id.equals( AZMessage.ID_AZ_HANDSHAKE ) ) {
        decodeAZHandshake( (AZHandshake)message );
        return true;
      }

      if( message_id.equals( BTMessage.ID_BT_BITFIELD ) ) {
        decodeBitfield( (BTBitfield)message );
        return true;
      }

      if( message_id.equals( BTMessage.ID_BT_CHOKE ) ) {
        decodeChoke( (BTChoke)message );
        if( choking_other_peer ) {
          //downgrade back to normal handler
          connection.enableEnhancedMessageProcessing( false );
        }
        return true;
      }

      if( message_id.equals( BTMessage.ID_BT_UNCHOKE ) ) {
        decodeUnchoke( (BTUnchoke)message );
        //make sure we use a fast handler for the resulting download
        connection.enableEnhancedMessageProcessing( true );
        return true;
      }

      if( message_id.equals( BTMessage.ID_BT_INTERESTED ) ) {
        decodeInterested( (BTInterested)message );
        return true;
      }

      if( message_id.equals( BTMessage.ID_BT_UNINTERESTED ) ) {
        decodeUninterested( (BTUninterested)message );
        return true;
      }

      if( message_id.equals( BTMessage.ID_BT_HAVE ) ) {
        decodeHave( (BTHave)message );
        return true;
      }

      if( message_id.equals( BTMessage.ID_BT_REQUEST ) ) {
        decodeRequest( (BTRequest)message );
        return true;
      }

      if( message_id.equals( BTMessage.ID_BT_CANCEL ) ) {
        decodeCancel( (BTCancel)message );
        return true;
      }

      if( message_id.equals( AZMessage.ID_AZ_PEER_EXCHANGE ) ) {
        decodeAZPeerExchange( (AZPeerExchange)message );
        return true;
      }

      //not a message this listener handles
      return false;
    }

    public void protocolBytesReceived( int byte_count ) {
      //update stats
      peer_stats.protocolBytesReceived( byte_count );
      manager.protocolBytesReceived( byte_count );
    }

    public void dataBytesReceived( int byte_count ) {
      //update stats
      peer_stats.dataBytesReceived( byte_count );
      manager.dataBytesReceived( byte_count );

      //last_good_data_time is deliberately NOT touched here: it is reserved for data
      //that has been fully validated, was wanted, and was actually written to disk.
      //last_data_message_received_time (set in messageReceived) covers all incoming
      //data messages, even worthless ones.
    }
  };
  connection.getIncomingMessageQueue().registerQueueListener( incoming_listener );

  //OUTGOING MESSAGES
  final OutgoingMessageQueue.MessageQueueListener outgoing_listener = new OutgoingMessageQueue.MessageQueueListener() {

    public boolean messageAdded( Message message ) {
      return true;
    }

    public void messageQueued( Message message ) {
      /* ignore */
    }

    public void messageRemoved( Message message ) {
      /* ignore */
    }

    public void messageSent( Message message ) {
      //update keep-alive info
      last_message_sent_time = SystemTime.getCurrentTime();
      if( message.getType() == Message.TYPE_DATA_PAYLOAD ) {
        last_data_message_sent_time = SystemTime.getCurrentTime();
      }

      if( message.getID().equals( BTMessage.ID_BT_UNCHOKE ) ) {
        //is about to send piece data, so make sure we use a fast handler
        connection.enableEnhancedMessageProcessing( true );
      }
      else if( message.getID().equals( BTMessage.ID_BT_CHOKE ) ) {
        //is done sending piece data, so downgrade back to normal handler
        if( choked_by_other_peer ) {
          connection.enableEnhancedMessageProcessing( false );
        }
      }

      if( Logger.isEnabled() ) {
        Logger.log( new LogEvent( PEPeerTransportProtocol.this, LogIDs.NET,
            "Sent [" + message.getDescription() + "] message" ) );
      }
    }

    public void protocolBytesSent( int byte_count ) {
      //update stats
      peer_stats.protocolBytesSent( byte_count );
      manager.protocolBytesSent( byte_count );
    }

    public void dataBytesSent( int byte_count ) {
      //update stats
      peer_stats.dataBytesSent( byte_count );
      manager.dataBytesSent( byte_count );
    }
  };
  connection.getOutgoingMessageQueue().registerQueueListener( outgoing_listener );

  //start message processing
  connection.startMessageProcessing( manager.getUploadLimitedRateGroup(), manager.getDownloadLimitedRateGroup() );
}
/**
 * Returns the connection object stored in plugin_connection.
 *
 * @return the current value of plugin_connection
 */
public Connection getConnection() {
  return( plugin_connection );
}
/**
 * Returns the supported_messages array as stored; no copy is made.
 *
 * @return the current value of supported_messages, which may be null
 *         (see supportsMessaging)
 */
public Message[] getSupportedMessages() {
  return( supported_messages );
}
/**
 * Reports whether a supported-messages array has been set for this peer.
 *
 * @return true when supported_messages is non-null, false otherwise
 */
public boolean supportsMessaging() {
  final boolean has_supported_messages = ( supported_messages != null );
  return has_supported_messages;
}
/**
 * Returns the encryption description reported by the connection's TCP
 * transport.
 *
 * @return whatever connection.getTCPTransport().getEncryption() yields
 */
public String getEncryption() {
  return connection.getTCPTransport().getEncryption();
}
/**
 * Registers a peer listener.
 *
 * The listener list is never modified in place: while holding
 * peer_listeners_mon, a fresh list holding the existing listeners plus the
 * new one is built and then swapped into peer_listeners_cow.
 *
 * @param listener the listener to append
 */
public void addListener( PEPeerListener listener ) {
  try{
    peer_listeners_mon.enter();

    //start from the current listeners, or from nothing if none are registered yet
    final List updated_listeners = ( peer_listeners_cow == null )
        ? new ArrayList()
        : new ArrayList( peer_listeners_cow );

    updated_listeners.add( listener );
    peer_listeners_cow = updated_listeners;
  }
  finally{
    peer_listeners_mon.exit();
  }
}
/**
 * Unregisters a peer listener.
 *
 * While holding peer_listeners_mon, a copy of the current list without the
 * given listener is built and swapped into peer_listeners_cow. When the
 * copy ends up empty the field is reset to null instead. Nothing happens
 * if no listeners are registered.
 *
 * @param listener the listener to remove
 */
public void removeListener( PEPeerListener listener ) {
  try{
    peer_listeners_mon.enter();

    if( peer_listeners_cow == null ) {
      return;  //nothing registered; the finally block still releases the monitor
    }

    final List remaining_listeners = new ArrayList( peer_listeners_cow );
    remaining_listeners.remove( listener );

    //an empty list is represented as null
    peer_listeners_cow = remaining_listeners.isEmpty() ? null : remaining_listeners;
  }
  finally{
    peer_listeners_mon.exit();
  }
}
/**
 * Stores the new peer state, runs post-handshake processing when the state
 * is PEPeer.TRANSFERING, and then notifies every registered peer listener.
 *
 * The listener list reference is read once into a local and iterated
 * without taking peer_listeners_mon.
 *
 * @param new_state the state value to store in current_peer_state
 */
private void changePeerState( int new_state ) {
  current_peer_state = new_state;

  if( current_peer_state == PEPeer.TRANSFERING ) {  //YUCK!
    doPostHandshakeProcessing();
  }

  final List listeners_snapshot = peer_listeners_cow;
  if( listeners_snapshot == null ) {
    return;  //nobody to notify
  }

  for( int idx = 0; idx < listeners_snapshot.size(); idx++ ) {
    final PEPeerListener listener = (PEPeerListener)listeners_snapshot.get( idx );
    //the field (not new_state) is passed, so each listener sees the value current at call time
    listener.stateChanged( this, current_peer_state );
  }
}
/**
 * Performs peer exchange registration for this connection.
 *
 * When the manager has peer exchange enabled, a peer exchange connection
 * item is requested for this peer. If one is returned, it is either marked
 * as supported (the peer understands the AZ peer exchange message) or has
 * its state maintenance disabled.
 */
private void doPostHandshakeProcessing() {
  //peer exchange registration
  if( !manager.isPeerExchangeEnabled() ) {
    return;
  }

  //try and register all connections for their peer exchange info
  peer_exchange_item = manager.createPeerExchangeConnection( this );
  if( peer_exchange_item == null ) {
    return;
  }

  //check for peer exchange support
  if( peerSupportsMessageType( AZMessage.ID_AZ_PEER_EXCHANGE ) ) {
    peer_exchange_supported = true;
  }
  else {
    //no need to maintain internal states as we wont be sending/receiving peer exchange messages
    peer_exchange_item.disableStateMaintenance();
  }
}
private boolean peerSupportsMessageType( String message_id ) {
if( supported_messages != null ) {
for( int i=0; i < supported_messages.length;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -