📄 udpconnectionset.java
字号:
// pull out the alternative last-in-order seq
byte[] alt_seq = new byte[4];
alt_seq[0] = initial_data[0];
alt_seq[1] = initial_data[1];
alt_seq[2] = initial_data[8];
alt_seq[3] = initial_data[9];
int alt = bytesToInt( alt_seq, 0 );
boolean write_select = remoteLastInSequence( alt );
boolean lazy_ack_found = false;
try{
initial_buffer.getInt(); // seq1
Integer seq2 = new Integer( initial_buffer.getInt());
initial_buffer.getInt(); // seq3
// first see if we know about this sequence number already
if ( receive_done_sequences.contains( seq2 )){
if ( manager.trace() ){
trace( "Duplicate processed packet: " + seq2 );
}
// if we've gone quiescent and our lazy-ack packet failed to be delivered we end up here with the other end
// resending their last message. We pick up this delivery failure and resend the packet
UDPPacket packet_to_send = null;
synchronized( this ){
stats_packets_duplicates++;
total_packets_duplicates++;
if ( transmit_unack_packets.size() == 1 ){
UDPPacket packet = (UDPPacket)transmit_unack_packets.get(0);
if ( !packet.isAutoRetransmit()){
if ( total_tick_count - packet.getSendTickCount() >= MIN_RETRANSMIT_TICKS ){
if ( manager.trace() ){
trace( "Retrans non-auto-retrans packet" );
}
packet_to_send = packet;
}
}
}
}
if ( packet_to_send != null ){
send( packet_to_send );
}
return;
}
if ( !out_seq_generator.isValidAlterativeSequence( alt )){
if ( manager.trace() ){
trace( "Received invalid alternative sequence " + alt + " - dropping packet" );
}
return;
}
boolean oop = false;
for (int i=0;i<receive_out_of_order_packets.size();i++){
Object[] entry = (Object[])receive_out_of_order_packets.get(i);
Integer oop_seq = (Integer)entry[0];
ByteBuffer oop_buffer = (ByteBuffer)entry[2];
if ( oop_seq.equals( seq2 )){
synchronized( this ){
if ( oop_buffer != null ){
stats_packets_duplicates++;
total_packets_duplicates++;
if ( manager.trace() ){
trace( "Duplicate out-of-order packet: " + seq2 );
}
return;
}
stats_packets_unique_received++;
total_packets_unique_received++;
if ( manager.trace() ){
trace( "Out-of-order packet entry data matched for seq " + seq2 );
}
// got data matching out-of-order-entry, add it in!
entry[2] = initial_buffer;
oop = true;
break;
}
}
}
if ( !oop ){
// not a known out-of-order packet. If our oop list is full then all we can do is drop
// the packet
boolean added = false;
while ( receive_out_of_order_packets.size() < RECEIVE_OUT_OF_ORDER_PACKETS_MAX ){
int[] seq_in = in_seq_generator.getNextSequenceNumber();
if ( seq2.intValue() == seq_in[1] ){
synchronized( this ){
stats_packets_unique_received++;
total_packets_unique_received++;
}
if ( receive_out_of_order_packets.size() == 0 ){
// this is an in-order packet :)
}else{
if ( manager.trace() ){
trace( "Out-of-order packet entry adding for seq " + seq_in[1] );
}
}
receive_out_of_order_packets.add( new Object[]{ seq2, new Integer( seq_in[3]), initial_buffer } );
added = true;
break;
}else{
if ( manager.trace() ){
trace( "Out-of-order packet: adding spacer for seq " + seq_in[1] );
}
receive_out_of_order_packets.add( new Object[]{ new Integer( seq_in[1]), new Integer( seq_in[3]), null } );
}
}
if ( !added ){
// drop the packet, we have no room to store it
if ( manager.trace() ){
trace( "Out-of-order packet dropped as too many pending" );
}
return;
}
}
boolean this_is_oop = true;
// process any ready packets
Iterator it = receive_out_of_order_packets.iterator();
while( it.hasNext()){
Object[] entry = (Object[])it.next();
ByteBuffer buffer = (ByteBuffer)entry[2];
if ( buffer == null ){
break;
}
it.remove();
byte[] data = buffer.array();
if ( buffer == initial_buffer ){
this_is_oop = false;
}
synchronized( this ){
current_receive_unack_in_sequence_count++;
}
Integer seq = (Integer)entry[0];
receive_last_inorder_sequence = seq.intValue();
receive_last_inorder_alt_sequence = ((Integer)entry[1]).intValue();
if ( !receive_done_sequences.contains( seq )){
receive_done_sequences.addFirst( seq );
if ( receive_done_sequences.size() > RECEIVE_DONE_SEQ_MAX ){
receive_done_sequences.removeLast();
}
}
header_cipher_in.processBytes( data, 12, 2, data, 12 );
int header_len = buffer.getShort()&0xffff;
if ( header_len > data.length ){
if ( manager.trace() ){
trace( "Header length too large" );
}
return;
}
header_cipher_in.processBytes( data, 14, header_len-14, data, 14 );
SHA1Hasher hasher = new SHA1Hasher();
hasher.update( data, 4, 4 );
hasher.update( data, 12, header_len - 4 - 12 );
byte[] hash = hasher.getDigest();
for (int i=0;i<4;i++){
if ( hash[i] != data[header_len-4+i] ){
if ( manager.trace() ){
trace( "hash incorrect" );
}
return;
}
}
byte version = buffer.get();
if ( version != UDPPacket.PROTOCOL_VERSION ){
// continue, assumption is that version changes are backward compatible
// throw( new IOException( "Invalid protocol version '" + version + "'" ));
}
byte flags = buffer.get();
if ( ( flags & UDPPacket.FLAG_LAZY_ACK ) != 0 ){
lazy_ack_found = true;
}
int their_timer_base = (buffer.getShort()&0xffff)*10;
receiveTimerBase( their_timer_base );
byte command = buffer.get();
if ( command == UDPPacket.COMMAND_DATA ){
receiveDataCommand( seq.intValue(), buffer, header_len );
}else if ( command == UDPPacket.COMMAND_ACK ){
receiveAckCommand( buffer );
}else if ( command == UDPPacket.COMMAND_CLOSE ){
receiveCloseCommand( buffer );
}else if ( command == UDPPacket.COMMAND_STAT_REQUEST ){
receiveStatsRequest( buffer );
}else if ( command == UDPPacket.COMMAND_STAT_REPLY ){
receiveStatsReply( buffer );
}else{
// ignore unrecognised commands to support future change
}
}
if ( this_is_oop ){
synchronized( this ){
current_receive_out_of_order_count++;
total_packets_out_of_order++;
}
}
}finally{
boolean send_ack = false;
synchronized( this ){
long unack_diff = current_receive_unack_in_sequence_count - sent_receive_unack_in_sequence_count;
long oos_diff = current_receive_out_of_order_count - sent_receive_out_of_order_count;
if ( unack_diff > RECEIVE_UNACK_IN_SEQUENCE_LIMIT ||
oos_diff > RECEIVE_OUT_OF_ORDER_ACK_LIMIT ){
send_ack = true;
}
}
if ( send_ack ){
sendAckCommand( false );
}
synchronized( this ){
// if we have either received in-order packets that we haven't sent an ack for or
// out-of-order packets start the ack timer
// only exception is if we have only a lazy-ack packet outstanding and no out-of-order
long unack_diff = current_receive_unack_in_sequence_count - sent_receive_unack_in_sequence_count;
if ( unack_diff == 1 && lazy_ack_found && receive_out_of_order_packets.size() == 0 ){
if ( manager.trace() ){
trace( "Not starting ack timer, only lazy ack received" );
}
startKeepAliveTimer();
}else{
stopKeepAliveTimer();
if ( unack_diff > 0 || receive_out_of_order_packets.size() > 0 ){
if ( explicitack_ticks == 0 ){
explicitack_ticks = getExplicitAckTicks();
}
}
}
}
if ( write_select ){
synchronized( connection_writers ){
Iterator it = connection_writers.iterator();
while( it.hasNext()){
UDPConnection c = (UDPConnection)it.next();
if ( c.isConnected()){
// we can safely do this while holding monitor as this simply queues an async selector notification if required
c.sent();
}else{
it.remove();
}
}
}
}
}
}
}
/**
 * Gathers everything remaining in the selected buffers into one
 * COMMAND_CRYPTO packet, records it as unacknowledged and sends it.
 *
 * The source buffers are drained by the copy (ByteBuffer.put(ByteBuffer)
 * advances the source position).
 *
 * @param buffers	source buffers holding the handshake bytes
 * @param offset	index of the first buffer to read from
 * @param length	number of consecutive buffers to read, starting at offset
 * @return			number of payload bytes placed in the packet
 * @throws IOException	declared by this method; presumably raised by send() - not visible here
 */
protected int
sendCrypto(
	ByteBuffer[]	buffers,
	int				offset,
	int				length )

	throws IOException
{
	// the first phe handshake messages have to travel in a single packet whatever the mss is,
	// so the packet is sized to hold all of the pending bytes

	final int	end_index = offset + length;

	int	total_bytes = 0;

	int	index = offset;

	while( index < end_index ){

		total_bytes += buffers[index].remaining();

		index++;
	}

	byte[]		payload	= new byte[ total_bytes ];

	ByteBuffer	sink	= ByteBuffer.wrap( payload );

	index = offset;

	while( index < end_index ){

		sink.put( buffers[index] );

		index++;
	}

	// crypto packets are built with all four sequence slots set to -1

	int[]	no_sequences = new int[]{ -1, -1, -1, -1 };

	UDPPacket	crypto_packet = new UDPPacket( lead_connection, no_sequences, UDPPacket.COMMAND_CRYPTO, payload, 0 );

	synchronized( this ){

		stats_packets_unique_sent++;

		total_packets_unique_sent++;

		transmit_unack_packets.add( crypto_packet );
	}

	if ( manager.trace() ){

		trace( "sendCrypto: seq=" + crypto_packet.getSequence() + ", len=" + total_bytes );
	}

	send( crypto_packet );

	return( total_bytes );
}
/**
 * Hands an incoming crypto payload to the lead connection. When the set
 * holds no connections yet, a new one is created, registered under its id,
 * made the lead connection and announced through manager.accept().
 *
 * @param buffer	received crypto bytes, passed on to the connection's receive()
 * @throws IOException	if the connection set has been marked as failed; may also
 * 						propagate from the calls made here (not visible in this block)
 */
protected void
receiveCrypto(
	ByteBuffer		buffer )

	throws IOException
{
	UDPConnection	target;

	boolean			created;

	synchronized( connections ){

		if ( failed ){

			throw( new IOException( "Connection set has failed" ));
		}

		created = connections.size() == 0;

		if ( created ){

				// the real connection id isn't known yet so -1 is passed

			target = new UDPConnection( this, -1 );

			connections.put( new Integer( target.getID()), target );

			lead_connection = target;

		}else{

			target = lead_connection;
		}
	}

		// the accept notification is issued once the connections monitor has been released

	if ( created ){

		manager.accept( local_port, remote_address, target );
	}

	if ( manager.trace() ){

		trace( target, "readCrypto: rem="+ buffer.remaining());
	}

	target.receive( buffer );
}
protected int
sendDataCommand(
UDPConnection connection,
ByteBuffer[] buffers,
int offset,
int length )
throws IOException
{
int payload_to_send = 0;
for (int i=offset;i<offset+length;i++){
payload_to_send += buffers[i].remaining();
}
byte[] header = new byte[256];
ByteBuffer header_buffer = ByteBuffer.wrap( header );
UDPPacket packet_to_send;
synchronized( this ){
long unack_in_sequence_count = current_receive_unack_in_sequence_count;
int[] sequence_numbers = writeHeaderStart( header_buffer, UDPPacket.COMMAND_DATA, UDPPacket.FLAG_NONE );
header_buffer.putInt( connection.getID());
int header_size = writeHeaderEnd( header_buffer, false );
// we get to add the header back in here to give the total packet size available
int mss = connection.getTransport().getMss() + UDPNetworkManager.PROTOCOL_HEADER_SIZE;
// just in case we have some crazy limit set
if ( mss < MIN_MSS ){
mss = MIN_MSS;
}
if ( payload_to_send > mss - header_size ){
payload_to_send = mss - header_size;
}
if ( payload_to_send < 0 ){
payload_to_send = 0;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -