⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 virtualchannelselectorimpl.java

📁 这是一个基于java编写的torrent的P2P源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/*
 * Created on Jul 28, 2004
 * Created by Alon Rohter
 * Copyright (C) 2004, 2005, 2006 Aelitis, All Rights Reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 * 
 * AELITIS, SAS au capital de 46,603.30 euros
 * 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
 *
 */
package com.aelitis.azureus.core.networkmanager.impl.tcp;


import java.nio.channels.*;
import java.nio.channels.spi.AbstractSelectableChannel;
import java.util.*;

import org.gudy.azureus2.core3.logging.*;
import org.gudy.azureus2.core3.util.*;

import com.aelitis.azureus.core.networkmanager.VirtualChannelSelector;



/**
 * Provides a simplified and safe (selectable-channel) socket single-op selector.
 */
public class VirtualChannelSelectorImpl {
	
	private static final LogIDs LOGID = LogIDs.NWMAN;

	/*
	static boolean	rm_trace 	= false;
	static boolean	rm_test_fix = false;
	
	static{
	
		COConfigurationManager.addAndFireParameterListeners(
				new String[]{ "user.rm.trace", "user.rm.testfix" },
				new ParameterListener()
				{
					public void 
					parameterChanged(
						String parameterName)
					{
						rm_trace 	= COConfigurationManager.getBooleanParameter( "user.rm.trace", false );
						rm_test_fix = COConfigurationManager.getBooleanParameter( "user.rm.testfix", false );
					}
				});
	}
	
	private long rm_flag_last_log;
	private Map	rm_listener_map = new HashMap();
	*/
	
    protected Selector selector;
    private final SelectorGuard selector_guard;
    
    private final LinkedList 	register_cancel_list 		= new LinkedList();
    private final AEMonitor 	register_cancel_list_mon	= new AEMonitor( "VirtualChannelSelector:RCL");

    private final HashMap paused_states = new HashMap();
    
    private final int 		INTEREST_OP;
    private final boolean	pause_after_select;

    protected final VirtualChannelSelector parent;
    
    
    //private int[] select_counts = new int[ 50 ];
    //private int round = 0;
    
    private volatile boolean	destroyed;
    
    
    private static final int WRITE_SELECTOR_DEBUG_CHECK_PERIOD	= 10000;
    private static final int WRITE_SELECTOR_DEBUG_MAX_TIME		= 20000;
    
    private long last_write_select_debug;
    
    public VirtualChannelSelectorImpl( VirtualChannelSelector _parent, int _interest_op, boolean _pause_after_select ) {	
      this.parent = _parent;
      INTEREST_OP = _interest_op;
      
      pause_after_select	= _pause_after_select;
      
      String type;
      switch( INTEREST_OP ) {
        case VirtualChannelSelector.OP_CONNECT:
          type = "OP_CONNECT";  break;
        case VirtualChannelSelector.OP_READ:
          type = "OP_READ";  break;
        default:
          type = "OP_WRITE";  break;
      }
      
      
      selector_guard = new SelectorGuard( type, new SelectorGuard.GuardListener() {
        public boolean safeModeSelectEnabled() {
          return parent.isSafeSelectionModeEnabled();
        }
        
        public void spinDetected() {
          closeExistingSelector();
          try {  Thread.sleep( 1000 );  }catch( Throwable x ) {x.printStackTrace();}
          parent.enableSafeSelectionMode();
        }
        
        public void failureDetected() {
          try {  Thread.sleep( 10000 );  }catch( Throwable x ) {x.printStackTrace();}
          closeExistingSelector();
          try {  Thread.sleep( 1000 );  }catch( Throwable x ) {x.printStackTrace();}
          selector = openNewSelector();
        }
      });
      
      selector = openNewSelector();
    }
    
  
    
    protected Selector openNewSelector() {
      Selector sel = null;
      
      try {
        sel = Selector.open();
        
        AEDiagnostics.logWithStack( "seltrace", "Selector created for '" + parent.getName() + "'," + selector_guard.getType());
      }
      catch (Throwable t) {
        Debug.out( "ERROR: caught exception on Selector.open()", t );
        
        try {  Thread.sleep( 3000 );  }catch( Throwable x ) {x.printStackTrace();}
        
        int fail_count = 1;
        
        while( fail_count < 10 ) {
          try {
            sel = Selector.open();
            
            AEDiagnostics.logWithStack( "seltrace", "Selector created for '" + parent.getName() + "'," + selector_guard.getType());
            
            break;
          }
          catch( Throwable f ) {
            Debug.out( f );
            fail_count++;
            try {  Thread.sleep( 3000 );  }catch( Throwable x ) {x.printStackTrace();}
          }
        }
        
        if( fail_count < 10 ) { //success ! 
          Debug.out( "NOTICE: socket Selector successfully opened after " +fail_count+ " failures." );
        }
        else {  //failure
        	Logger.log(new LogAlert(LogAlert.REPEATABLE, LogAlert.AT_ERROR,
						"ERROR: socket Selector.open() failed 10 times in a row, aborting."
								+ "\nAzureus / Java is likely being firewalled!"));
        }
      }
      
      return sel;
    }
    
    
    
    
   
     
    public void pauseSelects( AbstractSelectableChannel channel ) {
      
      //System.out.println( "pauseSelects: " + channel + " - " + Debug.getCompressedStackTrace() );
      
      if( channel == null ) {
        return;
      }
      
      SelectionKey key = channel.keyFor( selector );
      
      if( key != null && key.isValid() ) {
        key.interestOps( key.interestOps() & ~INTEREST_OP );
      }
      else {  //channel not (yet?) registered
        if( channel.isOpen() ) {  //only bother if channel has not already been closed
          try{  register_cancel_list_mon.enter();
          
            paused_states.put( channel, new Boolean( true ) );  //ensure the op is paused upon reg select-time reg

          }
          finally{  register_cancel_list_mon.exit();  }
        }
      }
    }
    


    
    public void resumeSelects( AbstractSelectableChannel channel ) {
      //System.out.println( "resumeSelects: " + channel + " - " + Debug.getCompressedStackTrace() );
      if( channel == null ) {
        Debug.printStackTrace( new Exception( "resumeSelects():: channel == null" ) );
        return;
      }
      
      SelectionKey key = channel.keyFor( selector );
      
      if( key != null && key.isValid() ) {
    	  	// if we're resuming a non-interested key then reset the metrics
    	  
    	if (( key.interestOps() & INTEREST_OP ) == 0 ){
     	   RegistrationData data = (RegistrationData)key.attachment();

     	   data.last_select_success_time 	= SystemTime.getCurrentTime();
     	   data.non_progress_count			= 0;
    	}
        key.interestOps( key.interestOps() | INTEREST_OP );
      }
      else {  //channel not (yet?) registered
        try{  register_cancel_list_mon.enter();
          paused_states.remove( channel );  //check if the channel's op has been already paused before select-time reg
        }
        finally{  register_cancel_list_mon.exit();  }
      }
      
      //try{
      //  selector.wakeup();
      //}
      //catch( Throwable t ) {  Debug.out( "selector.wakeup():: caught exception: ", t );   }
    }



    
    public void 
	cancel( 
		AbstractSelectableChannel channel ) 
    {
      //System.out.println( "cancel: " + channel + " - " + Debug.getCompressedStackTrace() );
    				 
    	if ( destroyed ){
    		    		
    		// don't worry too much about cancels
    	}
    	
    	if ( channel == null ){
    		
    		Debug.out( "Attempt to cancel selects for null channel" );
    		
    		return;
    	}
    	
    	try{
    		register_cancel_list_mon.enter();
      	   		
    			// ensure that there's only one operation outstanding for a given channel
    			// at any one time (the latest operation requested )
    		
    		for (Iterator it = register_cancel_list.iterator();it.hasNext();){
    			
    			Object	obj = it.next();
    			  		   				
    			if ( 	channel == obj ||
    					(	obj instanceof RegistrationData &&
    								((RegistrationData)obj).channel == channel )){
    					
    						// remove existing cancel or register
    					   				
    				it.remove();
    				
    				break;
    			}
    		}
    		   	
			pauseSelects((AbstractSelectableChannel)channel );
			
  			register_cancel_list.add( channel );
    		
    	}finally{
    		
    		register_cancel_list_mon.exit();
    	}
    }
    
    
    
    public void 
	register( 
		AbstractSelectableChannel 								channel, 
		VirtualChannelSelector.VirtualAbstractSelectorListener 	listener, 
		Object 													attachment ) 
    {
    	if ( destroyed ){
     			
   			Debug.out( "register called after selector destroyed" );
    	}
    	
    	if ( channel == null ){
    		
    		Debug.out( "Attempt to register selects for null channel" );
    		
    		return;
    	}
    	
    	try{
    		register_cancel_list_mon.enter();
      	   		
    			// ensure that there's only one operation outstanding for a given channel
    			// at any one time (the latest operation requested )
    		
    		for (Iterator it = register_cancel_list.iterator();it.hasNext();){
    			
    			Object	obj = it.next();
    			
				if ( channel == obj ||
   						(	obj instanceof RegistrationData &&
								((RegistrationData)obj).channel == channel )){
				
					it.remove();
				
					break;
    			}
    		}
				
			paused_states.remove( channel );
			
  			register_cancel_list.add( new RegistrationData( channel, listener, attachment ));
    		
    	}finally{
    		
    		register_cancel_list_mon.exit();
    	}
    }
    
    
    
    public int select( long timeout ) {
    	
      long select_start_time = SystemTime.getCurrentTime();
      
      if( selector == null ) {
        Debug.out( "VirtualChannelSelector.select() op called with null selector" );
        try {  Thread.sleep( 3000 );  }catch( Throwable x ) {x.printStackTrace();}
        return 0;
      } 
      
      if( !selector.isOpen()) {
          Debug.out( "VirtualChannelSelector.select() op called with closed selector" );
          try {  Thread.sleep( 3000 );  }catch( Throwable x ) {x.printStackTrace();}
          return 0;
      }  
      
      	// store these when they occur so they can be raised *outside* of the monitor to avoid
      	// potential deadlocks
      
      RegistrationData	select_fail_data	= null;
      Throwable 		select_fail_excep	= null;
      
      //process cancellations
      try {
      	register_cancel_list_mon.enter();
        
      		// don't use an iterator here as it is possible that error notifications to listeners
      		// can result in the addition of a cancel request.
      		// Note that this can only happen for registrations, and this *should* only result in
      		// possibly a cancel being added (i.e. not a further registration), hence this can't
      		// loop. Also note the approach of removing the entry before processing. This is so
      		// that the logic used when adding a cancel (the removal of any matching entries) does
      		// not cause the entry we're processing to be removed
      	
        while( register_cancel_list.size() > 0 ){
        	
          Object	obj = register_cancel_list.remove(0);
         
          if ( obj instanceof AbstractSelectableChannel ){
           
         		// process cancellation
         	
        	  AbstractSelectableChannel	canceled_channel = (AbstractSelectableChannel)obj;
  
            try{
              SelectionKey key = canceled_channel.keyFor( selector );
	            
              if( key != null ){
	            	

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -