⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 virtualchannelselectorimpl.java

📁 基于JXTA开发平台的下载软件开发源代码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
    	}
    }
    
    
    
    /**
     * Performs one select round: applies pending cancellations and registrations,
     * runs {@code selector.select( timeout )}, and notifies the listener attached to
     * every selected key.
     *
     * <p>Registration failures are collected while the registration monitor is held and
     * reported to their listeners only after it has been released. Every failure of the
     * round is reported, not just the most recent one.
     *
     * <p>If {@code destroyed} has been set, the selector is closed after the underlying
     * select call and no key notifications are made.
     *
     * @param timeout value handed to {@code selector.select( long )}; the call is also
     *                padded with a sleep so that it takes at least this long overall
     * @return the count returned by the underlying select call, or 0 when the selector is
     *         null or closed, the select call threw, or this selector has been destroyed
     */
    public int select( long timeout ) {

      long select_start_time = SystemTime.getCurrentTime();

      if( selector == null ) {
        Debug.out( "VirtualChannelSelector.select() op called with null selector" );
        // back off so a caller looping on select() does not spin
        try {  Thread.sleep( 3000 );  }catch( Throwable x ) { Debug.printStackTrace( x ); }
        return 0;
      }

      if( !selector.isOpen() ) {
        Debug.out( "VirtualChannelSelector.select() op called with closed selector" );
        try {  Thread.sleep( 3000 );  }catch( Throwable x ) { Debug.printStackTrace( x ); }
        return 0;
      }

      // Registration failures are stored when they occur so they can be raised *outside*
      // of the monitor to avoid potential deadlocks. Each entry is
      // Object[]{ RegistrationData, Throwable }; a list is used so that several failures
      // within the same round are all delivered.
      java.util.List select_failures = new java.util.ArrayList();

      // process cancellations and registrations
      try {
        register_cancel_list_mon.enter();

        // don't use an iterator here as it is possible that error notifications to listeners
        // can result in the addition of a cancel request.
        // Note that this can only happen for registrations, and this *should* only result in
        // possibly a cancel being added (i.e. not a further registration), hence this can't
        // loop. Also note the approach of removing the entry before processing. This is so
        // that the logic used when adding a cancel (the removal of any matching entries) does
        // not cause the entry we're processing to be removed

        while( register_cancel_list.size() > 0 ){

          Object obj = register_cancel_list.remove(0);

          if ( obj instanceof SocketChannel ){

            // process cancellation

            SocketChannel canceled_channel = (SocketChannel)obj;

            try{
              SelectionKey key = canceled_channel.keyFor( selector );

              if( key != null ){

                key.cancel();  //cancel the key, since already registered
              }

            }catch( Throwable e ){

              Debug.printStackTrace(e);
            }
          }else{
            // process new registrations

            RegistrationData data = (RegistrationData)obj;

            if( data == null ) {
              Debug.out( "data == null" );
              // nothing to register and no listener to notify: skip the entry rather
              // than dereferencing it and aborting the whole round
              continue;
            }

            if( data.channel == null ) {
              Debug.out( "data.channel == null" );
              // fall through: the dereference below throws inside the try block and is
              // recorded as a registration failure for this entry
            }

            try {
              if( data.channel.isOpen() ){

                // see if already registered
                SelectionKey key = data.channel.keyFor( selector );

                if ( key != null && key.isValid() ) {  //already registered
                  key.attach( data );
                  key.interestOps( key.interestOps() | INTEREST_OP );  //ensure op is enabled
                }
                else{
                  data.channel.register( selector, INTEREST_OP, data );
                }

                //check if op has been paused before registration moment
                Object paused = paused_states.get( data.channel );

                if( paused != null ) {
                  pauseSelects( data.channel );  //pause it
                }
              }
              else{

                select_failures.add(
                    new Object[]{ data, new Throwable( "select registration: channel is closed" ) } );
              }
            }catch (Throwable t){

              Debug.printStackTrace(t);

              select_failures.add( new Object[]{ data, t } );
            }
          }
        }

        paused_states.clear();  //reset after every registration round

      }finally {

        register_cancel_list_mon.exit();
      }

      // report registration failures now that the monitor has been released
      for( int f = 0; f < select_failures.size(); f++ ){

        Object[]          failure     = (Object[])select_failures.get( f );
        RegistrationData  fail_data   = (RegistrationData)failure[0];
        Throwable         fail_excep  = (Throwable)failure[1];

        try{
          fail_data.listener.selectFailure(
              parent,
              fail_data.channel,
              fail_data.attachment,
              fail_excep );

        }catch( Throwable e ){

          // one misbehaving listener must not prevent the remaining notifications
          Debug.printStackTrace( e );
        }
      }


      //do the actual select
      int count = 0;
      selector_guard.markPreSelectTime();
      try {
        count = selector.select( timeout );
      }
      catch (Throwable t) {
        Debug.out( "Caught exception on selector.select() op: " +t.getMessage(), t );
        // the select did not block, so wait here instead of returning immediately
        try {  Thread.sleep( timeout );  }catch(Throwable e) { Debug.printStackTrace( e ); }
      }

      // do this after the select so that any pending cancels (prior to destroy) are processed
      // by the selector before we kill it

      if ( destroyed ){

        closeExistingSelector();

        return( 0 );
      }

      /*
      if( INTEREST_OP == VirtualChannelSelector.OP_READ ) {  //TODO
      	select_counts[ round ] = count;
      	round++;
      	if( round == select_counts.length ) {
      		StringBuffer buf = new StringBuffer( select_counts.length * 3 );
      		
      		buf.append( "select_counts=" );
      		for( int i=0; i < select_counts.length; i++ ) {
      			buf.append( select_counts[i] );
      			buf.append( ' ' );
      		}
      		
      		//System.out.println( buf.toString() );
      		round = 0;
      	}
      }
      */

      selector_guard.verifySelectorIntegrity( count, SystemTime.TIME_GRANULARITY_MILLIS /2 );

      if( !selector.isOpen() )  return count;

      //notification of ready keys via listener callback
      for( Iterator i = selector.selectedKeys().iterator(); i.hasNext(); ) {
        SelectionKey key = (SelectionKey)i.next();
        i.remove();  // take the key out of the selected set so it is handled once
        RegistrationData data = (RegistrationData)key.attachment();

        if( key.isValid() ) {
          if( (key.interestOps() & INTEREST_OP) == 0 ) {  //it must have been paused between select and notification
            continue;
          }

          if( pause_after_select ) {
            // disable the op before the callback runs
            key.interestOps( key.interestOps() & ~INTEREST_OP );
          }

          boolean progress_made = data.listener.selectSuccess( parent, data.channel, data.attachment );

          if ( progress_made ){

            data.non_progress_count = 0;
          }else{

            data.non_progress_count++;

            // log every 100 consecutive non-progress notifications
            if ( data.non_progress_count %100 == 0 && data.non_progress_count > 0 ){

              System.out.println(
                  "VirtualChannelSelector: No progress for op " + INTEREST_OP + ": " + data.non_progress_count +
                  ", socket: open = " + data.channel.isOpen() + ", connected = " + data.channel.isConnected());

              // give up on the connection once the count reaches exactly 1000
              if ( data.non_progress_count == 1000 ){

                Debug.out( "No progress for " + data.non_progress_count + ", closing connection" );

                try{
                  data.channel.close();

                }catch( Throwable e ){
                  Debug.printStackTrace( e );
                }
              }
            }
          }
        }
        else {
          key.cancel();
          data.listener.selectFailure( parent, data.channel, data.attachment, new Throwable( "key is invalid" ) );
          // can get this if socket has been closed between select and here
        }
      }


      long time_diff = SystemTime.getCurrentTime() - select_start_time;

      if( time_diff < timeout && time_diff >= 0 ) {  //ensure that it always takes at least 'timeout' time to complete the select op
        try {  Thread.sleep( timeout - time_diff );  }catch(Throwable e) { Debug.printStackTrace( e ); }
      }

      return count;
    }
    
    	/**
    	 * Note that you have to ensure that a select operation is performed on the normal select
    	 * loop *after* destroying the selector to actually cause the destroy to occur
    	 */
    
    public void destroy() {
      // Only raises the flag. select() checks it after its selector.select() call and
      // is what actually invokes closeExistingSelector().
      destroyed = true;
    }
    
    /**
     * Tells the listener of every key still registered with the selector that the
     * selector has been destroyed, then closes the selector.
     *
     * <p>Does nothing when the selector is null or already closed. A listener that throws
     * from its failure callback is logged and skipped so that the remaining listeners are
     * still notified and the selector is always closed.
     */
    protected void closeExistingSelector() {
      if( selector == null || !selector.isOpen() ) {
        return;  // no keys to walk and nothing to close
      }

      try{
        for( Iterator i = selector.keys().iterator(); i.hasNext(); ) {
          SelectionKey key = (SelectionKey)i.next();
          RegistrationData data = (RegistrationData)key.attachment();

          if( data == null ) {
            continue;  // key carries no registration data, so there is no listener to notify
          }

          try{
            data.listener.selectFailure( parent, data.channel, data.attachment, new Throwable( "selector destroyed" ) );
          }
          catch( Throwable t ) {
            Debug.printStackTrace( t );
          }
        }
      }
      finally{
        // close even if walking the key set failed, otherwise the selector would leak
        try{
          selector.close();
        }
        catch( Throwable t ) { Debug.printStackTrace( t ); }
      }
    }
    
    
    
    
    /**
     * Per-channel registration record. Instances are attached to the channel's selection
     * key and carry the listener to call back, the caller-supplied attachment passed along
     * with each callback, and a running count of consecutive notifications on which the
     * listener reported no progress.
     */
    private static class RegistrationData {
      /** Channel this registration is for. */
      protected final SocketChannel channel;
      /** Listener notified of select success or failure for the channel. */
      protected final VirtualChannelSelector.VirtualSelectorListener listener;
      /** Opaque object handed back to the listener on every callback. */
      protected final Object attachment;

      /** Consecutive non-progress notifications; starts at 0. */
      protected int non_progress_count;

      private RegistrationData(
          SocketChannel channel,
          VirtualChannelSelector.VirtualSelectorListener listener,
          Object attachment ) {
        this.channel    = channel;
        this.listener   = listener;
        this.attachment = attachment;
      }
    }
          
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -