📄 httpnetworkconnection.java
字号:
/*
* Created on 3 Oct 2006
* Created by Paul Gardner
* Copyright (C) 2006 Aelitis, All Rights Reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* AELITIS, SAS au capital de 63.529,40 euros
* 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
*
*/
package com.aelitis.azureus.core.networkmanager.impl.http;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import org.gudy.azureus2.core3.config.COConfigurationManager;
import org.gudy.azureus2.core3.config.ParameterListener;
import org.gudy.azureus2.core3.logging.LogEvent;
import org.gudy.azureus2.core3.logging.LogIDs;
import org.gudy.azureus2.core3.logging.Logger;
import org.gudy.azureus2.core3.peer.impl.PEPeerControl;
import org.gudy.azureus2.core3.peer.impl.PEPeerTransport;
import org.gudy.azureus2.core3.peer.util.PeerUtils;
import org.gudy.azureus2.core3.util.Constants;
import org.gudy.azureus2.core3.util.Debug;
import org.gudy.azureus2.core3.util.DirectByteBuffer;
import org.gudy.azureus2.core3.util.SimpleTimer;
import org.gudy.azureus2.core3.util.SystemTime;
import org.gudy.azureus2.core3.util.TimerEvent;
import org.gudy.azureus2.core3.util.TimerEventPerformer;
import com.aelitis.azureus.core.networkmanager.NetworkConnection;
import com.aelitis.azureus.core.networkmanager.OutgoingMessageQueue;
import com.aelitis.azureus.core.networkmanager.RawMessage;
import com.aelitis.azureus.core.networkmanager.impl.RawMessageImpl;
import com.aelitis.azureus.core.peermanager.messaging.Message;
import com.aelitis.azureus.core.peermanager.messaging.bittorrent.BTBitfield;
import com.aelitis.azureus.core.peermanager.messaging.bittorrent.BTHandshake;
import com.aelitis.azureus.core.peermanager.messaging.bittorrent.BTHave;
import com.aelitis.azureus.core.peermanager.messaging.bittorrent.BTInterested;
import com.aelitis.azureus.core.peermanager.messaging.bittorrent.BTPiece;
import com.aelitis.azureus.core.peermanager.messaging.bittorrent.BTRequest;
public abstract class
HTTPNetworkConnection
{
// Log category used for the LogEvents raised by this class.
protected static final LogIDs LOGID = LogIDs.NWMAN;
// Upper bound on entries in outstanding_requests; submitBTRequests() stops
// generating BT requests once this many are pending.
private static final int MAX_OUTSTANDING_BT_REQUESTS = 16;
// Line terminator used when building HTTP response headers.
protected static final String NL = "\r\n";
// Cached value of the "BT Request Max Block Size" configuration parameter;
// refreshed by the parameter listener registered in the static initializer
// below and used to cap the size of each generated BT request.
private static int max_read_block_size;
static{
    // Keep max_read_block_size in step with the configuration: the listener
    // is fired once on registration and again whenever the parameter changes.
    COConfigurationManager.addAndFireParameterListener(
        "BT Request Max Block Size",
        new ParameterListener()
        {
            public void parameterChanged( String str )
            {
                max_read_block_size =
                    COConfigurationManager.getIntParameter( "BT Request Max Block Size" );
            }
        });
}
// Period handed to SimpleTimer.addPeriodicEvent for the connection sweep
// (presumably milliseconds, i.e. 15 seconds - confirm against SimpleTimer).
private static final int TIMEOUT_CHECK_PERIOD = 15*1000;
// A connection whose getTimeSinceLastActivity() exceeds this value and which
// has no requests (getRequestCount() == 0) is closed by checkConnections().
private static final int DEAD_CONNECTION_TIMEOUT_PERIOD = 30*1000;
// Maximum number of connections tolerated in one per-key list before
// checkConnections() closes the least recently active one.
// NOTE(review): the value is 5*1000 = 5000; the "*1000" mirrors the timing
// constants above and may be a copy/paste slip - confirm the intended limit.
private static final int MAX_CON_PER_ENDPOINT = 5*1000;
// Maps a networkConnectionKey to the List of HTTPNetworkConnection instances
// registered under it. The visible accesses all hold the map's own monitor.
private static Map http_connection_map = new HashMap();
static{
    // Periodic sweep: every TIMEOUT_CHECK_PERIOD each per-key connection list
    // is handed to checkConnections() while holding the map's monitor.
    SimpleTimer.addPeriodicEvent(
        "HTTPNetworkConnection:timer",
        TIMEOUT_CHECK_PERIOD,
        new TimerEventPerformer()
        {
            public void perform( TimerEvent event )
            {
                synchronized( http_connection_map ){
                    boolean rescan;
                    do{
                        rescan = false;
                        for ( Iterator entries = http_connection_map.entrySet().iterator(); entries.hasNext(); ){
                            Map.Entry entry = (Map.Entry)entries.next();
                            networkConnectionKey endpoint = (networkConnectionKey)entry.getKey();
                            List endpoint_connections = (List)entry.getValue();
                            // Closing connections might have modified the map under the
                            // iterator; if this key has gone, abandon the iterator and
                            // start the scan again from a fresh one.
                            if ( checkConnections( endpoint_connections ) && !http_connection_map.containsKey( endpoint )){
                                rescan = true;
                                break;
                            }
                        }
                    }while( rescan );
                }
            }
        });
}
/**
 * Closes idle connections in the supplied list and, if the list is still
 * over MAX_CON_PER_ENDPOINT afterwards, the least recently active
 * connection that is not already closing.
 *
 * The visible callers (the periodic timer and the constructor) invoke this
 * while holding the http_connection_map monitor.
 *
 * @param connections list of HTTPNetworkConnection instances to examine
 * @return true if close() was called on at least one connection
 */
protected static boolean
checkConnections(
    List connections )
{
    boolean some_closed = false;
    HTTPNetworkConnection oldest = null;
    long oldest_time = -1;
    List timed_out = new ArrayList();
    // First pass only collects candidates; nothing is closed while the list
    // is being iterated.
    Iterator it = connections.iterator();
    while( it.hasNext()){
        HTTPNetworkConnection connection = (HTTPNetworkConnection)it.next();
        long time = connection.getTimeSinceLastActivity();
        if ( time > DEAD_CONNECTION_TIMEOUT_PERIOD ){
            // idle for too long with nothing in flight: schedule for closure
            if ( connection.getRequestCount() == 0 ){
                timed_out.add( connection );
                continue;
            }
        }
        // track the longest-idle survivor that is not already on its way out
        if ( time > oldest_time && !connection.isClosing()){
            oldest_time = time;
            oldest = connection;
        }
    }
    // Count the survivors BEFORE closing anything, so the figure does not
    // depend on whether close() removes entries from 'connections'.
    int remaining = connections.size() - timed_out.size();
    for (int i=0;i<timed_out.size();i++){
        ((HTTPNetworkConnection)timed_out.get(i)).close( "Timeout" );
        some_closed = true;
    }
    // 'oldest' stays null when every surviving connection is already closing;
    // guard against dereferencing it in that case.
    if ( remaining > MAX_CON_PER_ENDPOINT && oldest != null ){
        oldest.close( "Too many connections from initiator");
        some_closed = true;
    }
    return( some_closed );
}
// Collaborators supplied to the constructor.
private HTTPNetworkManager manager;
private NetworkConnection connection;
private PEPeerTransport peer;
private String url;
// Decoder/encoder taken from the connection's incoming/outgoing message
// queues in the constructor; both are told about this instance there.
private HTTPMessageDecoder decoder;
private HTTPMessageEncoder encoder;
// Set by the first addRequest() call, which injects a BTHandshake and an
// all-zero BTBitfield into the decoder.
private boolean sent_handshake = false;
// Peer id used in the injected BTHandshake.
private byte[] peer_id = PeerUtils.createPeerID();
// Choke state: set in encodeChoke(), cleared in encodeUnchoke(); accessed
// under the outstanding_requests monitor.
private boolean choked = true;
// httpRequest objects waiting to be turned into BT requests; the entry at
// index 0 is the one submitBTRequests() is currently working through.
private List http_requests = new ArrayList();
// BTRequest messages held back while choked; encodeUnchoke() passes them to
// the decoder and clears the list.
private List choked_requests = new ArrayList();
// pendingRequest entries added by addBTRequest(). This list is also the
// monitor object guarding the request lists and the choke/destroyed state.
private List outstanding_requests = new ArrayList();
private BitSet piece_map = new BitSet();
// Timestamp from SystemTime.getCurrentTime(), refreshed in the constructor
// and on every addRequest().
private long last_http_activity_time;
// Key under which this instance is registered in http_connection_map.
private networkConnectionKey network_connection_key;
// NOTE(review): presumably backs isClosing(), which checkConnections()
// consults - the accessor is outside this view, so confirm.
private boolean closing;
// When true, addBTRequest() rejects new requests with an IOException.
private boolean destroyed;
/**
 * Stores the collaborators, registers this instance in http_connection_map
 * under a fresh networkConnectionKey and attaches it to the connection's
 * HTTP decoder and encoder.
 *
 * @param _manager    owning HTTP network manager
 * @param _connection underlying network connection
 * @param _peer       peer transport this connection serves
 * @param _url        url associated with the connection
 */
protected HTTPNetworkConnection(
    HTTPNetworkManager _manager,
    NetworkConnection _connection,
    PEPeerTransport _peer,
    String _url )
{
    this.manager = _manager;
    this.connection = _connection;
    this.peer = _peer;
    this.url = _url;

    this.network_connection_key = new networkConnectionKey();
    this.last_http_activity_time = SystemTime.getCurrentTime();

    this.decoder = (HTTPMessageDecoder)connection.getIncomingMessageQueue().getDecoder();
    this.encoder = (HTTPMessageEncoder)connection.getOutgoingMessageQueue().getEncoder();

    synchronized( http_connection_map ){
        List siblings = (List)http_connection_map.get( network_connection_key );
        if ( siblings == null ){
            // first connection registered under this key
            siblings = new ArrayList();
            http_connection_map.put( network_connection_key, siblings );
        }
        siblings.add( this );
        // over the per-key limit: give checkConnections a chance to prune
        if ( siblings.size() > MAX_CON_PER_ENDPOINT ){
            checkConnections( siblings );
        }
    }

    decoder.setConnection( this );
    encoder.setConnection( this );
}
/**
 * Checks that the peer's download is seeding. If it is not, the condition is
 * logged (when logging is enabled) and the manager's "not found" response is
 * sent via sendAndClose().
 *
 * @return true if the download is seeding, false otherwise
 */
protected boolean isSeed()
{
    if ( peer.getControl().isSeeding()){
        return true;
    }
    if ( Logger.isEnabled()){
        Logger.log( new LogEvent( peer, LOGID, "Download is not seeding" ));
    }
    sendAndClose( manager.getNotFound());
    return false;
}
/**
 * @return the HTTP network manager supplied at construction
 */
protected HTTPNetworkManager getManager()
{
    return manager;
}
/**
 * @return the underlying network connection supplied at construction
 */
protected NetworkConnection getConnection()
{
    return connection;
}
/**
 * @return the peer transport supplied at construction
 */
protected PEPeerTransport getPeer()
{
    return peer;
}
/**
 * @return the control object obtained from the peer transport
 */
protected PEPeerControl getPeerControl()
{
    PEPeerControl control = peer.getControl();
    return control;
}
/**
 * Records that this connection is choked. No raw message is produced.
 *
 * @return always null
 */
protected RawMessage encodeChoke()
{
    // choke state is guarded by the outstanding_requests monitor
    synchronized( outstanding_requests ){
        choked = true;
    }
    return null;
}
/**
 * Clears the choked state and passes every request that was held back while
 * choked to the decoder, in the order they were queued. No raw message is
 * produced.
 *
 * @return always null
 */
protected RawMessage encodeUnchoke()
{
    synchronized( outstanding_requests ){
        choked = false;
        // the size is re-read on each pass, exactly as an indexed for-loop would
        int next = 0;
        while( next < choked_requests.size()){
            BTRequest held = (BTRequest)choked_requests.get( next );
            decoder.addMessage( held );
            next++;
        }
        choked_requests.clear();
    }
    return null;
}
/**
 * Responds to a bitfield by injecting a BTInterested message into the
 * decoder. No raw message is produced.
 *
 * @return always null
 */
protected RawMessage encodeBitField()
{
    BTInterested interested = new BTInterested();
    decoder.addMessage( interested );
    return null;
}
/**
 * Marks the connection's transport as ready for read.
 */
protected void readWakeup()
{
    connection.getTransport().setReadyForRead();
}
/**
 * Handshake messages produce no raw output on this connection.
 *
 * @param message the handshake message (not used)
 * @return always null
 */
protected RawMessage encodeHandShake( Message message )
{
    return null;
}
/**
 * Processes a received header; the behaviour is supplied by concrete
 * subclasses.
 *
 * @param header the header text to decode
 * @throws IOException if the subclass fails to process the header
 */
protected abstract void
decodeHeader(
String header )
throws IOException;
/**
 * Builds the HTTP/1.1 response header for a request.
 *
 * The status line is "206 Partial Content" or "200 OK" depending on
 * request.isPartialContent(); a "Keep-Alive" line is included only when
 * request.keepAlive() is true. The header ends with an empty line.
 *
 * @param request the request being answered
 * @return the complete header text, each line terminated by NL
 */
protected String encodeHeader( httpRequest request )
{
    StringBuffer header = new StringBuffer( 256 );

    header.append( "HTTP/1.1 " );
    if ( request.isPartialContent()){
        header.append( "206 Partial Content" );
    }else{
        header.append( "200 OK" );
    }
    header.append( NL );

    header.append( "Content-Type: application/octet-stream" ).append( NL );

    header.append( "Server: " )
          .append( Constants.AZUREUS_NAME )
          .append( " " )
          .append( Constants.AZUREUS_VERSION )
          .append( NL );

    header.append( "Connection: " );
    if ( request.keepAlive()){
        header.append( "Keep-Alive" );
    }else{
        header.append( "Close" );
    }
    header.append( NL );

    if ( request.keepAlive()){
        header.append( "Keep-Alive: timeout=30" ).append( NL );
    }

    header.append( "Content-Length: " ).append( request.getTotalLength()).append( NL );

    // blank line terminating the header section
    header.append( NL );

    return header.toString();
}
/**
 * Queues an HTTP request and generates BT requests for it.
 *
 * The first call also injects a BTHandshake and an all-zero BTBitfield
 * (one bit per piece, rounded up to whole bytes) into the decoder.
 *
 * @param request the HTTP request to queue
 * @throws IOException propagated from submitBTRequests()
 */
protected void addRequest( httpRequest request )
    throws IOException
{
    last_http_activity_time = SystemTime.getCurrentTime();

    PEPeerControl peer_control = getPeerControl();

    if ( !sent_handshake ){
        sent_handshake = true;

        BTHandshake handshake = new BTHandshake( peer_control.getHash(), peer_id, false );
        decoder.addMessage( handshake );

        int bitfield_bytes = ( peer_control.getPieces().length + 7 ) / 8;
        byte[] empty_bits = new byte[ bitfield_bytes ];
        DirectByteBuffer bitfield_data = new DirectByteBuffer( ByteBuffer.wrap( empty_bits ));
        decoder.addMessage( new BTBitfield( bitfield_data ));
    }

    synchronized( outstanding_requests ){
        http_requests.add( request );
    }

    submitBTRequests();
}
/**
 * Converts queued HTTP requests into BT requests until either
 * MAX_OUTSTANDING_BT_REQUESTS are pending or no HTTP requests remain.
 *
 * Each pass takes the HTTP request at the head of the queue and issues one
 * BT request for its current range. The request is limited to the remaining
 * range length, the space left in the containing piece and
 * max_read_block_size. The range's offset/length arrays are updated in place
 * as data is requested.
 *
 * @throws IOException propagated from addBTRequest()
 */
protected void submitBTRequests()
    throws IOException
{
    PEPeerControl peer_control = getPeerControl();

    // length of piece 0, used to map a byte offset onto a piece number
    long nominal_piece_length = peer_control.getPieceLength( 0 );

    synchronized( outstanding_requests ){

        while( !http_requests.isEmpty() && outstanding_requests.size() < MAX_OUTSTANDING_BT_REQUESTS ){

            httpRequest current = (httpRequest)http_requests.get( 0 );

            long[] range_offsets = current.getOffsets();
            long[] range_lengths = current.getLengths();
            int range = current.getIndex();

            long range_offset = range_offsets[ range ];
            long range_length = range_lengths[ range ];

            int piece_number = (int)( range_offset / nominal_piece_length );
            int piece_length = peer_control.getPieceLength( piece_number );
            int piece_offset = (int)( range_offset - ( piece_number * nominal_piece_length ));

            // never cross a piece boundary, never exceed the configured block size
            int block_size = (int)Math.min( range_length, piece_length - piece_offset );
            block_size = Math.min( block_size, max_read_block_size );

            addBTRequest( new BTRequest( piece_number, piece_offset, block_size ), current );

            if ( block_size != range_length ){
                // range only partly covered: advance within it
                range_offsets[ range ] += block_size;
                range_lengths[ range ] -= block_size;
            }else if ( range == range_offsets.length - 1 ){
                // last range fully requested: this HTTP request is done
                http_requests.remove( 0 );
            }else{
                // move on to the next range of the same HTTP request
                current.setIndex( range + 1 );
            }
        }
    }
}
protected void
addBTRequest(
BTRequest request,
httpRequest http_request )
throws IOException
{
synchronized( outstanding_requests ){
if ( destroyed ){
throw( new IOException( "HTTP connection destroyed" ));
}
outstanding_requests.add( new pendingRequest( request, http_request ));
if ( choked ){
if ( choked_requests.size() > 1024 ){
Debug.out( "pending request limit exceeded" );
}else{
choked_requests.add( request );
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -