📄 pepeertransportprotocol.java
字号:
/**
 * Sends a keep-alive message when more than two minutes have passed since
 * {@code last_message_sent_time}.
 * <p>
 * If no send time has been recorded yet (value 0), or the recorded time lies
 * after the current time, the timestamp is reset to the current time and
 * nothing is sent.
 */
public void doKeepAliveCheck() {
  final long current_time = SystemTime.getCurrentTime();
  final long idle_millis = current_time - last_message_sent_time;

  final boolean nothing_sent_yet = last_message_sent_time == 0;
  final boolean stamp_in_future = idle_millis < 0;
  if( nothing_sent_yet || stamp_in_future ) {
    //don't send if brand new connection; just start the timer from here
    last_message_sent_time = current_time;
    return;
  }

  final long keep_alive_interval = 2*60*1000;  //2min keep-alive timer
  if( idle_millis <= keep_alive_interval ) {
    return;
  }

  sendKeepAlive();
  //not quite true, but we don't want to queue multiple keep-alives before the first is actually sent
  last_message_sent_time = current_time;
}
/**
 * Closes the connection if it has stalled in its current state.
 * <p>
 * Timeouts for states PEPeerTransport.CONNECTION_PENDING and
 * PEPeerTransport.CONNECTION_CONNECTING are handled by the
 * ConnectDisconnectManager, so they are not dealt with here.
 * <ul>
 * <li>Fully established: closed when both the last message and the last data
 *     message were received more than 5 minutes ago.</li>
 * <li>Waiting for handshake: closed when the connection was established more
 *     than 3 minutes ago.</li>
 * </ul>
 * Timestamps that lie after the current time are first clamped to it.
 *
 * @return true if the connection was closed by this check, false otherwise
 */
public boolean doTimeoutChecks() {
  final long current_time = SystemTime.getCurrentTime();

  //make sure we time out stalled connections
  if( connection_state == PEPeerTransport.CONNECTION_FULLY_ESTABLISHED ) {
    if( last_message_received_time > current_time ) {
      last_message_received_time = current_time;
    }
    if( last_data_message_received_time > current_time ) {
      last_data_message_received_time = current_time;
    }

    final long stall_limit = 5*60*1000;  //5min timeout
    if( current_time - last_message_received_time <= stall_limit
        || current_time - last_data_message_received_time <= stall_limit ) {
      return false;
    }
    closeConnectionInternally( "timed out while waiting for messages" );
    return true;
  }

  //ensure we dont get stuck in the handshaking phases
  if( connection_state != PEPeerTransport.CONNECTION_WAITING_FOR_HANDSHAKE ) {
    return false;
  }
  if( connection_established_time > current_time ) {
    connection_established_time = current_time;
    return false;
  }
  final long handshake_limit = 3*60*1000;  //3min timeout
  if( current_time - connection_established_time <= handshake_limit ) {
    return false;
  }
  closeConnectionInternally( "timed out while waiting for handshake" );
  return true;
}
/**
 * Adjusts the transport mode and the outgoing request read-ahead according to
 * the current send and receive rates reported by {@code peer_stats}
 * (data rate plus protocol rate in each direction).
 * <p>
 * Does nothing when the transport, {@code peer_stats} or
 * {@code outgoing_piece_message_handler} is not available.
 */
public void doPerformanceTuningCheck() {
  final Transport link = connection.getTransport();
  if( link == null || peer_stats == null || outgoing_piece_message_handler == null ) {
    return;
  }

  //send speed -based tuning
  final long upload_rate = peer_stats.getDataSendRate() + peer_stats.getProtocolSendRate();
  final int read_ahead;

  if( upload_rate < 12500 ) {  // below 100 Kbit/s
    read_ahead = 2;
  }
  else if( upload_rate < 31250 ) {  // 100 Kbit/s and up
    read_ahead = 4;
  }
  else if( upload_rate < 62500 ) {  // 250 Kbit/s and up
    read_ahead = 8;
  }
  else if( upload_rate < 125000 ) {  // 500 Kbit/s and up
    read_ahead = 16;
  }
  else if( upload_rate < 1250000 ) {  // 1 Mbit/s and up
    //only ever raise the mode to FAST here, never lower it
    if( link.getTransportMode() < Transport.TRANSPORT_MODE_FAST ) {
      link.setTransportMode( Transport.TRANSPORT_MODE_FAST );
    }
    read_ahead = 32;
  }
  else {  // 10 Mbit/s and up
    link.setTransportMode( Transport.TRANSPORT_MODE_TURBO );
    if( upload_rate < 3125000 ) {
      read_ahead = 128;
    }
    else {  // 25 Mbit/s and up
      read_ahead = 256;
    }
  }
  outgoing_piece_message_handler.setRequestReadAhead( read_ahead );

  //receive speed -based tuning
  final long download_rate = peer_stats.getDataReceiveRate() + peer_stats.getProtocolReceiveRate();
  if( download_rate >= 1250000 ) {  // 10 Mbit/s
    link.setTransportMode( Transport.TRANSPORT_MODE_TURBO );
  }
  else if( download_rate >= 125000  // 1 Mbit/s
      && link.getTransportMode() < Transport.TRANSPORT_MODE_FAST ) {
    link.setTransportMode( Transport.TRANSPORT_MODE_FAST );
  }
}
/**
 * @return the current value of {@code connection_state}
 */
public int getConnectionState() {
  return connection_state;
}
/**
 * Reports how long ago the last data message was received.
 * <p>
 * If the recorded receive time lies after the current time (time went
 * backwards), the recorded time is reset to the current time first.
 *
 * @return the difference between the current time and
 *         {@code last_data_message_received_time}, or -1 if no data message
 *         has ever been received
 */
public long getTimeSinceLastDataMessageReceived() {
  if( last_data_message_received_time != -1 ) {
    final long current_time = SystemTime.getCurrentTime();
    if( last_data_message_received_time > current_time ) {
      last_data_message_received_time = current_time;  //time went backwards
    }
    return current_time - last_data_message_received_time;
  }
  return -1;  //never received
}
/**
 * Reports how long ago good data was last received.
 * <p>
 * If the recorded time lies after the current time (time went backwards),
 * the recorded time is reset to the current time first.
 *
 * @return the difference between the current time and
 *         {@code last_good_data_time}, or -1 if good data has never been
 *         received
 */
public long getTimeSinceGoodDataReceived() {
  if( last_good_data_time != -1 ) {
    final long current_time = SystemTime.getCurrentTime();
    if( last_good_data_time > current_time ) {
      last_good_data_time = current_time;  //time went backwards
    }
    return current_time - last_good_data_time;
  }
  return -1;  // never received
}
/**
 * Reports how long ago the last data message was sent.
 * <p>
 * If the recorded send time lies after the current time (time went
 * backwards), the recorded time is reset to the current time first.
 *
 * @return the difference between the current time and
 *         {@code last_data_message_sent_time}, or -1 if no data message has
 *         ever been sent
 */
public long getTimeSinceLastDataMessageSent() {
  if( last_data_message_sent_time != -1 ) {
    final long current_time = SystemTime.getCurrentTime();
    if( last_data_message_sent_time > current_time ) {
      last_data_message_sent_time = current_time;  //time went backwards
    }
    return current_time - last_data_message_sent_time;
  }
  return -1;  //never sent
}
/**
 * Reports how long ago the connection was established.
 * <p>
 * If the recorded establishment time lies after the current time, the
 * recorded time is reset to the current time first.
 *
 * @return the difference between the current time and
 *         {@code connection_established_time}, or 0 while no establishment
 *         time has been recorded
 */
public long getTimeSinceConnectionEstablished() {
  if( connection_established_time != 0 ) {
    final long current_time = SystemTime.getCurrentTime();
    if( connection_established_time > current_time ) {
      connection_established_time = current_time;
    }
    return current_time - connection_established_time;
  }
  return 0;  //fudge it while the transport is being connected
}
/**
 * @return the current value of {@code consecutive_no_request_count}
 */
public int getConsecutiveNoRequestCount() {
  return consecutive_no_request_count;
}
/**
 * Overwrites {@code consecutive_no_request_count}.
 *
 * @param num the new counter value
 */
public void setConsecutiveNoRequestCount( int num ) {
  this.consecutive_no_request_count = num;
}
/**
 * Processes the remote peer's BT handshake and either rejects the connection
 * or moves it forward.
 * <p>
 * The connection is closed (via closeConnectionInternally) and the method
 * returns early when any of these checks fails, in this order:
 * <ol>
 * <li>the handshake's data hash differs from manager.getHash();</li>
 * <li>the client description derived from the peer id is not an allowed client type;</li>
 * <li>the peer id equals our own peer id (the manager is also notified via peerVerifiedAsSelf);</li>
 * <li>the peer identity is already registered - unless this connection is
 *     LAN-local and the existing one is not, in which case the existing peer
 *     is removed instead;</li>
 * <li>the IP address is already registered, unless "Allow Same IP Peers" is
 *     set or the ip is "127.0.0.1";</li>
 * <li>no new connections are allowed and no optimistic disconnect could be done;</li>
 * <li>the connection is already closing, or the identity could not be added
 *     (both checked while holding closing_mon).</li>
 * </ol>
 * On success the identity is registered and, if the first reserved byte has
 * its high bit set, AZ messaging mode may be enabled (decoder/encoder swapped
 * and an AZ handshake sent). When AZ mode is not enabled the connection is
 * marked fully established immediately, the bitfield is sent and availability
 * is added; otherwise that happens after the AZ handshake is received.
 * <p>
 * handshake.destroy() is called on every exit path.
 *
 * @param handshake the received BT handshake message; destroyed by this method
 */
protected void decodeBTHandshake( BTHandshake handshake ) {
PeerIdentityDataID my_peer_data_id = manager.getPeerIdentityDataID();
//the handshake must be for the same data hash the manager reports
if( !Arrays.equals( manager.getHash(), handshake.getDataHash() ) ) {
closeConnectionInternally( "handshake has wrong infohash" );
handshake.destroy();
return;
}
peer_id = handshake.getPeerId();
//decode a client identification string from the given peerID
client = PeerClassifier.getClientDescription( peer_id );
//make sure the client type is not banned
if( !PeerClassifier.isClientTypeAllowed( client ) ) {
closeConnectionInternally( client+ " client type not allowed to connect, banned" );
handshake.destroy();
return;
}
//make sure we are not connected to ourselves
if( Arrays.equals( manager.getPeerId(), peer_id ) ) {
manager.peerVerifiedAsSelf( this ); //make sure we dont do it again
closeConnectionInternally( "given peer id matches myself" );
handshake.destroy();
return;
}
//make sure we are not already connected to this peer
boolean sameIdentity = PeerIdentityManager.containsIdentity( my_peer_data_id, peer_id, getPort());
boolean sameIP = false;
//allow loopback connects for co-located proxy-based connections and testing
boolean same_allowed = COConfigurationManager.getBooleanParameter( "Allow Same IP Peers" ) || ip.equals( "127.0.0.1" );
//the IP lookup is only performed when duplicates from the same address are not allowed
if( !same_allowed ){
if( PeerIdentityManager.containsIPAddress( my_peer_data_id, ip )) {
sameIP = true;
}
}
if( sameIdentity ) {
//default: reject this new connection as the duplicate
boolean close = true;
if( connection.isLANLocal() ) { //this new connection is lan-local
final PEPeerTransport existing = manager.getTransportFromIdentity( peer_id );
if( existing != null && !existing.isLANLocal() ) { //so drop the existing connection if it is an external (non lan-local) one
Debug.out( "dropping existing non-lanlocal peer connection [" +existing+ "]" );
manager.removePeer( existing );
close = false;
}
}
if( close ) {
closeConnectionInternally( "peer matches already-connected peer id" );
handshake.destroy();
return;
}
}
//checked after the identity test, so a duplicate identity is reported in preference to a duplicate IP
if( sameIP ) {
closeConnectionInternally( "peer matches already-connected IP address, duplicate connections not allowed" );
handshake.destroy();
return;
}
//make sure we haven't reached our connection limit
final int maxAllowed = manager.getMaxNewConnectionsAllowed();
//short-circuit: doOptimisticDisconnect is only invoked when no new connections are allowed
if (maxAllowed ==0 &&!manager.doOptimisticDisconnect( isLANLocal()))
{
final String msg = "too many existing peer connections [p" +
PeerIdentityManager.getIdentityCount( my_peer_data_id )
+"/g" +PeerIdentityManager.getTotalIdentityCount()
+", pmx" +PeerUtils.MAX_CONNECTIONS_PER_TORRENT+ "/gmx"
+PeerUtils.MAX_CONNECTIONS_TOTAL+"/dmx" + manager.getMaxConnections()+ "]";
//System.out.println( msg );
closeConnectionInternally( msg );
handshake.destroy();
return;
}
//register the identity while holding closing_mon; the early returns below still release it via the finally block
//NOTE(review): enter() sits inside the try, so exit() would also run if enter() itself threw - presumably the project's usual monitor idiom; confirm
try{
closing_mon.enter();
if( closing ){
final String msg = "connection already closing";
closeConnectionInternally( msg );
handshake.destroy();
return;
}
//addIdentity returning false is treated as another connection with this peer id already being registered
if ( !PeerIdentityManager.addIdentity( my_peer_data_id, peer_id, getPort(), ip )){
closeConnectionInternally( "peer matches already-connected peer id" );
handshake.destroy();
return;
}
identityAdded = true;
}finally{
closing_mon.exit();
}
if (Logger.isEnabled())
Logger.log(new LogEvent(this, LOGID, "In: has sent their handshake"));
/*
* Waiting until we've received the initiating-end's full handshake, before sending back our own,
* really should be the "proper" behavior. However, classic BT trackers running NAT checking will
* only send the first 48 bytes (up to infohash) of the peer handshake, skipping peerid, which means
* we'll never get their complete handshake, and thus never reply, which causes the NAT check to fail.
* So, we need to send our handshake earlier, after we've verified the infohash.
*
if( incoming ) { //wait until we've received their handshake before sending ours
sendBTHandshake();
}
*/
//extended protocol processing
if( (handshake.getReserved()[0] & 128) == 128 ) { //if first (high) bit is set
if( !manager.isAZMessagingEnabled() ) {
if (Logger.isEnabled())
Logger.log(new LogEvent(this, LOGID,
"Ignoring peer's extended AZ messaging support,"
+ " as disabled for this download."));
}
//clients whose description contains "Plus!" set the bit but are not switched to AZ mode
else if( client.indexOf( "Plus!" ) != -1 ) {
if (Logger.isEnabled())
Logger.log(new LogEvent(this, LOGID, "Handshake mistakingly indicates"
+ " extended AZ messaging support...ignoring."));
}
else {
//only logged for clients whose description does not contain "Azureus"
if (Logger.isEnabled() && client.indexOf("Azureus") == -1) {
Logger.log(new LogEvent(this, LOGID, "Handshake claims extended AZ "
+ "messaging support....enabling AZ mode."));
}
//switch both message queues to the AZ codec before sending our AZ handshake
az_messaging_mode = true;
connection.getIncomingMessageQueue().setDecoder( new AZMessageDecoder() );
connection.getOutgoingMessageQueue().setEncoder( new AZMessageEncoder() );
sendAZHandshake();
}
}
handshake.destroy();
/*
for( int i=0; i < reserved.length; i++ ) {
int val = reserved[i] & 0xFF;
if( val != 0 ) {
System.out.println( "Peer "+ip+" ["+client+"] sent reserved byte #"+i+" to " +val);
}
}
*/
if( !az_messaging_mode ) { //otherwise we'll do this after receiving az handshake
connection.getIncomingMessageQueue().resumeQueueProcessing(); //HACK: because BT decoder is auto-paused after initial handshake, so it doesn't accidentally decode the next AZ message
changePeerState( PEPeer.TRANSFERING );
connection_state = PEPeerTransport.CONNECTION_FULLY_ESTABLISHED;
sendBitField();
addAvailability();
}
}
/**
 * Processes the remote peer's AZ handshake.
 * <p>
 * Records the client name and version and, if the handshake carries a
 * positive TCP listen port, adopts the advertised ports and rebuilds
 * {@code peer_item_identity} from them. It then collects the message types
 * the handshake lists that the MessageManager also knows, marks the
 * connection fully established, sends the bitfield, destroys the handshake
 * and adds availability.
 *
 * @param handshake the received AZ handshake message; destroyed by this method
 */
protected void decodeAZHandshake( AZHandshake handshake ) {
  client = handshake.getClient()+ " " +handshake.getClientVersion();

  if( handshake.getTCPListenPort() > 0 ) {  //use the ports given in handshake
    tcp_listen_port = handshake.getTCPListenPort();
    udp_listen_port = handshake.getUDPListenPort();
    udp_non_data_port = handshake.getUDPNonDataListenPort();

    final byte handshake_type;
    if( handshake.getHandshakeType() == AZHandshake.HANDSHAKE_TYPE_CRYPTO ) {
      handshake_type = PeerItemFactory.HANDSHAKE_TYPE_CRYPTO;
    }
    else {
      handshake_type = PeerItemFactory.HANDSHAKE_TYPE_PLAIN;
    }

    //remake the id using the peer's remote listen port instead of their random local port
    peer_item_identity = PeerItemFactory.createPeerItem(
        ip,
        tcp_listen_port,
        PeerItem.convertSourceID( peer_source ),
        handshake_type,
        udp_listen_port,
        crypto_level,
        0 );
  }

  //find mutually available message types
  final ArrayList mutual = new ArrayList();
  for( int idx = 0; idx < handshake.getMessageIDs().length; idx++ ) {
    final Message known = MessageManager.getSingleton().lookupMessage( handshake.getMessageIDs()[idx] );
    if( known == null ) {
      continue;  //the message manager has no message for this id
    }
    mutual.add( known );  //mutual support!
  }
  final Message[] mutual_array = new Message[ mutual.size() ];
  mutual.toArray( mutual_array );  //exactly sized, so the list is copied into this array
  supported_messages = mutual_array;

  changePeerState( PEPeer.TRANSFERING );
  connection_state = PEPeerTransport.CONNECTION_FULLY_ESTABLISHED;
  sendBitField();
  handshake.destroy();
  addAvailability();
}
protected void decodeBitfield( BTBitfield bitfield )
{
final DirectByteBuffer field =bitfield.getBitfield();
final byte[] dataf =new byte[(nbPieces +7) /8];
if( field.remaining( DirectByteBuffer.SS_PEER ) < dataf.length ) {
final String error = toString() + " has sent invalid Bitfield: too short [" +field.remaining( DirectByteBuffer.SS_PEER )+ "<" +dataf.length+ "]";
Debug.out( error );
if (Logger.isEnabled())
Logger.log(new LogEvent(this, LOGID, LogEvent.LT_ERROR, error ));
bitfield.destroy();
return;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -