📄 virtualchannelselector.java
字号:
/*
* Created on Jun 5, 2005
* Created by Alon Rohter
* Copyright (C) 2005, 2006 Aelitis, All Rights Reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* AELITIS, SAS au capital de 46,603.30 euros
* 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
*
*/
package com.aelitis.azureus.core.networkmanager;
import java.nio.channels.*;
import java.nio.channels.spi.AbstractSelectableChannel;
import java.util.*;
import org.gudy.azureus2.core3.config.COConfigurationManager;
import org.gudy.azureus2.core3.logging.*;
import org.gudy.azureus2.core3.util.AEMonitor;
import org.gudy.azureus2.core3.util.Debug;
import com.aelitis.azureus.core.networkmanager.impl.tcp.VirtualChannelSelectorImpl;
public class VirtualChannelSelector {
private static final LogIDs LOGID = LogIDs.NWMAN;
public static final int OP_ACCEPT = SelectionKey.OP_ACCEPT;
public static final int OP_CONNECT = SelectionKey.OP_CONNECT;
public static final int OP_READ = SelectionKey.OP_READ;
public static final int OP_WRITE = SelectionKey.OP_WRITE;
private boolean SAFE_SELECTOR_MODE_ENABLED = TEST_SAFE_MODE || COConfigurationManager.getBooleanParameter( "network.tcp.enable_safe_selector_mode" );
private static final boolean TEST_SAFE_MODE = false;
private static final int MAX_CHANNELS_PER_SAFE_SELECTOR = COConfigurationManager.getIntParameter( "network.tcp.safe_selector_mode.chunk_size" );
private static final int MAX_SAFEMODE_SELECTORS = 20000 / MAX_CHANNELS_PER_SAFE_SELECTOR;
private String name;
private VirtualChannelSelectorImpl selector_impl;
private volatile boolean destroyed;
//ONLY USED IN FAULTY MODE
private HashMap selectors;
private HashSet selectors_keyset_cow;
private AEMonitor selectors_mon;
private final int op;
private final boolean pause;
/**
* Create a new virtual selectable-channel selector, selecting over the given interest-op.
* @param interest_op operation set of OP_CONNECT, OP_ACCEPT, OP_READ, or OP_WRITE
* @param pause_after_select whether or not to auto-disable interest op after select
*/
public VirtualChannelSelector( String name, int interest_op, boolean pause_after_select ) {
this.name = name;
this.op = interest_op;
this.pause = pause_after_select;
if( SAFE_SELECTOR_MODE_ENABLED ) {
initSafeMode();
}
else {
selector_impl = new VirtualChannelSelectorImpl( this, op, pause );
selectors = null;
selectors_keyset_cow = null;
selectors_mon = null;
}
}
public String
getName()
{
return( name );
}
private void initSafeMode() {
if (Logger.isEnabled())
Logger.log(new LogEvent(LOGID,
"*** SAFE SOCKET SELECTOR MODE ENABLED ***"));
selector_impl = null;
selectors = new HashMap();
selectors_mon = new AEMonitor( "VirtualChannelSelector:FM" );
selectors.put( new VirtualChannelSelectorImpl( this, op, pause ), new ArrayList() );
selectors_keyset_cow = new HashSet( selectors.keySet());
}
public void register( SocketChannel channel, VirtualSelectorListener listener, Object attachment ) {
registerSupport( channel, listener, attachment );
}
public void register( ServerSocketChannel channel, VirtualAcceptSelectorListener listener, Object attachment ) {
registerSupport( channel, listener, attachment );
}
/**
* Register the given selectable channel, using the given listener for notification
* of completed select operations.
* NOTE: For OP_CONNECT and OP_WRITE -type selectors, once a selection request op
* completes, the channel's op registration is automatically disabled (paused); any
* future wanted selection notification requires re-enabling via resume. For OP_READ selectors,
* it stays enabled until actively paused, no matter how many times it is selected.
* @param channel socket to listen for
* @param listener op-complete listener
* @param attachment object to be passed back with listener notification
*/
protected void registerSupport( AbstractSelectableChannel channel, VirtualAbstractSelectorListener listener, Object attachment ) {
if( SAFE_SELECTOR_MODE_ENABLED ) {
try{ selectors_mon.enter();
//System.out.println( "register - " + channel.hashCode() + " - " + Debug.getCompressedStackTrace());
for( Iterator it = selectors.entrySet().iterator(); it.hasNext(); ) {
Map.Entry entry = (Map.Entry)it.next();
VirtualChannelSelectorImpl sel = (VirtualChannelSelectorImpl)entry.getKey();
ArrayList channels = (ArrayList)entry.getValue();
if( channels.size() >= ( TEST_SAFE_MODE?0:MAX_CHANNELS_PER_SAFE_SELECTOR )) {
// it seems that we have a bug somewhere where a selector is being registered
// but not cancelled on close. As an interim fix scan channels and remove any
// closed ones
Iterator chan_it = channels.iterator();
while( chan_it.hasNext()){
AbstractSelectableChannel chan = (AbstractSelectableChannel)chan_it.next();
if ( !chan.isOpen()){
Debug.out( "Selector '" + getName() + "' - removing orphaned safe channel registration" );
chan_it.remove();
}
}
}
if( channels.size() < MAX_CHANNELS_PER_SAFE_SELECTOR ) { //there's room in the current selector
sel.register( channel, listener, attachment );
channels.add( channel );
return;
}
}
//we couldnt find room in any of the existing selectors, so start up a new one if allowed
//max limit to the number of Selectors we are allowed to create
if( selectors.size() >= MAX_SAFEMODE_SELECTORS ) {
String msg = "Error: MAX_SAFEMODE_SELECTORS reached [" +selectors.size()+ "], no more socket channels can be registered. Too many peer connections.";
Debug.out( msg );
selectFailure( listener, channel, attachment, new Throwable( msg ) ); //reject registration
return;
}
if ( destroyed ){
String msg = "socket registered after controller destroyed";
Debug.out( msg );
selectFailure( listener, channel, attachment, new Throwable( msg ) ); //reject registration
return;
}
VirtualChannelSelectorImpl sel = new VirtualChannelSelectorImpl( this, op, pause );
ArrayList chans = new ArrayList();
selectors.put( sel, chans );
sel.register( channel, listener, attachment );
chans.add( channel );
selectors_keyset_cow = new HashSet( selectors.keySet());
}
finally{ selectors_mon.exit(); }
}
else {
selector_impl.register( channel, listener, attachment );
}
}
/**
* Pause selection operations for the given channel
* @param channel to pause
*/
public void pauseSelects( AbstractSelectableChannel channel ) {
if( SAFE_SELECTOR_MODE_ENABLED ) {
try{ selectors_mon.enter();
//System.out.println( "pause - " + channel.hashCode() + " - " + Debug.getCompressedStackTrace());
for( Iterator it = selectors.entrySet().iterator(); it.hasNext(); ) {
Map.Entry entry = (Map.Entry)it.next();
VirtualChannelSelectorImpl sel = (VirtualChannelSelectorImpl)entry.getKey();
ArrayList channels = (ArrayList)entry.getValue();
if( channels.contains( channel ) ) {
sel.pauseSelects( channel );
return;
}
}
System.out.println( "pauseSelects():: channel not found!" );
}
finally{ selectors_mon.exit(); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -