📄 udpconnectionset.java
字号:
}
}
protected void
resetTimerStats()
{
stats_reset_time = SystemTime.getCurrentTime();
stats_packets_unique_sent = 0;
stats_packets_resent_via_timer = 0;
stats_packets_duplicates = 0;
stats_packets_unique_received = 0;
}
protected void
receiveTimerBase(
int theirs )
{
synchronized( this ){
if ( theirs != current_timer_base){
if ( manager.trace() ){
trace( "Received timer base: current=" + current_timer_base + ",theirs=" + theirs + "(adj=" + timer_is_adjusting + ")" );
}
}
if ( outgoing ){
if ( theirs == current_timer_base ){
if ( timer_is_adjusting ){
timer_is_adjusting = false;
resetTimerStats();
}
}
}else{
// simply use the new value
current_timer_base = theirs;
}
}
}
protected void
timerTick()
throws IOException
{
boolean retrans_expired = false;
boolean ack_expired = false;
boolean keep_alive_expired = false;
synchronized( this ){
if ( connections.size() == 0 ){
idle_ticks++;
}else{
idle_ticks = 0;
}
total_tick_count++;
if ( retransmit_ticks > 0 ){
retransmit_ticks--;
if ( retransmit_ticks == 0 ){
retrans_expired = true;
}
}
if ( explicitack_ticks > 0 ){
explicitack_ticks--;
if ( explicitack_ticks == 0 ){
ack_expired = true;
}
}
if ( keep_alive_ticks > 0 ){
keep_alive_ticks--;
if ( keep_alive_ticks == 0 ){
keep_alive_expired = true;
}
}
stats_log_ticks--;
if ( stats_log_ticks == 0 ){
logStats();
stats_log_ticks = STATS_LOG_TICKS;
}
}
if ( retrans_expired ){
retransmitExpired();
}
if ( ack_expired ){
sendAckCommand( true );
}
if ( keep_alive_expired ){
sendStatsRequest();
}
}
protected int
getRetransmitTicks()
{
int timer_to_use;
synchronized( this ){
if ( timer_is_adjusting ){
if ( current_timer_base > old_timer_base ){
timer_to_use = current_timer_base;
}else{
timer_to_use = old_timer_base;
}
}else{
timer_to_use = current_timer_base;
}
}
// 5/3 of base
int timer = ( timer_to_use * 5 ) / 3;
return( Math.max( 1, timer / UDPConnectionManager.TIMER_TICK_MILLIS ));
}
protected int
getExplicitAckTicks()
{
int timer_to_use;
synchronized( this ){
if ( timer_is_adjusting ){
if ( current_timer_base > old_timer_base ){
timer_to_use = old_timer_base;
}else{
timer_to_use = current_timer_base;
}
}else{
timer_to_use = current_timer_base;
}
}
return( Math.max( 1, timer_to_use / UDPConnectionManager.TIMER_TICK_MILLIS ));
}
protected void
startKeepAliveTimer()
{
keep_alive_ticks = MIN_KEEPALIVE_TICKS + random.nextInt( MAX_KEEPALIVE_TICKS - MIN_KEEPALIVE_TICKS );
}
protected void
stopKeepAliveTimer()
{
keep_alive_ticks = 0;
}
protected boolean
idleLimitExceeded()
{
if ( idle_ticks > IDLE_TICKS + (int)(Math.random()*2000)){
synchronized( connections ){
if ( connections.size() == 0 ){
failed = true;
return( true );
}
}
}
return( false );
}
protected UDPPacket
getRetransmitPacket()
{
Iterator it = transmit_unack_packets.iterator();
while( it.hasNext()){
UDPPacket p = (UDPPacket)it.next();
if ( !p.hasBeenReceived()){
boolean auto_retrans = p.isAutoRetransmit();
// non-auto-retrans only applies to packets if they are the last packet sent
if ( auto_retrans || it.hasNext()){
return( p );
}
}
}
return( null );
}
protected int
getRetransmitTicks(
int resend_count )
{
int ticks = getRetransmitTicks();
int res;
if ( resend_count == 0 ){
res = ticks;
}else{
res = ticks + (( MAX_RETRANSMIT_TICKS - ticks ) * resend_count ) / ( RETRANSMIT_COUNT_LIMIT-1 );
}
// System.out.println( "retry: " + res );
return( res );
}
protected void
retransmitExpired()
throws IOException
{
UDPPacket packet_to_send = null;
synchronized( this ){
packet_to_send = getRetransmitPacket();
if ( packet_to_send != null ){
stats_packets_resent_via_timer++;
total_packets_resent_via_timer++;
packet_to_send.resent();
}
}
if ( packet_to_send != null ){
if ( manager.trace() ){
trace( "Retransmit: " + packet_to_send.getString());
}
send( packet_to_send );
}
}
protected boolean
remoteLastInSequence(
int alt_sequence )
{
// if we find this packet then we can also discard any prior to it as they are implicitly
// ack too
synchronized( this ){
for (int i=0;i<transmit_unack_packets.size();i++){
UDPPacket packet = (UDPPacket)transmit_unack_packets.get(i);
if ( packet.getAlternativeSequence() == alt_sequence ){
receive_their_last_inorder_sequence = packet.getSequence();
for (int j=0;j<=i;j++){
transmit_unack_packets.remove(0);
}
return( true );
}
}
}
return( false );
}
protected synchronized void
dumpState()
{
if ( manager.trace() ){
String str = "State:";
String unack = "";
for (int i=0;i<transmit_unack_packets.size();i++){
UDPPacket packet = (UDPPacket)transmit_unack_packets.get(i);
unack += (i==0?"":",") + packet.getString();
}
str += "unack=" + unack + ",last_in_order=" + receive_last_inorder_sequence +
",current_in_seq=" + current_receive_unack_in_sequence_count +
",sent_in_seq=" + sent_receive_unack_in_sequence_count +
",current_oo=" + current_receive_out_of_order_count +
",sent_oo=" + sent_receive_out_of_order_count;
/*
String done = "";
for (int i=0;i<receive_done_sequences.size();i++){
done += (i==0?"":",") + receive_done_sequences.get(i);
}
str += ",done=" + done;
*/
String oo = "";
for (int i=0;i<receive_out_of_order_packets.size();i++){
Object[] entry = (Object[])receive_out_of_order_packets.get(i);
oo += (i==0?"":",") + entry[0] + "/" + entry[1] + "/" + (entry[2]==null?"null":String.valueOf(((ByteBuffer)entry[2]).remaining()));
}
str += ",oo=" + oo;
str += ",sent_data=" + total_data_sent +"/" + total_data_resent + ",sent_prot=" + total_protocol_sent + "/" + total_protocol_resent;
trace( str );
}
}
protected void
send(
UDPPacket packet )
throws IOException
{
if ( failed ){
throw( new IOException( "Connection set has failed" ));
}
byte[] payload = packet.getBuffer();
if ( manager.trace() ){
trace( packet.getConnection(), "Write: " + packet.getString());
}
synchronized( this ){
total_packets_sent++;
int resend_count = packet.getResendCount();
if ( resend_count > RETRANSMIT_COUNT_LIMIT ){
throw( new IOException( "Packet resend limit exceeded" ));
}
// all packets carry an implicit ack, pick up the corresponding count here
long unackin = packet.getUnAckInSequenceCount();
if ( unackin > sent_receive_unack_in_sequence_count ){
sent_receive_unack_in_sequence_count = unackin;
}
// trigger the retransmit timer if any sent packets have the auto-retransmit property
UDPPacket retransmit_target = getRetransmitPacket();
if ( retransmit_target == null ){
// no auto-retransmit packet, cancel timer
retransmit_ticks = 0;
}else if ( retransmit_target != current_retransmit_target ||
retransmit_target == packet){
// auto-retransmit packet has changed or we are currently retransmitting it, reset timer
retransmit_ticks = getRetransmitTicks( resend_count );
}else{
// current retry target timer expired, restart it
if ( retransmit_ticks == 0 ){
retransmit_ticks = getRetransmitTicks( resend_count );
}
}
current_retransmit_target = retransmit_target;
// splice in the latest received in sequence alternative seq if non-crypto packet
if ( packet.getAlternativeSequence() != -1 ){
byte[] alt = intToBytes( receive_last_inorder_alt_sequence );
payload[0] = alt[0];
payload[1] = alt[1];
payload[8] = alt[2];
payload[9] = alt[3];
}
int send_count = packet.sent( total_tick_count );
if ( send_count == 1 ){
if ( packet.getCommand() == UDPPacket.COMMAND_DATA ){
total_data_sent++;
}else{
total_protocol_sent++;
}
}else{
if ( packet.getCommand() == UDPPacket.COMMAND_DATA ){
total_data_resent++;
}else{
total_protocol_resent++;
}
}
}
manager.send( local_port, remote_address, payload );
}
public void
receive(
byte[] initial_data,
int initial_data_length )
throws IOException
{
if ( failed ){
throw( new IOException( "Connection set has failed" ));
}
dumpState();
if ( manager.trace() ){
trace( "Read: total=" + initial_data_length );
}
synchronized( this ){
total_packets_received++;
}
ByteBuffer initial_buffer = ByteBuffer.wrap( initial_data );
initial_buffer.limit( initial_data_length );
if ( !crypto_done ){
// first packet - connection setup and crypto handshake
// derive the sequence number in the normal way so that if a retranmission occurs
// after crypto has been setup then it'll get handled correctly as a dupliate packet
// below
initial_buffer.position( 4 );
Integer pseudo_seq = new Integer( initial_buffer.getInt());
initial_buffer.position( 0 );
if ( !receive_done_sequences.contains( pseudo_seq )){
receive_done_sequences.addFirst( pseudo_seq );
if ( receive_done_sequences.size() > RECEIVE_DONE_SEQ_MAX ){
receive_done_sequences.removeLast();
}
}
if ( outgoing ){
// a reply received by the initiator acknowledges that the initial message sent has
// been received
remoteLastInSequence( -1 );
}
receiveCrypto( initial_buffer );
}else{
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -