📄 connectdisconnectmanager.java
字号:
/*
* Created on Sep 13, 2004
* Created by Alon Rohter
* Copyright (C) 2004, 2005, 2006 Aelitis, All Rights Reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* AELITIS, SAS au capital de 46,603.30 euros
* 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
*
*/
package com.aelitis.azureus.core.networkmanager.impl.tcp;
import java.net.*;
import java.nio.channels.*;
import java.util.*;
import org.gudy.azureus2.core3.config.COConfigurationManager;
import org.gudy.azureus2.core3.config.ParameterListener;
import org.gudy.azureus2.core3.logging.*;
import org.gudy.azureus2.core3.util.*;
import com.aelitis.azureus.core.networkmanager.VirtualChannelSelector;
import com.aelitis.azureus.core.networkmanager.admin.NetworkAdmin;
import com.aelitis.azureus.core.stats.AzureusCoreStats;
import com.aelitis.azureus.core.stats.AzureusCoreStatsProvider;
/**
* Manages new connection establishment and ended connection termination.
*/
public class ConnectDisconnectManager {
private static final LogIDs LOGID = LogIDs.NWMAN;
private static int MIN_SIMULTANIOUS_CONNECT_ATTEMPTS = 3;
public static int MAX_SIMULTANIOUS_CONNECT_ATTEMPTS = 5; //NOTE: WinXP SP2 limits to 10 max at any given time
static {
MAX_SIMULTANIOUS_CONNECT_ATTEMPTS = COConfigurationManager.getIntParameter( "network.max.simultaneous.connect.attempts" );
if( MAX_SIMULTANIOUS_CONNECT_ATTEMPTS < 1 ) { //should never happen, but hey
MAX_SIMULTANIOUS_CONNECT_ATTEMPTS = 1;
COConfigurationManager.setParameter( "network.max.simultaneous.connect.attempts", 1 );
}
MIN_SIMULTANIOUS_CONNECT_ATTEMPTS = MAX_SIMULTANIOUS_CONNECT_ATTEMPTS - 2;
if( MIN_SIMULTANIOUS_CONNECT_ATTEMPTS < 1 ) {
MIN_SIMULTANIOUS_CONNECT_ATTEMPTS = 1;
}
COConfigurationManager.addParameterListener( "network.max.simultaneous.connect.attempts", new ParameterListener() {
public void parameterChanged( String parameterName ) {
MAX_SIMULTANIOUS_CONNECT_ATTEMPTS = COConfigurationManager.getIntParameter( "network.max.simultaneous.connect.attempts" );
MIN_SIMULTANIOUS_CONNECT_ATTEMPTS = MAX_SIMULTANIOUS_CONNECT_ATTEMPTS - 2;
if( MIN_SIMULTANIOUS_CONNECT_ATTEMPTS < 1 ) {
MIN_SIMULTANIOUS_CONNECT_ATTEMPTS = 1;
}
}
});
}
private static final int CONNECT_ATTEMPT_TIMEOUT = 30*1000; //30sec
private static final int CONNECT_ATTEMPT_STALL_TIME = 3*1000; //3sec
private static final boolean SHOW_CONNECT_STATS = false;
private final VirtualChannelSelector connect_selector = new VirtualChannelSelector( "Connect/Disconnect Manager", VirtualChannelSelector.OP_CONNECT, true );
private final LinkedList new_requests = new LinkedList();
private final ArrayList canceled_requests = new ArrayList();
private final AEMonitor new_canceled_mon= new AEMonitor( "ConnectDisconnectManager:NCM");
private final HashMap pending_attempts = new HashMap();
private final LinkedList pending_closes = new LinkedList();
private final Map delayed_closes = new HashMap();
private final AEMonitor pending_closes_mon = new AEMonitor( "ConnectDisconnectManager:PC");
private final Random random = new Random();
public
ConnectDisconnectManager()
{
Set types = new HashSet();
types.add( AzureusCoreStats.ST_NET_TCP_OUT_CONNECT_QUEUE_LENGTH );
types.add( AzureusCoreStats.ST_NET_TCP_OUT_CANCEL_QUEUE_LENGTH );
types.add( AzureusCoreStats.ST_NET_TCP_OUT_CLOSE_QUEUE_LENGTH );
types.add( AzureusCoreStats.ST_NET_TCP_OUT_PENDING_QUEUE_LENGTH );
AzureusCoreStats.registerProvider(
types,
new AzureusCoreStatsProvider()
{
public void
updateStats(
Set types,
Map values )
{
if ( types.contains( AzureusCoreStats.ST_NET_TCP_OUT_CONNECT_QUEUE_LENGTH )){
values.put( AzureusCoreStats.ST_NET_TCP_OUT_CONNECT_QUEUE_LENGTH, new Long( new_requests.size()));
}
if ( types.contains( AzureusCoreStats.ST_NET_TCP_OUT_CANCEL_QUEUE_LENGTH )){
values.put( AzureusCoreStats.ST_NET_TCP_OUT_CANCEL_QUEUE_LENGTH, new Long( canceled_requests.size()));
}
if ( types.contains( AzureusCoreStats.ST_NET_TCP_OUT_CLOSE_QUEUE_LENGTH )){
values.put( AzureusCoreStats.ST_NET_TCP_OUT_CLOSE_QUEUE_LENGTH, new Long( pending_closes.size()));
}
if ( types.contains( AzureusCoreStats.ST_NET_TCP_OUT_PENDING_QUEUE_LENGTH )){
values.put( AzureusCoreStats.ST_NET_TCP_OUT_PENDING_QUEUE_LENGTH, new Long( pending_attempts.size()));
}
}
});
AEThread loop = new AEThread( "ConnectDisconnectManager" ) {
public void runSupport() {
mainLoop();
}
};
loop.setDaemon( true );
loop.start();
}
private void mainLoop() {
while( true ) {
addNewOutboundRequests();
runSelect();
doClosings();
}
}
private void addNewOutboundRequests() {
while( pending_attempts.size() < MIN_SIMULTANIOUS_CONNECT_ATTEMPTS ) {
ConnectionRequest cr = null;
try{
new_canceled_mon.enter();
if( new_requests.isEmpty() ) break;
cr = (ConnectionRequest)new_requests.removeFirst();
}
finally{
new_canceled_mon.exit();
}
if( cr != null ) {
addNewRequest( cr );
}
}
}
private void addNewRequest( final ConnectionRequest request ) {
request.listener.connectAttemptStarted();
try {
request.channel = SocketChannel.open();
try { //advanced socket options
int rcv_size = COConfigurationManager.getIntParameter( "network.tcp.socket.SO_RCVBUF" );
if( rcv_size > 0 ) {
if (Logger.isEnabled())
Logger.log(new LogEvent(LOGID, "Setting socket receive buffer size"
+ " for outgoing connection [" + request.address + "] to: "
+ rcv_size));
request.channel.socket().setReceiveBufferSize( rcv_size );
}
int snd_size = COConfigurationManager.getIntParameter( "network.tcp.socket.SO_SNDBUF" );
if( snd_size > 0 ) {
if (Logger.isEnabled())
Logger.log(new LogEvent(LOGID, "Setting socket send buffer size "
+ "for outgoing connection [" + request.address + "] to: "
+ snd_size));
request.channel.socket().setSendBufferSize( snd_size );
}
String ip_tos = COConfigurationManager.getStringParameter( "network.tcp.socket.IPTOS" );
if( ip_tos.length() > 0 ) {
if (Logger.isEnabled())
Logger.log(new LogEvent(LOGID, "Setting socket TOS field "
+ "for outgoing connection [" + request.address + "] to: "
+ ip_tos));
request.channel.socket().setTrafficClass( Integer.decode( ip_tos ).intValue() );
}
int local_bind_port = COConfigurationManager.getIntParameter( "network.bind.local.port" );
if( local_bind_port > 0 ) {
request.channel.socket().setReuseAddress( true );
}
InetAddress bindIP = NetworkAdmin.getSingleton().getDefaultBindAddress();
if ( bindIP != null ) {
if (Logger.isEnabled()) Logger.log(new LogEvent(LOGID, "Binding outgoing connection [" + request.address + "] to local IP address: " + bindIP));
request.channel.socket().bind( new InetSocketAddress( bindIP, local_bind_port ) );
}
else if( local_bind_port > 0 ) {
if (Logger.isEnabled()) Logger.log(new LogEvent(LOGID, "Binding outgoing connection [" + request.address + "] to local port #: " +local_bind_port));
request.channel.socket().bind( new InetSocketAddress( local_bind_port ) );
}
}
catch( Throwable t ) {
String msg = "Error while processing advanced socket options.";
Debug.out( msg, t );
Logger.log(new LogAlert(LogAlert.UNREPEATABLE, msg, t));
//dont pass the exception outwards, so we will continue processing connection without advanced options set
}
request.channel.configureBlocking( false );
request.connect_start_time = SystemTime.getCurrentTime();
if( request.channel.connect( request.address ) ) { //already connected
finishConnect( request );
}
else { //not yet connected, so register for connect selection
pending_attempts.put( request, null );
connect_selector.register( request.channel, new VirtualChannelSelector.VirtualSelectorListener() {
public boolean selectSuccess( VirtualChannelSelector selector, SocketChannel sc, Object attachment ) {
pending_attempts.remove( request );
finishConnect( request );
return true;
}
public void selectFailure( VirtualChannelSelector selector, SocketChannel sc,Object attachment, Throwable msg ) {
pending_attempts.remove( request );
closeConnection( request.channel );
request.listener.connectFailure( msg );
}
}, null );
}
}
catch( Throwable t ) {
String full = request.address.toString();
String hostname = request.address.getHostName();
int port = request.address.getPort();
boolean unresolved = request.address.isUnresolved();
InetAddress inet_address = request.address.getAddress();
String full_sub = inet_address==null?request.address.toString():inet_address.toString();
String host_address = inet_address==null?request.address.toString():inet_address.getHostAddress();
String msg = "ConnectDisconnectManager::address exception: full="+full+ ", hostname="+hostname+ ", port="+port+ ", unresolved="+unresolved+ ", full_sub="+full_sub+ ", host_address="+host_address;
if( request.channel != null ) {
String channel = request.channel.toString();
String socket = request.channel.socket().toString();
String local_address = request.channel.socket().getLocalAddress().toString();
int local_port = request.channel.socket().getLocalPort();
SocketAddress ra = request.channel.socket().getRemoteSocketAddress();
String remote_address;
if( ra != null ) remote_address = ra.toString();
else remote_address = "<null>";
int remote_port = request.channel.socket().getPort();
msg += "\n channel="+channel+ ", socket="+socket+ ", local_address="+local_address+ ", local_port="+local_port+ ", remote_address="+remote_address+ ", remote_port="+remote_port;
}
else {
msg += "\n channel=<null>";
}
if ( t instanceof UnresolvedAddressException ){
Debug.outNoStack( msg );
}else{
Debug.out( msg, t );
}
if( request.channel != null ) {
closeConnection( request.channel );
}
request.listener.connectFailure( t );
}
}
private void finishConnect( ConnectionRequest request ) {
try {
if( request.channel.finishConnect() ) {
if( SHOW_CONNECT_STATS ) {
long queue_wait_time = request.connect_start_time - request.request_start_time;
long connect_time = SystemTime.getCurrentTime() - request.connect_start_time;
int num_queued = new_requests.size();
int num_connecting = pending_attempts.size();
System.out.println("S: queue_wait_time="+queue_wait_time+
", connect_time="+connect_time+
", num_queued="+num_queued+
", num_connecting="+num_connecting);
}
//ensure the request hasn't been canceled during the select op
boolean canceled = false;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -