⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 httpnetworkconnection.java

📁 这是一个基于java编写的torrent的P2P源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/*
 * Created on 3 Oct 2006
 * Created by Paul Gardner
 * Copyright (C) 2006 Aelitis, All Rights Reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 * 
 * AELITIS, SAS au capital de 63.529,40 euros
 * 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
 *
 */

package com.aelitis.azureus.core.networkmanager.impl.http;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;

import org.gudy.azureus2.core3.config.COConfigurationManager;
import org.gudy.azureus2.core3.config.ParameterListener;
import org.gudy.azureus2.core3.logging.LogEvent;
import org.gudy.azureus2.core3.logging.LogIDs;
import org.gudy.azureus2.core3.logging.Logger;
import org.gudy.azureus2.core3.peer.impl.PEPeerControl;
import org.gudy.azureus2.core3.peer.impl.PEPeerTransport;
import org.gudy.azureus2.core3.peer.util.PeerUtils;
import org.gudy.azureus2.core3.util.Constants;
import org.gudy.azureus2.core3.util.Debug;
import org.gudy.azureus2.core3.util.DirectByteBuffer;
import org.gudy.azureus2.core3.util.SimpleTimer;
import org.gudy.azureus2.core3.util.SystemTime;
import org.gudy.azureus2.core3.util.TimerEvent;
import org.gudy.azureus2.core3.util.TimerEventPerformer;

import com.aelitis.azureus.core.networkmanager.NetworkConnection;
import com.aelitis.azureus.core.networkmanager.OutgoingMessageQueue;
import com.aelitis.azureus.core.networkmanager.RawMessage;
import com.aelitis.azureus.core.networkmanager.impl.RawMessageImpl;
import com.aelitis.azureus.core.peermanager.messaging.Message;
import com.aelitis.azureus.core.peermanager.messaging.bittorrent.BTBitfield;
import com.aelitis.azureus.core.peermanager.messaging.bittorrent.BTHandshake;
import com.aelitis.azureus.core.peermanager.messaging.bittorrent.BTHave;
import com.aelitis.azureus.core.peermanager.messaging.bittorrent.BTInterested;
import com.aelitis.azureus.core.peermanager.messaging.bittorrent.BTPiece;
import com.aelitis.azureus.core.peermanager.messaging.bittorrent.BTRequest;

public abstract class 
HTTPNetworkConnection 
{
	protected static final LogIDs LOGID = LogIDs.NWMAN;

	private static final int	MAX_OUTSTANDING_BT_REQUESTS	= 16;
	
	protected static final String	NL			= "\r\n";

	private static int        max_read_block_size;

	static{
	
	    ParameterListener param_listener = new ParameterListener() {
	            public void
	            parameterChanged(
	                String  str )
	            {
	                max_read_block_size = COConfigurationManager.getIntParameter( "BT Request Max Block Size" );
	            }
	    };
	
	    COConfigurationManager.addAndFireParameterListener( "BT Request Max Block Size", param_listener);
	}
	
	private static final int	TIMEOUT_CHECK_PERIOD			= 15*1000;
	private static final int	DEAD_CONNECTION_TIMEOUT_PERIOD	= 30*1000;
	private static final int	MAX_CON_PER_ENDPOINT			= 5*1000;

	private static Map	http_connection_map = new HashMap();
	
	static{
		SimpleTimer.addPeriodicEvent(
			"HTTPNetworkConnection:timer",
			TIMEOUT_CHECK_PERIOD,
			new TimerEventPerformer()
			{
				public void 
				perform(
					TimerEvent event ) 
				{
					synchronized( http_connection_map ){
						
						boolean	 check = true;
						
						while( check ){
							
							check = false;
	
							Iterator	it = http_connection_map.entrySet().iterator();
							
							while( it.hasNext()){
								
								Map.Entry	entry = (Map.Entry)it.next();
								
								networkConnectionKey	key = (networkConnectionKey)entry.getKey();
								
								List	connections = (List)entry.getValue();
								
								/*
								String	times = "";
								
								for (int i=0;i<connections.size();i++){
									
									HTTPNetworkConnection	connection = (HTTPNetworkConnection)connections.get(i);
								
									times += (i==0?"":",") + connection.getTimeSinceLastActivity();
								}
								
								System.out.println( "HTTPNC: " + key.getName() + " -> " + connections.size() + " - " + times );
								*/
								
								if ( checkConnections( connections )){
									
										// might have a concurrent mod to the iterator
									
									if ( !http_connection_map.containsKey( key )){
										
										check	= true;
										
										break;
									}
								}
							}
						}
					}
				}
			});
	}
	
	protected static boolean
	checkConnections(
		List	connections )
	{
		boolean	some_closed = false;
				
		HTTPNetworkConnection	oldest 			= null;
		long					oldest_time		= -1;
		
		Iterator	it = connections.iterator();
			
		List	timed_out = new ArrayList();
		
		while( it.hasNext()){
			
			HTTPNetworkConnection	connection = (HTTPNetworkConnection)it.next();
		
			long	time = connection.getTimeSinceLastActivity();
		
			if ( time > DEAD_CONNECTION_TIMEOUT_PERIOD ){
				
				if ( connection.getRequestCount() == 0 ){
																
					timed_out.add( connection );
					
					continue;
				}
			}
						
			if ( time > oldest_time && !connection.isClosing()){
					
				oldest_time		= time;
					
				oldest	= connection;
			}
		}
		
		for (int i=0;i<timed_out.size();i++){
			
			((HTTPNetworkConnection)timed_out.get(i)).close( "Timeout" );
			
			some_closed	= true;
		}
		
		if ( connections.size() - timed_out.size() > MAX_CON_PER_ENDPOINT ){
			
			oldest.close( "Too many connections from initiator");
				
			some_closed	= true;
		}
		
		return( some_closed );
	}
	
	private HTTPNetworkManager	manager;
	private NetworkConnection	connection;
	private PEPeerTransport		peer;
	private String				url;
	
	private HTTPMessageDecoder	decoder;
	private HTTPMessageEncoder	encoder;
	
	private boolean			sent_handshake	= false;
	
	private byte[]	peer_id	= PeerUtils.createPeerID();

	private boolean	choked	= true;
	
	private List	http_requests			= new ArrayList();
	private List	choked_requests 		= new ArrayList();
	private List	outstanding_requests 	= new ArrayList();
	
	private BitSet	piece_map	= new BitSet();
	
	private long	last_http_activity_time;
	
	private networkConnectionKey	network_connection_key;
	
	private boolean	closing;
	private boolean	destroyed;
	
	protected
	HTTPNetworkConnection(
		HTTPNetworkManager		_manager,
		NetworkConnection		_connection,
		PEPeerTransport			_peer,
		String					_url )
	{
		manager		= _manager;
		connection	= _connection;
		peer		= _peer;
		url			= _url;
		
		network_connection_key = new networkConnectionKey();
			
		last_http_activity_time	= SystemTime.getCurrentTime();
		
		decoder	= (HTTPMessageDecoder)connection.getIncomingMessageQueue().getDecoder();
		encoder = (HTTPMessageEncoder)connection.getOutgoingMessageQueue().getEncoder();

		synchronized( http_connection_map ){
						
			List	connections = (List)http_connection_map.get( network_connection_key );
			
			if ( connections == null ){
				
				connections = new ArrayList();
				
				http_connection_map.put( network_connection_key, connections );
			}
			
			connections.add( this );
			
			if ( connections.size() > MAX_CON_PER_ENDPOINT ){
				
				checkConnections( connections );
			}
		}
		
		decoder.setConnection( this );
		encoder.setConnection( this );
	}
	
	protected boolean
	isSeed()
	{
		if ( !peer.getControl().isSeeding()){
			
			if (Logger.isEnabled()){
				Logger.log(new LogEvent(peer,LOGID, "Download is not seeding" ));
			}   	
			
			sendAndClose( manager.getNotFound());
			
			return( false );
		}
		
		return( true );
	}
	
	protected HTTPNetworkManager
	getManager()
	{
		return( manager );
	}
	
	protected NetworkConnection
	getConnection()
	{
		return( connection );
	}
	
	protected PEPeerTransport
	getPeer()
	{
		return( peer );
	}
	protected PEPeerControl
	getPeerControl()
	{
		return( peer.getControl());
	}
	
	protected RawMessage
	encodeChoke()
	{
		synchronized( outstanding_requests ){
			
			choked	= true;
		}
		
		return( null );
	}
	
	protected RawMessage
	encodeUnchoke()
	{		
		synchronized( outstanding_requests ){
			
			choked	= false;
			
			for (int i=0;i<choked_requests.size();i++){
								
				decoder.addMessage((BTRequest)choked_requests.get(i));
			}
			
			choked_requests.clear();
		}
		
		return( null );
	}
	
	protected RawMessage
	encodeBitField()
	{
		decoder.addMessage( new BTInterested());
		
		return( null );
	}
	
	protected void
	readWakeup()
	{
		connection.getTransport().setReadyForRead();
	}

	protected RawMessage
	encodeHandShake(
		Message	message )
	{
		return( null );
	}
	
	protected abstract void
	decodeHeader(
		String		header )
	
		throws IOException;

	protected String
	encodeHeader(
		httpRequest	request )
	{
		String	res = 
			"HTTP/1.1 " + (request.isPartialContent()?"206 Partial Content":"200 OK" ) + NL + 
				"Content-Type: application/octet-stream" + NL +
				"Server: " + Constants.AZUREUS_NAME + " " + Constants.AZUREUS_VERSION + NL +
				"Connection: " + ( request.keepAlive()?"Keep-Alive":"Close" ) + NL +
				(request.keepAlive()?("Keep-Alive: timeout=30" + NL) :"" ) +
				"Content-Length: " + request.getTotalLength() + NL +
				NL;
							
		return( res );
	}
	
	protected void
	addRequest(
		httpRequest		request )
	
		throws IOException
	{
		last_http_activity_time	= SystemTime.getCurrentTime();
		
		PEPeerControl	control = getPeerControl();
		
		if ( !sent_handshake ){
			
			sent_handshake	= true;
			
			decoder.addMessage( new BTHandshake( control.getHash(), peer_id, false ));
			
			byte[]	bits = new byte[(control.getPieces().length +7) /8];
			
			DirectByteBuffer buffer = new DirectByteBuffer( ByteBuffer.wrap( bits ));
			
			decoder.addMessage( new BTBitfield( buffer ));
		}
		
		synchronized( outstanding_requests ){

			http_requests.add( request );
		}
		
		submitBTRequests();
	}
	
	protected void
	submitBTRequests()
	
		throws IOException
	{
		PEPeerControl	control = getPeerControl();

		long	piece_size = control.getPieceLength(0);
	
		synchronized( outstanding_requests ){

			while( outstanding_requests.size() < MAX_OUTSTANDING_BT_REQUESTS && http_requests.size() > 0 ){
				
				httpRequest	http_request = (httpRequest)http_requests.get(0);
				
				long[]	offsets	= http_request.getOffsets();
				long[]	lengths	= http_request.getLengths();
				
				int	index	= http_request.getIndex();
				
				long	offset 	= offsets[index];
				long	length	= lengths[index];
				
				int		this_piece_number 	= (int)(offset / piece_size);
				int		this_piece_size		= control.getPieceLength( this_piece_number );
				
				int		offset_in_piece 	= (int)( offset - ( this_piece_number * piece_size ));
				
				int		space_this_piece 	= this_piece_size - offset_in_piece;
				
				int		request_size = (int)Math.min( length, space_this_piece );
				
				request_size = Math.min( request_size, max_read_block_size );
				
				addBTRequest( 
					new BTRequest( 
							this_piece_number, 
							offset_in_piece, 
							request_size ),
					http_request );
					
				if ( request_size == length ){
					
					if ( index == offsets.length - 1 ){
						
						http_requests.remove(0);
						
					}else{
						
						http_request.setIndex( index+1 );
					}
				}else{
					offsets[index] += request_size;
					lengths[index] -= request_size;
				}
			}
		}
	}
	
	protected void
	addBTRequest(
		BTRequest		request,
		httpRequest		http_request )
	
		throws IOException
	{
		synchronized( outstanding_requests ){
				
			if ( destroyed ){
				
				throw( new IOException( "HTTP connection destroyed" ));
			}
			
			outstanding_requests.add( new pendingRequest( request, http_request ));
			
			if ( choked ){
					
				if ( choked_requests.size() > 1024 ){
					
					Debug.out( "pending request limit exceeded" );
					
				}else{
				
					choked_requests.add( request );

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -