⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 udpconnectionset.java

📁 这是一个基于java编写的torrent的P2P源码
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
			
				// pull out the alternative last-in-order seq
			
			byte[]	alt_seq = new byte[4];
			
			alt_seq[0]	= initial_data[0];
			alt_seq[1]	= initial_data[1];
			alt_seq[2]	= initial_data[8];
			alt_seq[3]	= initial_data[9];
			
			int	alt = bytesToInt( alt_seq, 0 );
						
			boolean	write_select = remoteLastInSequence( alt );
			
			boolean	lazy_ack_found	= false;

			try{
				initial_buffer.getInt();	// seq1
				
				Integer	seq2 = new Integer( initial_buffer.getInt());
				
				initial_buffer.getInt();	// seq3
					
					// first see if we know about this sequence number already
				
				if ( receive_done_sequences.contains( seq2 )){
					
					if ( manager.trace() ){
						trace( "Duplicate processed packet: " + seq2 );
					}
					
						// if we're gone quiescent and our lazy-ack packet failed to be delivered we end up here with the other end
						// resending their last message. We pick up this delivery failure and resend the packet
					
					UDPPacket	packet_to_send = null;
					
					synchronized( this ){
						
						stats_packets_duplicates++;
						total_packets_duplicates++;
						
						if ( transmit_unack_packets.size() == 1 ){
							
							UDPPacket	packet = (UDPPacket)transmit_unack_packets.get(0);
							
							if ( !packet.isAutoRetransmit()){
								
								if ( total_tick_count - packet.getSendTickCount() >= MIN_RETRANSMIT_TICKS ){
								
									if ( manager.trace() ){
										trace( "Retrans non-auto-retrans packet" );
									}
									
									packet_to_send = packet;
								}
							}
						}
					}
					
					if ( packet_to_send != null ){
						
						send( packet_to_send );
					}
					
					return;
				}
				
				if ( !out_seq_generator.isValidAlterativeSequence( alt )){
						
					if ( manager.trace() ){
						trace( "Received invalid alternative sequence " + alt + " - dropping packet" );
					}
					
					return;
				}

				boolean	oop = false;
				
				for (int i=0;i<receive_out_of_order_packets.size();i++){
					
					Object[]	entry = (Object[])receive_out_of_order_packets.get(i);
					
					Integer	oop_seq = (Integer)entry[0];
					
					ByteBuffer	oop_buffer = (ByteBuffer)entry[2];
					
					if ( oop_seq.equals( seq2 )){
						
						synchronized( this ){

							if ( oop_buffer != null ){
						
								stats_packets_duplicates++;
								total_packets_duplicates++;
								
								if ( manager.trace() ){
									trace( "Duplicate out-of-order packet: " + seq2 );
								}
								
								return;
							}
							
							stats_packets_unique_received++;
							total_packets_unique_received++;
							
							if ( manager.trace() ){
								trace( "Out-of-order packet entry data matched for seq " + seq2 );
							}
							
								// got data matching out-of-order-entry, add it in!
													
							entry[2] = initial_buffer;
							
							oop = true;
							
							break;
						}
					}
				}
				
				if ( !oop ){
					
						// not a known out-of-order packet. If our oop list is full then all we can do is drop
						// the packet
					
					boolean	added = false;
					
					while ( receive_out_of_order_packets.size() < RECEIVE_OUT_OF_ORDER_PACKETS_MAX ){
									
						int[]	seq_in = in_seq_generator.getNextSequenceNumber();
								
						if ( seq2.intValue() == seq_in[1] ){
								
							synchronized( this ){
								
								stats_packets_unique_received++;
								total_packets_unique_received++;
							}
							
							if ( receive_out_of_order_packets.size() == 0 ){

								// this is an in-order packet :)

							}else{
								
								if ( manager.trace() ){
									trace( "Out-of-order packet entry adding for seq " + seq_in[1] );
								}
							}
														
							receive_out_of_order_packets.add( new Object[]{ seq2, new Integer( seq_in[3]), initial_buffer } );
							
							added = true;
							
							break;
							
						}else{
							
							if ( manager.trace() ){
								trace( "Out-of-order packet: adding spacer for seq " + seq_in[1] );
							}
							
							receive_out_of_order_packets.add( new Object[]{ new Integer( seq_in[1]), new Integer( seq_in[3]), null } );
						}
					}
					
					if ( !added ){
					
							// drop the packet, we have no room to store it
						
						if ( manager.trace() ){
							trace( "Out-of-order packet dropped as too many pending" );
						}
						
						return;
					}
				}
				
				boolean	this_is_oop 	= true;
				
					// process any ready packets
				
				Iterator	it = receive_out_of_order_packets.iterator();
				
				while( it.hasNext()){
					
					Object[]	entry = (Object[])it.next();
					
					ByteBuffer	buffer = (ByteBuffer)entry[2];
					
					if ( buffer == null ){
						
						break;
					}
				
					it.remove();
				
					byte[]	data = buffer.array();
					
					if ( buffer == initial_buffer ){
						
						this_is_oop = false;
					}
					
					synchronized( this ){
						
						current_receive_unack_in_sequence_count++;
					}
					
					Integer	seq = (Integer)entry[0];
					
					receive_last_inorder_sequence		= seq.intValue();
					receive_last_inorder_alt_sequence	= ((Integer)entry[1]).intValue();
					
					if ( !receive_done_sequences.contains( seq )){
						
						receive_done_sequences.addFirst( seq );
					
						if ( receive_done_sequences.size() > RECEIVE_DONE_SEQ_MAX ){
							
							receive_done_sequences.removeLast();
						}
					}
					
					header_cipher_in.processBytes( data, 12, 2, data, 12 );
		
					int	header_len = buffer.getShort()&0xffff;
					
					if ( header_len > data.length ){
						
						if ( manager.trace() ){
							trace( "Header length too large" );
						}
						
						return;
					}
					
					header_cipher_in.processBytes( data, 14, header_len-14, data, 14 );
					
					SHA1Hasher	hasher = new SHA1Hasher();
					
					hasher.update( data, 4, 4 );
					hasher.update( data, 12, header_len - 4 - 12 );
									
					byte[]	hash = hasher.getDigest();
							
					for (int i=0;i<4;i++){
						
						if ( hash[i] != data[header_len-4+i] ){	
						
							if ( manager.trace() ){
								trace( "hash incorrect" );
							}
							
							return;
						}
					}
					
					byte	version = buffer.get();
					
					if ( version != UDPPacket.PROTOCOL_VERSION ){
						
						// continue, assumption is that version changes are backward compatible
						
						// throw( new IOException( "Invalid protocol version '" + version + "'" ));
					}
					
					byte	flags = buffer.get();
								
					if ( ( flags & UDPPacket.FLAG_LAZY_ACK ) != 0 ){
						
						lazy_ack_found	= true;
					}
					
					int	their_timer_base = (buffer.getShort()&0xffff)*10;
					
					receiveTimerBase( their_timer_base );
					
					byte	command = buffer.get();
					
					if ( command == UDPPacket.COMMAND_DATA ){				
	
						receiveDataCommand( seq.intValue(), buffer, header_len );
							
					}else if ( command == UDPPacket.COMMAND_ACK ){
						
						receiveAckCommand( buffer );
						
					}else if ( command == UDPPacket.COMMAND_CLOSE ){
						
						receiveCloseCommand( buffer );
	
					}else if ( command == UDPPacket.COMMAND_STAT_REQUEST ){

						receiveStatsRequest( buffer );
						
					}else if ( command == UDPPacket.COMMAND_STAT_REPLY ){

						receiveStatsReply( buffer );
						
					}else{
					
						// ignore unrecognised commands to support future change
					}
				}
				
				if ( this_is_oop ){
					
					synchronized( this ){
					
						current_receive_out_of_order_count++;
						total_packets_out_of_order++;
					}
				}
			}finally{
			
				boolean	send_ack = false;
				
				synchronized( this ){
					
					long	unack_diff 	= current_receive_unack_in_sequence_count  - sent_receive_unack_in_sequence_count;
					long	oos_diff	= current_receive_out_of_order_count - sent_receive_out_of_order_count;
					
					if ( 	unack_diff > RECEIVE_UNACK_IN_SEQUENCE_LIMIT || 
							oos_diff  > RECEIVE_OUT_OF_ORDER_ACK_LIMIT ){
						
						send_ack = true;
					}
				}
				
				if ( send_ack ){
					
					sendAckCommand( false );
				}
					
				synchronized( this ){

						// if we have either received in-order packets that we haven't sent an ack for or
						// out-of-order packets start the ack timer
					
						// only exception is if we have only a lazy-ack packet outstanding and no out-of-order
					
					long	unack_diff 	= current_receive_unack_in_sequence_count  - sent_receive_unack_in_sequence_count;
					
					if ( unack_diff == 1 && lazy_ack_found && receive_out_of_order_packets.size() == 0 ){
						
						if ( manager.trace() ){
							trace( "Not starting ack timer, only lazy ack received" );
						}
						
						startKeepAliveTimer();
						
					}else{
						
						stopKeepAliveTimer();
						
						if ( unack_diff > 0 || receive_out_of_order_packets.size() > 0 ){
							
							if ( explicitack_ticks == 0 ){
								
								explicitack_ticks = getExplicitAckTicks();
							}
						}
					}
				}
			
				if ( write_select ){
					
					synchronized( connection_writers ){
	
						Iterator	it = connection_writers.iterator();
						
						while( it.hasNext()){
							
							UDPConnection	c = (UDPConnection)it.next();
							
							if ( c.isConnected()){
								
									// we can safely do this while holding monitor as this simply queues an async selector notification if required
								
								c.sent();
								
							}else{
								
								it.remove();
							}
						}
					}
				}
			}
		}
	}
	
	
	/**
	 * Sends the remaining content of a range of buffers as one crypto packet.
	 *
	 * <p>Every byte remaining in {@code buffers[offset]} up to (but excluding)
	 * {@code buffers[offset+length]} is copied into a single payload array, which
	 * advances each source buffer past the bytes taken. The payload is placed in a
	 * {@code UDPPacket.COMMAND_CRYPTO} packet built on {@code lead_connection}
	 * with all four sequence values set to -1. The packet is counted in the
	 * unique-sent statistics, added to {@code transmit_unack_packets} and then
	 * passed to {@code send}.
	 *
	 * @param buffers	source buffers holding the data to send
	 * @param offset	index of the first buffer to consume
	 * @param length	number of buffers to consume
	 * @return			total number of payload bytes placed in the packet
	 * @throws IOException	declared by this method; presumably raised by the send path - TODO confirm
	 */
	protected int
	sendCrypto(
		ByteBuffer[]		buffers,
		int					offset,
		int					length )
	
		throws IOException
	{
		final int	end = offset + length;
		
			// the first phe handshake messages have to travel in a single packet regardless
			// of the mss, so the payload is sized to hold everything that is pending
		
		int	total_bytes = 0;
		
		int	pos = offset;
		
		while ( pos < end ){
			
			total_bytes += buffers[pos++].remaining();
		}
		
		byte[]	payload = new byte[ total_bytes ];
		
		ByteBuffer	payload_writer = ByteBuffer.wrap( payload );
		
		pos = offset;
		
		while ( pos < end ){
			
			payload_writer.put( buffers[pos++] );
		}
		
		int[]	sequences = { -1, -1, -1, -1 };
		
		UDPPacket	crypto_packet = new UDPPacket( lead_connection, sequences, UDPPacket.COMMAND_CRYPTO, payload, 0 );
		
		synchronized( this ){
			
				// update the counters and remember the packet in the unacked list under the set's monitor
			
			stats_packets_unique_sent++;
			total_packets_unique_sent++;
			
			transmit_unack_packets.add( crypto_packet );
		}
		
		if ( manager.trace() ){
			trace( "sendCrypto: seq=" + crypto_packet.getSequence() + ", len=" + total_bytes );
		}
		
		send( crypto_packet );
		
		return( total_bytes );
	}
	
	/**
	 * Hands an incoming crypto payload to the lead connection, creating that
	 * connection first when this set does not yet hold any connection.
	 *
	 * <p>While holding the {@code connections} monitor: fails if the set is marked
	 * {@code failed}; otherwise, when {@code connections} is empty, a new
	 * {@code UDPConnection} is built with id argument -1, registered under its
	 * reported id and made the lead connection. After the monitor is released a
	 * newly created connection is passed to {@code manager.accept} and the buffer
	 * is then delivered through {@code receive}.
	 *
	 * @param buffer	the received crypto data
	 * @throws IOException	if the connection set has failed (other sources, such as
	 * 						the accept or receive calls, are presumed - TODO confirm)
	 */
	protected void
	receiveCrypto(
		ByteBuffer		buffer )
	
		throws IOException
	{
		final UDPConnection	target;
		final boolean		created;
		
		synchronized( connections ){

			if ( failed ){
				
				throw( new IOException( "Connection set has failed" ));
			}
			
			if ( connections.size() != 0 ){
				
					// already have connections, crypto traffic goes to the existing lead
				
				target	= lead_connection;
				created	= false;
				
			}else{
				
					// the real connection id isn't known yet, hence the -1
				
				target	= new UDPConnection( this, -1 );
				created	= true;
		
				connections.put( new Integer( target.getID()), target );
					
				lead_connection	= target;
			}
		}
		
		if ( created ){
			
				// reported only once the connections monitor has been released
			
			manager.accept( local_port, remote_address, target );
		}
		
		if ( manager.trace() ){
			trace( target, "readCrypto: rem="+ buffer.remaining());
		}
		
		target.receive( buffer );
	}

	
	protected int
	sendDataCommand(
		UDPConnection		connection,
		ByteBuffer[]		buffers,
		int					offset,
		int					length )
	
		throws IOException
	{
		int	payload_to_send	= 0;

		for (int i=offset;i<offset+length;i++){
			
			payload_to_send += buffers[i].remaining();
		}
		
		byte[]	header = new byte[256];
		
		ByteBuffer	header_buffer = ByteBuffer.wrap( header );
		
		UDPPacket	packet_to_send;
		
		synchronized( this ){
			
			long	unack_in_sequence_count	= current_receive_unack_in_sequence_count;
			
			int[]	sequence_numbers = writeHeaderStart( header_buffer, UDPPacket.COMMAND_DATA, UDPPacket.FLAG_NONE );
			
			header_buffer.putInt( connection.getID());
			
			int header_size = writeHeaderEnd( header_buffer, false );
						
				// we get to add the header back in here to give the total packet size available
			
			int	mss = connection.getTransport().getMss() + UDPNetworkManager.PROTOCOL_HEADER_SIZE;
	
				// just in case we have some crazy limit set
			
			if ( mss < MIN_MSS ){
				
				mss = MIN_MSS;
			}
			
			if ( payload_to_send > mss - header_size ){
				
				payload_to_send	= mss - header_size;
			}
			
			if ( payload_to_send < 0 ){
				
				payload_to_send	= 0;
			}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -