⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 virtualchannelselectorimpl.java

📁 这是一个基于java编写的torrent的P2P源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
                key.cancel();  //cancel the key, since already registered
              }
	            
            }catch( Throwable e ){
         		
              Debug.printStackTrace(e);
            }
          }else{
            //process new registrations  
 
            RegistrationData data = (RegistrationData)obj;
            	
            if( data == null ) {
              Debug.out( "data == null" );
            }
            
            if( data.channel == null ) {
              Debug.out( "data.channel == null" );
            }
            
            try {
              if( data.channel.isOpen() ){
                	
                // see if already registered
                SelectionKey key = data.channel.keyFor( selector );
                  
                if ( key != null && key.isValid() ) {  //already registered
                  key.attach( data );
                  key.interestOps( key.interestOps() | INTEREST_OP );  //ensure op is enabled
                }
                else{
                  data.channel.register( selector, INTEREST_OP, data );
                }
                  
                //check if op has been paused before registration moment
                Object paused = paused_states.get( data.channel );
                  
                if( paused != null ) {
                  pauseSelects( data.channel );  //pause it
                }
              }
              else{
            	
              	select_fail_data	= data;
              	select_fail_excep	= new Throwable( "select registration: channel is closed" );
              	
              }
            }catch (Throwable t){
              
            	Debug.printStackTrace(t);
           	    
           		select_fail_data	= data;
           		select_fail_excep	= t;
            } 	
          }
        }
        
        paused_states.clear();  //reset after every registration round
               
      }finally { 
      	
      	register_cancel_list_mon.exit();
      }
      
      if ( select_fail_data != null ){
      	
      	try{
	      	parent.selectFailure( 
	      			select_fail_data.listener,
					select_fail_data.channel, 
					select_fail_data.attachment, 
					select_fail_excep );
	      	
      	}catch( Throwable e ){
      		
      		Debug.printStackTrace( e );
      	}
      }
      
  
      //do the actual select
      int count = 0;
      selector_guard.markPreSelectTime();
      try {
        count = selector.select( timeout );
      }
      catch (Throwable t) {
        Debug.out( "Caught exception on selector.select() op: " +t.getMessage(), t );
        try {  Thread.sleep( timeout );  }catch(Throwable e) { e.printStackTrace(); }
      }
      
      	// do this after the select so that any pending cancels (prior to destroy) are processed
      	// by the selector before we kill it
      
 	  if ( destroyed ){
  		
 	    closeExistingSelector();
 	    	
 	    return( 0 );
 	  }
 	   	  
      /*
      if( INTEREST_OP == VirtualChannelSelector.OP_READ ) {  //TODO
      	select_counts[ round ] = count;
      	round++;
      	if( round == select_counts.length ) {
      		StringBuffer buf = new StringBuffer( select_counts.length * 3 );
      		
      		buf.append( "select_counts=" );
      		for( int i=0; i < select_counts.length; i++ ) {
      			buf.append( select_counts[i] );
      			buf.append( ' ' );
      		}
      		
      		//System.out.println( buf.toString() );
      		round = 0;
      	}
      }
      */
      
      selector_guard.verifySelectorIntegrity( count, SystemTime.TIME_GRANULARITY_MILLIS /2 );
      
      if( !selector.isOpen() )  return count;
      
      int	progress_made_key_count	= 0;
      int	total_key_count			= 0;
      
      long	now = SystemTime.getCurrentTime();
      
      	//notification of ready keys via listener callback
      
      	// debug handling for channels stuck pending write select for long periods
      
      Set	non_selected_keys = null;
      
      if ( INTEREST_OP == VirtualChannelSelector.OP_WRITE ){
    	  
    	  if ( 	now < last_write_select_debug ||
    			now - last_write_select_debug > WRITE_SELECTOR_DEBUG_CHECK_PERIOD ){
    		  
    		  last_write_select_debug = now;
    		  
    		  non_selected_keys = new HashSet( selector.keys());
    	  }
      }
      
      for( Iterator i = selector.selectedKeys().iterator(); i.hasNext(); ) {
    	  
    	total_key_count++;
    	
        SelectionKey key = (SelectionKey)i.next();
        i.remove();
        RegistrationData data = (RegistrationData)key.attachment();

        if ( non_selected_keys != null ){
        	
        	non_selected_keys.remove( key );
        }
        
        data.last_select_success_time = now;
        // int	rm_type;
        
        if( key.isValid() ) {
          if( (key.interestOps() & INTEREST_OP) == 0 ) {  //it must have been paused between select and notification
        	// rm_type = 2;
          }else{            
            
	          if( pause_after_select ) { 
	            key.interestOps( key.interestOps() & ~INTEREST_OP );
	          }
	                        
	          boolean	progress_indicator = parent.selectSuccess( data.listener, data.channel, data.attachment );
	          
	          if ( progress_indicator ){
	            
	        	// rm_type = 0;
	        	
	        	progress_made_key_count++;
	        	  
	            data.non_progress_count = 0;
	            
	          }else{
	            
	        	// rm_type = 1;
	        	  
	            data.non_progress_count++;
	            	
	            if ( 	data.non_progress_count == 10 ||
	            		data.non_progress_count %100 == 0 && data.non_progress_count > 0 ){
	            		
	              Debug.out( 
	                  "VirtualChannelSelector: No progress for op " + INTEREST_OP + 
	                  	": listener = " + data.listener.getClass() + 
	                  	", count = " + data.non_progress_count +
	                  	", socket: open = " + data.channel.isOpen() + 
	                  		(INTEREST_OP==VirtualChannelSelector.OP_ACCEPT?"":
	                  			(", connected = " + ((SocketChannel)data.channel).isConnected())));
	                			  
	            		
	              if ( data.non_progress_count == 1000 ){
	                
	                Debug.out( "No progress for " + data.non_progress_count + ", closing connection" );
	            			
	                try{
	                  data.channel.close();
	            				
	                }catch( Throwable e ){
	            				e.printStackTrace();
	                }
	              }
	            }
	          }
	        }
        }else{
          // rm_type = 3;
          key.cancel();
          parent.selectFailure( data.listener, data.channel, data.attachment, new Throwable( "key is invalid" ) );
          // can get this if socket has been closed between select and here
        }
        
        /*
        if ( rm_trace ){
        	
          	Object	rm_key = data.listener.getClass();
          	
          	int[]	rm_count = (int[])rm_listener_map.get( rm_key );
          	
          	if ( rm_count == null ){
          		
          		rm_count = new int[]{0,0,0,0};
          		
          		rm_listener_map.put( rm_key, rm_count );
          	}
          	
          	rm_count[rm_type]++;
          }
          */
      }
      
      if ( non_selected_keys != null ){
    	  
    	  for( Iterator i = non_selected_keys.iterator(); i.hasNext(); ) {
    	    	     	    	
    		  SelectionKey key = (SelectionKey)i.next();
    	    
    	      RegistrationData data = (RegistrationData)key.attachment();
 
        	  if (( key.interestOps() & INTEREST_OP) == 0 ) { 

        		  continue;
        	  }
        	  
    	      long	stall_time = now - data.last_select_success_time;
    	      
    	      if ( stall_time < 0 ){
    	    	  
    	    	  data.last_select_success_time	= now;
    	    	  
    	      }else{
    	    	  
	    	      if ( stall_time > WRITE_SELECTOR_DEBUG_MAX_TIME ){
	    	    	
	    	    	  Logger.log(
	    	    		new LogEvent(LOGID,LogEvent.LT_WARNING,"Write select for " + key.channel() + " stalled for " + stall_time ));	    	  
	    	    	  
	    	    	  	// hack - trigger a dummy write select to see if things are still OK
	    	    	  
	    	          if( key.isValid() ) {
    	        	  
    	        		  if( pause_after_select ) { 

    	        			  key.interestOps( key.interestOps() & ~INTEREST_OP );
    	        		  }
    	        		  
    	        		  if ( parent.selectSuccess( data.listener, data.channel, data.attachment )){

    	        			  data.non_progress_count = 0;
    	        		  }
	    	          }else{

	    	        	  key.cancel();

	    	        	  parent.selectFailure( data.listener, data.channel, data.attachment, new Throwable( "key is invalid" ) );
	    	          }
	    	      }
	    	  }
    	  }
      }
    	  
        
      	// if any of the ready keys hasn't made any progress then enforce minimum sleep period to avoid
      	// spinning
      
      if ( total_key_count == 0 || progress_made_key_count != total_key_count ){
    	  
	      long time_diff = SystemTime.getCurrentTime() - select_start_time;
	      
	      if( time_diff < timeout && time_diff >= 0 ) {  //ensure that it always takes at least 'timeout' time to complete the select op
	      	try {  Thread.sleep( timeout - time_diff );  }catch(Throwable e) { e.printStackTrace(); }      
	      }
      }else{
    	  /*
    	  if ( rm_test_fix ){
    		 
    	      long time_diff = SystemTime.getCurrentTime() - select_start_time;
    	      
    	      if( time_diff < 10 && time_diff >= 0 ) { 
    	      	try {  Thread.sleep( 10 - time_diff );  }catch(Throwable e) { e.printStackTrace(); }      
    	      } 
    	  }
    	  */
      }
      
      /*
      if ( rm_trace ){
    	  
    	  if ( select_start_time - rm_flag_last_log > 10000 ){
    		  
    		  rm_flag_last_log	= select_start_time;
    		  
    		  Iterator it = rm_listener_map.entrySet().iterator();
    		
    		  String	str = "";
    		  
    		  while( it.hasNext()){
    			  
    			  Map.Entry	entry = (Map.Entry)it.next();
    			  
    			  Class	cla = (Class)entry.getKey();
    			  
    			  String	name = cla.getName();
    			  int		pos = name.lastIndexOf('.');
    			  name = name.substring( pos+1 );
    			  
    			  int[]	counts = (int[])entry.getValue();
    			  
    			  str += (str.length()==0?"":",")+ name + ":" + counts[0]+"/"+counts[1]+"/"+counts[2]+"/"+counts[3];
    		  }
    		  
       		  Debug.outNoStack( "RM trace: " + hashCode() + ": op=" + INTEREST_OP + "-" + str ); 
    	  }
      }
      */
      
      return count;
    }
    
    	/**
    	 * Note that you have to ensure that a select operation is performed on the normal select
    	 * loop *after* destroying the selector to actually cause the destroy to occur
    	 */
    
    public void
    destroy()
    {
    	destroyed	= true;
    }
    
    protected void closeExistingSelector() {
      for( Iterator i = selector.keys().iterator(); i.hasNext(); ) {
        SelectionKey key = (SelectionKey)i.next();
        RegistrationData data = (RegistrationData)key.attachment();
        parent.selectFailure(data.listener, data.channel, data.attachment, new Throwable( "selector destroyed" ) );
      }
      
      try{
        selector.close();
        
        AEDiagnostics.log( "seltrace", "Selector destroyed for '" + parent.getName() + "'," + selector_guard.getType());
      }
      catch( Throwable t ) { t.printStackTrace(); }
    }
    
    
    
    
    private static class RegistrationData {
        protected final AbstractSelectableChannel channel;
        protected final VirtualChannelSelector.VirtualAbstractSelectorListener listener;
        protected final Object attachment;
        
        protected int 	non_progress_count;
        protected long	last_select_success_time;
        
      	private RegistrationData( AbstractSelectableChannel _channel, VirtualChannelSelector.VirtualAbstractSelectorListener _listener, Object _attachment ) {
      		channel 		= _channel;
      		listener		= _listener;
      		attachment 		= _attachment;
      		
      		last_select_success_time	= SystemTime.getCurrentTime();
      	}
      }
          
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -