📄 virtualchannelselectorimpl.java
字号:
/*
* Created on Jul 28, 2004
* Created by Alon Rohter
* Copyright (C) 2004, 2005, 2006 Aelitis, All Rights Reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* AELITIS, SAS au capital de 46,603.30 euros
* 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
*
*/
package com.aelitis.azureus.core.networkmanager.impl.tcp;
import java.nio.channels.*;
import java.nio.channels.spi.AbstractSelectableChannel;
import java.util.*;
import org.gudy.azureus2.core3.logging.*;
import org.gudy.azureus2.core3.util.*;
import com.aelitis.azureus.core.networkmanager.VirtualChannelSelector;
/**
* Provides a simplified and safe (selectable-channel) socket single-op selector.
*/
public class VirtualChannelSelectorImpl {
private static final LogIDs LOGID = LogIDs.NWMAN;
/*
static boolean rm_trace = false;
static boolean rm_test_fix = false;
static{
COConfigurationManager.addAndFireParameterListeners(
new String[]{ "user.rm.trace", "user.rm.testfix" },
new ParameterListener()
{
public void
parameterChanged(
String parameterName)
{
rm_trace = COConfigurationManager.getBooleanParameter( "user.rm.trace", false );
rm_test_fix = COConfigurationManager.getBooleanParameter( "user.rm.testfix", false );
}
});
}
private long rm_flag_last_log;
private Map rm_listener_map = new HashMap();
*/
protected Selector selector;
private final SelectorGuard selector_guard;
private final LinkedList register_cancel_list = new LinkedList();
private final AEMonitor register_cancel_list_mon = new AEMonitor( "VirtualChannelSelector:RCL");
private final HashMap paused_states = new HashMap();
private final int INTEREST_OP;
private final boolean pause_after_select;
protected final VirtualChannelSelector parent;
//private int[] select_counts = new int[ 50 ];
//private int round = 0;
private volatile boolean destroyed;
private static final int WRITE_SELECTOR_DEBUG_CHECK_PERIOD = 10000;
private static final int WRITE_SELECTOR_DEBUG_MAX_TIME = 20000;
private long last_write_select_debug;
public VirtualChannelSelectorImpl( VirtualChannelSelector _parent, int _interest_op, boolean _pause_after_select ) {
this.parent = _parent;
INTEREST_OP = _interest_op;
pause_after_select = _pause_after_select;
String type;
switch( INTEREST_OP ) {
case VirtualChannelSelector.OP_CONNECT:
type = "OP_CONNECT"; break;
case VirtualChannelSelector.OP_READ:
type = "OP_READ"; break;
default:
type = "OP_WRITE"; break;
}
selector_guard = new SelectorGuard( type, new SelectorGuard.GuardListener() {
public boolean safeModeSelectEnabled() {
return parent.isSafeSelectionModeEnabled();
}
public void spinDetected() {
closeExistingSelector();
try { Thread.sleep( 1000 ); }catch( Throwable x ) {x.printStackTrace();}
parent.enableSafeSelectionMode();
}
public void failureDetected() {
try { Thread.sleep( 10000 ); }catch( Throwable x ) {x.printStackTrace();}
closeExistingSelector();
try { Thread.sleep( 1000 ); }catch( Throwable x ) {x.printStackTrace();}
selector = openNewSelector();
}
});
selector = openNewSelector();
}
protected Selector openNewSelector() {
Selector sel = null;
try {
sel = Selector.open();
AEDiagnostics.logWithStack( "seltrace", "Selector created for '" + parent.getName() + "'," + selector_guard.getType());
}
catch (Throwable t) {
Debug.out( "ERROR: caught exception on Selector.open()", t );
try { Thread.sleep( 3000 ); }catch( Throwable x ) {x.printStackTrace();}
int fail_count = 1;
while( fail_count < 10 ) {
try {
sel = Selector.open();
AEDiagnostics.logWithStack( "seltrace", "Selector created for '" + parent.getName() + "'," + selector_guard.getType());
break;
}
catch( Throwable f ) {
Debug.out( f );
fail_count++;
try { Thread.sleep( 3000 ); }catch( Throwable x ) {x.printStackTrace();}
}
}
if( fail_count < 10 ) { //success !
Debug.out( "NOTICE: socket Selector successfully opened after " +fail_count+ " failures." );
}
else { //failure
Logger.log(new LogAlert(LogAlert.REPEATABLE, LogAlert.AT_ERROR,
"ERROR: socket Selector.open() failed 10 times in a row, aborting."
+ "\nAzureus / Java is likely being firewalled!"));
}
}
return sel;
}
public void pauseSelects( AbstractSelectableChannel channel ) {
//System.out.println( "pauseSelects: " + channel + " - " + Debug.getCompressedStackTrace() );
if( channel == null ) {
return;
}
SelectionKey key = channel.keyFor( selector );
if( key != null && key.isValid() ) {
key.interestOps( key.interestOps() & ~INTEREST_OP );
}
else { //channel not (yet?) registered
if( channel.isOpen() ) { //only bother if channel has not already been closed
try{ register_cancel_list_mon.enter();
paused_states.put( channel, new Boolean( true ) ); //ensure the op is paused upon reg select-time reg
}
finally{ register_cancel_list_mon.exit(); }
}
}
}
public void resumeSelects( AbstractSelectableChannel channel ) {
//System.out.println( "resumeSelects: " + channel + " - " + Debug.getCompressedStackTrace() );
if( channel == null ) {
Debug.printStackTrace( new Exception( "resumeSelects():: channel == null" ) );
return;
}
SelectionKey key = channel.keyFor( selector );
if( key != null && key.isValid() ) {
// if we're resuming a non-interested key then reset the metrics
if (( key.interestOps() & INTEREST_OP ) == 0 ){
RegistrationData data = (RegistrationData)key.attachment();
data.last_select_success_time = SystemTime.getCurrentTime();
data.non_progress_count = 0;
}
key.interestOps( key.interestOps() | INTEREST_OP );
}
else { //channel not (yet?) registered
try{ register_cancel_list_mon.enter();
paused_states.remove( channel ); //check if the channel's op has been already paused before select-time reg
}
finally{ register_cancel_list_mon.exit(); }
}
//try{
// selector.wakeup();
//}
//catch( Throwable t ) { Debug.out( "selector.wakeup():: caught exception: ", t ); }
}
public void
cancel(
AbstractSelectableChannel channel )
{
//System.out.println( "cancel: " + channel + " - " + Debug.getCompressedStackTrace() );
if ( destroyed ){
// don't worry too much about cancels
}
if ( channel == null ){
Debug.out( "Attempt to cancel selects for null channel" );
return;
}
try{
register_cancel_list_mon.enter();
// ensure that there's only one operation outstanding for a given channel
// at any one time (the latest operation requested )
for (Iterator it = register_cancel_list.iterator();it.hasNext();){
Object obj = it.next();
if ( channel == obj ||
( obj instanceof RegistrationData &&
((RegistrationData)obj).channel == channel )){
// remove existing cancel or register
it.remove();
break;
}
}
pauseSelects((AbstractSelectableChannel)channel );
register_cancel_list.add( channel );
}finally{
register_cancel_list_mon.exit();
}
}
public void
register(
AbstractSelectableChannel channel,
VirtualChannelSelector.VirtualAbstractSelectorListener listener,
Object attachment )
{
if ( destroyed ){
Debug.out( "register called after selector destroyed" );
}
if ( channel == null ){
Debug.out( "Attempt to register selects for null channel" );
return;
}
try{
register_cancel_list_mon.enter();
// ensure that there's only one operation outstanding for a given channel
// at any one time (the latest operation requested )
for (Iterator it = register_cancel_list.iterator();it.hasNext();){
Object obj = it.next();
if ( channel == obj ||
( obj instanceof RegistrationData &&
((RegistrationData)obj).channel == channel )){
it.remove();
break;
}
}
paused_states.remove( channel );
register_cancel_list.add( new RegistrationData( channel, listener, attachment ));
}finally{
register_cancel_list_mon.exit();
}
}
public int select( long timeout ) {
long select_start_time = SystemTime.getCurrentTime();
if( selector == null ) {
Debug.out( "VirtualChannelSelector.select() op called with null selector" );
try { Thread.sleep( 3000 ); }catch( Throwable x ) {x.printStackTrace();}
return 0;
}
if( !selector.isOpen()) {
Debug.out( "VirtualChannelSelector.select() op called with closed selector" );
try { Thread.sleep( 3000 ); }catch( Throwable x ) {x.printStackTrace();}
return 0;
}
// store these when they occur so they can be raised *outside* of the monitor to avoid
// potential deadlocks
RegistrationData select_fail_data = null;
Throwable select_fail_excep = null;
//process cancellations
try {
register_cancel_list_mon.enter();
// don't use an iterator here as it is possible that error notifications to listeners
// can result in the addition of a cancel request.
// Note that this can only happen for registrations, and this *should* only result in
// possibly a cancel being added (i.e. not a further registration), hence this can't
// loop. Also note the approach of removing the entry before processing. This is so
// that the logic used when adding a cancel (the removal of any matching entries) does
// not cause the entry we're processing to be removed
while( register_cancel_list.size() > 0 ){
Object obj = register_cancel_list.remove(0);
if ( obj instanceof AbstractSelectableChannel ){
// process cancellation
AbstractSelectableChannel canceled_channel = (AbstractSelectableChannel)obj;
try{
SelectionKey key = canceled_channel.keyFor( selector );
if( key != null ){
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -