📄 udpconnectionset.java
字号:
/*
* Created on 22 Jun 2006
* Created by Paul Gardner
* Copyright (C) 2006 Aelitis, All Rights Reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* AELITIS, SAS au capital de 46,603.30 euros
* 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
*
*/
package com.aelitis.azureus.core.networkmanager.impl.udp;
import java.util.*;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import javax.crypto.spec.SecretKeySpec;
import org.bouncycastle.crypto.CipherParameters;
import org.bouncycastle.crypto.engines.RC4Engine;
import org.bouncycastle.crypto.params.KeyParameter;
import org.gudy.azureus2.core3.logging.LogEvent;
import org.gudy.azureus2.core3.logging.LogIDs;
import org.gudy.azureus2.core3.logging.Logger;
import org.gudy.azureus2.core3.util.Debug;
import org.gudy.azureus2.core3.util.SHA1Hasher;
import org.gudy.azureus2.core3.util.SystemTime;
import com.aelitis.net.udp.uc.PRUDPPacketReply;
public class
UDPConnectionSet
{
private static final LogIDs LOGID = LogIDs.NET;
public static final int PROTOCOL_DATA_HEADER_SIZE = 30;
private static final boolean DEBUG_SEQUENCES = false;
static{
if ( DEBUG_SEQUENCES ){
System.out.println( "**** UDPConnectionSet: debug sequences is on ****" );
}
}
private static final byte[] KEYA_IV = "UDPDriverKeyA".getBytes();
private static final byte[] KEYB_IV = "UDPDriverKeyB".getBytes();
private static final byte[] KEYC_IV = "UDPDriverKeyC".getBytes();
private static final byte[] KEYD_IV = "UDPDriverKeyD".getBytes();
private static final int MIN_MSS = 256;
private static final int MAX_HEADER = 128;
public static final int MIN_WRITE_PAYLOAD = MIN_MSS - MAX_HEADER;
private UDPConnectionManager manager;
private UDPSelector selector;
private int local_port;
private InetSocketAddress remote_address;
private boolean outgoing;
private String connection_key;
private Random random;
private UDPConnection lead_connection;
private RC4Engine header_cipher_out;
private RC4Engine header_cipher_in;
private SequenceGenerator in_seq_generator;
private SequenceGenerator out_seq_generator;
private volatile boolean crypto_done;
private volatile boolean failed;
private Map connections = new HashMap();
private LinkedList connection_writers = new LinkedList();
private long total_tick_count;
private static final int STATS_LOG_TIMER = 60*1000;
private static final int STATS_LOG_TICKS = Math.max( 1, STATS_LOG_TIMER / UDPConnectionManager.TIMER_TICK_MILLIS );
private int stats_log_ticks = STATS_LOG_TICKS;
private static final int IDLE_TIMER = 10*1000;
private static final int IDLE_TICKS = Math.max( 1, IDLE_TIMER / UDPConnectionManager.TIMER_TICK_MILLIS );
private int idle_ticks = 0;
private static final int TIMER_BASE_DEFAULT = 300;
private static final int TIMER_BASE_MIN = 100;
private static final int TIMER_BASE_MAX = 15*1000;
private int current_timer_base = TIMER_BASE_DEFAULT;
private int old_timer_base = current_timer_base;
private boolean timer_is_adjusting;
// these stats_ values are reset periodically
private int stats_packets_unique_sent; // unique packets sent
private int stats_packets_resent_via_timer; // total resent due to resend timer expiry
private int stats_packets_unique_received; // unique packets received (not resends)
private int stats_packets_duplicates; // duplicates received
private static final int STATS_RESET_TIMER = 30*1000;
private long stats_reset_time = SystemTime.getCurrentTime();
// cumulative stats
private int total_packets_sent = 0;
private int total_data_sent = 0;
private int total_data_resent = 0;
private int total_protocol_sent = 0;
private int total_protocol_resent = 0;
private int total_packets_unique_sent = 0;
private int total_packets_received = 0;
private int total_packets_unique_received = 0;
private int total_packets_duplicates = 0;
private int total_packets_out_of_order = 0;
private int total_packets_resent_via_timer = 0;
private int total_packets_resent_via_ack = 0;
// transmit
private int retransmit_ticks = 0;
private UDPPacket current_retransmit_target;
private static final int RETRANSMIT_COUNT_LIMIT = 5;
private static final int MIN_RETRANSMIT_TIMER = 100;
private static final int MIN_RETRANSMIT_TICKS = Math.max( 1, MIN_RETRANSMIT_TIMER / UDPConnectionManager.TIMER_TICK_MILLIS );
private static final int MAX_RETRANSMIT_TIMER = 20*1000;
private static final int MAX_RETRANSMIT_TICKS = Math.max( 1, MAX_RETRANSMIT_TIMER / UDPConnectionManager.TIMER_TICK_MILLIS );
private static final int MAX_TRANSMIT_UNACK_DATA_PACKETS = 10;
private static final int MAX_TRANSMIT_UNACK_PACKETS = MAX_TRANSMIT_UNACK_DATA_PACKETS + 4; // + protocol packets
private List transmit_unack_packets = new ArrayList();
private static final int MAX_CONTIGUOUS_RETRANS_FOR_ACK = 3;
private static final int MIN_KEEPALIVE_TIMER = 10*1000;
private static final int MIN_KEEPALIVE_TICKS = Math.max( 1, MIN_KEEPALIVE_TIMER / UDPConnectionManager.TIMER_TICK_MILLIS );
private static final int MAX_KEEPALIVE_TIMER = 20*1000;
private static final int MAX_KEEPALIVE_TICKS = Math.max( 1, MAX_KEEPALIVE_TIMER / UDPConnectionManager.TIMER_TICK_MILLIS );
private int keep_alive_ticks;
// receive
private int receive_last_inorder_sequence = -1;
private int receive_last_inorder_alt_sequence = -1;
private int receive_their_last_inorder_sequence = -1;
private static final int RECEIVE_UNACK_IN_SEQUENCE_LIMIT = 3;
private long current_receive_unack_in_sequence_count = 0;
private long sent_receive_unack_in_sequence_count = 0;
private static final int RECEIVE_OUT_OF_ORDER_ACK_LIMIT = 3;
private long current_receive_out_of_order_count = 0;
private long sent_receive_out_of_order_count = 0;
private static final int RECEIVE_DONE_SEQ_MAX = 128;
private LinkedList receive_done_sequences = new LinkedList();
private static final int RECEIVE_OUT_OF_ORDER_PACKETS_MAX = 64;
private List receive_out_of_order_packets = new LinkedList();
private int explicitack_ticks = 0;
private static final int MAX_SEQ_MEMORY = Math.max( RECEIVE_OUT_OF_ORDER_PACKETS_MAX, MAX_TRANSMIT_UNACK_PACKETS );
protected
UDPConnectionSet(
UDPConnectionManager _manager,
String _connection_key,
UDPSelector _selector,
int _local_port,
InetSocketAddress _remote_address )
{
manager = _manager;
connection_key = _connection_key;
selector = _selector;
local_port = _local_port;
remote_address = _remote_address;
}
protected UDPSelector
getSelector()
{
return( selector );
}
protected InetSocketAddress
getRemoteAddress()
{
return( remote_address );
}
protected String
getKey()
{
return( connection_key );
}
protected void
add(
UDPConnection connection )
throws IOException
{
UDPConnection old_connection = null;
synchronized( connections ){
if ( failed ){
throw( new IOException( "Connection set has failed" ));
}
old_connection = (UDPConnection)connections.put( new Integer( connection.getID()), connection );
if ( connections.size() == 1 && lead_connection == null ){
lead_connection = connection;
outgoing = true;
}
}
if ( old_connection != null ){
Debug.out( "Duplicate connection" );
old_connection.close( "Duplication connection" );
}
}
protected boolean
remove(
UDPConnection connection )
{
synchronized( connections ){
connections.remove( new Integer( connection.getID()));
return( connections.size() == 0 );
}
}
protected void
poll()
{
synchronized( connections ){
Iterator it = connections.values().iterator();
while( it.hasNext()){
((UDPConnection)it.next()).poll();
}
}
}
protected void
setSecret(
UDPConnection connection,
byte[] session_secret )
{
try{
if ( connection == lead_connection ){
if ( manager.trace() ){
trace( "crypto done" );
}
SHA1Hasher hasher = new SHA1Hasher();
hasher.update( KEYA_IV );
hasher.update( session_secret );
byte[] a_key = hasher.getDigest();
hasher = new SHA1Hasher();
hasher.update( KEYB_IV );
hasher.update( session_secret );
byte[] b_key = hasher.getDigest();
hasher = new SHA1Hasher();
hasher.update( KEYC_IV );
hasher.update( session_secret );
byte[] c_key = hasher.getDigest();
hasher = new SHA1Hasher();
hasher.update( KEYD_IV );
hasher.update( session_secret );
byte[] d_key = hasher.getDigest();
// for RC4 enc/dec is irrelevant
RC4Engine rc4_engine_a = getCipher( a_key );
RC4Engine rc4_engine_b = getCipher( b_key );
RC4Engine rc4_engine_c = getCipher( c_key );
RC4Engine rc4_engine_d = getCipher( d_key );
if ( lead_connection.isIncoming()){
header_cipher_out = rc4_engine_a;
header_cipher_in = rc4_engine_b;
out_seq_generator = new SequenceGenerator( new Random( bytesToLong( d_key )), rc4_engine_c, false );
in_seq_generator = new SequenceGenerator( new Random( bytesToLong( c_key )), rc4_engine_d, true );
random = new Random( bytesToLong( d_key, 8 ));
}else{
header_cipher_out = rc4_engine_b;
header_cipher_in = rc4_engine_a;
in_seq_generator = new SequenceGenerator( new Random( bytesToLong( d_key )), rc4_engine_c, true );
out_seq_generator = new SequenceGenerator( new Random( bytesToLong( c_key )), rc4_engine_d, false );
random = new Random( bytesToLong( c_key, 8 ));
}
// as the first packet each way is crypto we skip a sequence number from each generator
// to represent this and initialise the last-in-order seq appropriately so a sensible value is
// spliced into the next packet
out_seq_generator.getNextSequenceNumber();
int[] initial_in_seqs = in_seq_generator.getNextSequenceNumber();
receive_last_inorder_alt_sequence = initial_in_seqs[3];
crypto_done = true;
}else if ( !crypto_done ){
Debug.out( "Secondary setSecret but crypto not done" );
}
}catch( Throwable e ){
Debug.printStackTrace(e);
connection.close( "Crypto problems: "+ Debug.getNestedExceptionMessage(e));
}
}
protected RC4Engine
getCipher(
byte[] key )
{
SecretKeySpec secret_key_spec = new SecretKeySpec( key, "RC4" );
RC4Engine rc4_engine = new RC4Engine();
CipherParameters params_a = new KeyParameter( secret_key_spec.getEncoded());
// for RC4 enc/dec is irrelevant
rc4_engine.init( true, params_a );
// skip first 1024 bytes of stream to protected against a Fluhrer, Mantin and Shamir attack
byte[] temp = new byte[1024];
rc4_engine.processBytes( temp, 0, temp.length, temp, 0 );
return( rc4_engine );
}
protected void
sendTimerBase()
{
// only the outgoing side of a connection can initiate changes in timer base
if ( !outgoing ){
return;
}
// the thing that kills a connection is too many retransmits so we need to keep these minimised during timer changes
// when we increase our base timer we can immediately increase our retransmit timer but need to wait until they have
// modified their timer before increasing the ack. Otherwise they won't be receiving acks as fast as they expect and therefore
// trigger retransmits
// when we decrease our base timer it is the opposite way around - we can immediately decrease acks but delay retrans
synchronized( this ){
if ( timer_is_adjusting ){
return;
}
// only consider the stats if we've sent at least a few this interval
if ( stats_packets_unique_sent > 2 ){
int new_timer_base = current_timer_base;
if ( stats_packets_resent_via_timer > 0 ){
float resend_ratio = (float)stats_packets_resent_via_timer / stats_packets_unique_sent;
// System.out.println( "resend ratio: " + resend_ratio );
if ( resend_ratio >= 0.25 ){
new_timer_base = (int)( current_timer_base * ( resend_ratio + 1 ));
// round to 100th sec as we send 100ths over the wire and expect to get it back
new_timer_base = (new_timer_base/10)*10;
new_timer_base = Math.min( TIMER_BASE_MAX, new_timer_base );
if ( new_timer_base != current_timer_base ){
if ( manager.trace()){
trace( "Increasing timer base from " + current_timer_base + " to " + new_timer_base + " due to resends (ratio=" + resend_ratio + ")" );
}
}
}
}
if ( new_timer_base == current_timer_base && stats_packets_unique_received > 2 ){
float duplicate_ratio = (float)stats_packets_duplicates / stats_packets_unique_received;
// we use duplicate packets sometimes to force sequence numbers through, so
// reduce our sensitivity this them
duplicate_ratio = duplicate_ratio/2;
// System.out.println( "duplicate ratio: " + duplicate_ratio );
if ( duplicate_ratio >= 0.25 ){
new_timer_base = (int)( current_timer_base * ( duplicate_ratio + 1 ));
new_timer_base = (new_timer_base/10)*10;
new_timer_base = Math.min( TIMER_BASE_MAX, new_timer_base );
if ( new_timer_base != current_timer_base ){
if ( manager.trace() ){
trace( "Increasing timer base from " + current_timer_base + " to " + new_timer_base + " due to duplicates (ratio=" + duplicate_ratio + ")" );
}
}
}
}
if ( new_timer_base == current_timer_base && stats_packets_unique_received > 2 ){
// conservative approach - reduce by 10% if we've had no errors
if ( stats_packets_resent_via_timer == 0 && stats_packets_duplicates == 0 ){
new_timer_base = current_timer_base - (current_timer_base/10);
new_timer_base = (new_timer_base/10)*10;
new_timer_base = Math.max( new_timer_base, TIMER_BASE_MIN );
if ( new_timer_base != current_timer_base ){
if ( manager.trace()){
trace( "Decreasing timer base from " + current_timer_base + " to " + new_timer_base );
}
}
}
}
boolean reset_stats = false;
long now = SystemTime.getCurrentTime();
if ( new_timer_base == current_timer_base ){
if ( now < stats_reset_time || now - stats_reset_time > STATS_RESET_TIMER ){
reset_stats = true;
}
}else{
timer_is_adjusting = true;
old_timer_base = current_timer_base;
current_timer_base = new_timer_base;
reset_stats = true;
}
if ( reset_stats ){
resetTimerStats();
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -