📄 virtualchannelselectorimpl.java
字号:
}
}
public int select( long timeout ) {
long select_start_time = SystemTime.getCurrentTime();
if( selector == null ) {
Debug.out( "VirtualChannelSelector.select() op called with null selector" );
try { Thread.sleep( 3000 ); }catch( Throwable x ) {x.printStackTrace();}
return 0;
}
if( !selector.isOpen()) {
Debug.out( "VirtualChannelSelector.select() op called with closed selector" );
try { Thread.sleep( 3000 ); }catch( Throwable x ) {x.printStackTrace();}
return 0;
}
// store these when they occur so they can be raised *outside* of the monitor to avoid
// potential deadlocks
RegistrationData select_fail_data = null;
Throwable select_fail_excep = null;
//process cancellations
try {
register_cancel_list_mon.enter();
// don't use an iterator here as it is possible that error notifications to listeners
// can result in the addition of a cancel request.
// Note that this can only happen for registrations, and this *should* only result in
// possibly a cancel being added (i.e. not a further registration), hence this can't
// loop. Also note the approach of removing the entry before processing. This is so
// that the logic used when adding a cancel (the removal of any matching entries) does
// not cause the entry we're processing to be removed
while( register_cancel_list.size() > 0 ){
Object obj = register_cancel_list.remove(0);
if ( obj instanceof SocketChannel ){
// process cancellation
SocketChannel canceled_channel = (SocketChannel)obj;
try{
SelectionKey key = canceled_channel.keyFor( selector );
if( key != null ){
key.cancel(); //cancel the key, since already registered
}
}catch( Throwable e ){
Debug.printStackTrace(e);
}
}else{
//process new registrations
RegistrationData data = (RegistrationData)obj;
if( data == null ) {
Debug.out( "data == null" );
}
if( data.channel == null ) {
Debug.out( "data.channel == null" );
}
try {
if( data.channel.isOpen() ){
// see if already registered
SelectionKey key = data.channel.keyFor( selector );
if ( key != null && key.isValid() ) { //already registered
key.attach( data );
key.interestOps( key.interestOps() | INTEREST_OP ); //ensure op is enabled
}
else{
data.channel.register( selector, INTEREST_OP, data );
}
//check if op has been paused before registration moment
Object paused = paused_states.get( data.channel );
if( paused != null ) {
pauseSelects( data.channel ); //pause it
}
}
else{
select_fail_data = data;
select_fail_excep = new Throwable( "select registration: channel is closed" );
}
}catch (Throwable t){
Debug.printStackTrace(t);
select_fail_data = data;
select_fail_excep = t;
}
}
}
paused_states.clear(); //reset after every registration round
}finally {
register_cancel_list_mon.exit();
}
if ( select_fail_data != null ){
try{
select_fail_data.listener.selectFailure(
parent,
select_fail_data.channel,
select_fail_data.attachment,
select_fail_excep );
}catch( Throwable e ){
Debug.printStackTrace( e );
}
}
//do the actual select
int count = 0;
selector_guard.markPreSelectTime();
try {
count = selector.select( timeout );
}
catch (Throwable t) {
Debug.out( "Caught exception on selector.select() op: " +t.getMessage(), t );
try { Thread.sleep( timeout ); }catch(Throwable e) { e.printStackTrace(); }
}
// do this after the select so that any pending cancels (prior to destroy) are processed
// by the selector before we kill it
if ( destroyed ){
closeExistingSelector();
return( 0 );
}
/*
if( INTEREST_OP == VirtualChannelSelector.OP_READ ) { //TODO
select_counts[ round ] = count;
round++;
if( round == select_counts.length ) {
StringBuffer buf = new StringBuffer( select_counts.length * 3 );
buf.append( "select_counts=" );
for( int i=0; i < select_counts.length; i++ ) {
buf.append( select_counts[i] );
buf.append( ' ' );
}
//System.out.println( buf.toString() );
round = 0;
}
}
*/
selector_guard.verifySelectorIntegrity( count, SystemTime.TIME_GRANULARITY_MILLIS /2 );
if( !selector.isOpen() ) return count;
//notification of ready keys via listener callback
for( Iterator i = selector.selectedKeys().iterator(); i.hasNext(); ) {
SelectionKey key = (SelectionKey)i.next();
i.remove();
RegistrationData data = (RegistrationData)key.attachment();
if( key.isValid() ) {
if( (key.interestOps() & INTEREST_OP) == 0 ) { //it must have been paused between select and notification
continue;
}
if( pause_after_select ) {
key.interestOps( key.interestOps() & ~INTEREST_OP );
}
boolean progress_made = data.listener.selectSuccess( parent, data.channel, data.attachment );
if ( progress_made ){
data.non_progress_count = 0;
}else{
data.non_progress_count++;
if ( data.non_progress_count %100 == 0 && data.non_progress_count > 0 ){
System.out.println(
"VirtualChannelSelector: No progress for op " + INTEREST_OP + ": " + data.non_progress_count +
", socket: open = " + data.channel.isOpen() + ", connected = " + data.channel.isConnected());
if ( data.non_progress_count == 1000 ){
Debug.out( "No progress for " + data.non_progress_count + ", closing connection" );
try{
data.channel.close();
}catch( Throwable e ){
e.printStackTrace();
}
}
}
}
}
else {
key.cancel();
data.listener.selectFailure( parent, data.channel, data.attachment, new Throwable( "key is invalid" ) );
// can get this if socket has been closed between select and here
}
}
long time_diff = SystemTime.getCurrentTime() - select_start_time;
if( time_diff < timeout && time_diff >= 0 ) { //ensure that it always takes at least 'timeout' time to complete the select op
try { Thread.sleep( timeout - time_diff ); }catch(Throwable e) { e.printStackTrace(); }
}
return count;
}
/**
* Note that you have to ensure that a select operation is performed on the normal select
* loop *after* destroying the selector to actually cause the destroy to occur
*/
public void
destroy()
{
destroyed = true;
}
protected void closeExistingSelector() {
for( Iterator i = selector.keys().iterator(); i.hasNext(); ) {
SelectionKey key = (SelectionKey)i.next();
RegistrationData data = (RegistrationData)key.attachment();
data.listener.selectFailure( parent, data.channel, data.attachment, new Throwable( "selector destroyed" ) );
}
try{
selector.close();
}
catch( Throwable t ) { t.printStackTrace(); }
}
private static class RegistrationData {
protected final SocketChannel channel;
protected final VirtualChannelSelector.VirtualSelectorListener listener;
protected final Object attachment;
protected int non_progress_count;
private RegistrationData( SocketChannel _channel, VirtualChannelSelector.VirtualSelectorListener _listener, Object _attachment ) {
channel = _channel;
listener = _listener;
attachment = _attachment;
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -