⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 aeproxyimpl.java

📁 Azureus is a powerful, full-featured, cross-platform java BitTorrent client
💻 JAVA
字号:
/*
 * Created on 06-Dec-2004
 * Created by Paul Gardner
 * Copyright (C) 2004 Aelitis, All Rights Reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 * 
 * AELITIS, SARL au capital de 30,000 euros
 * 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
 *
 */

package com.aelitis.azureus.core.proxy.impl;

import java.util.*;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

import org.gudy.azureus2.core3.logging.LGLogger;
import org.gudy.azureus2.core3.util.AEMonitor;
import org.gudy.azureus2.core3.util.AEThread;
import org.gudy.azureus2.core3.util.Debug;
import org.gudy.azureus2.core3.util.SystemTime;

import com.aelitis.azureus.core.networkmanager.VirtualChannelSelector;

import com.aelitis.azureus.core.proxy.*;

/**
 * @author parg
 *
 */

public class 
AEProxyImpl 
	implements AEProxy, VirtualChannelSelector.VirtualSelectorListener
{
	protected int				port;
	protected long				connect_timeout;
	protected long				read_timeout;
	protected AEProxyHandler	proxy_handler;
	
	protected VirtualChannelSelector	read_selector	 = new VirtualChannelSelector( VirtualChannelSelector.OP_READ );
	protected VirtualChannelSelector	connect_selector = new VirtualChannelSelector( VirtualChannelSelector.OP_CONNECT );
	protected VirtualChannelSelector	write_selector	 = new VirtualChannelSelector( VirtualChannelSelector.OP_WRITE );
	
	protected Map				processors = new WeakHashMap();
	
	protected AEMonitor			this_mon	= new AEMonitor( "AEProxyImpl" );
	
	public 
	AEProxyImpl(
		int				_port,
		long			_connect_timeout,
		long			_read_timeout,
		AEProxyHandler	_proxy_handler )
	
		throws AEProxyException
	{
		port				= _port;
		connect_timeout		= _connect_timeout;
		read_timeout		= _read_timeout;
		proxy_handler		= _proxy_handler;
		
		try{
			
			final ServerSocketChannel	ssc = ServerSocketChannel.open();
			
			ServerSocket ss	= ssc.socket();
			
			ss.setReuseAddress(true);

			ss.bind(  new InetSocketAddress( InetAddress.getByName("127.0.0.1"), port), 128 );
			
			if ( port == 0 ){
				
				port	= ss.getLocalPort();
			}
				
			Thread connect_thread = 
				new AEThread("AEProxy:connect.loop")
				{
					public void
					runSupport()
					{
						selectLoop( connect_selector );
					}
				};
	
			connect_thread.setDaemon( true );
	
			connect_thread.start();
	
			Thread read_thread = 
				new AEThread("AEProxy:read.loop")
				{
					public void
					runSupport()
					{
						selectLoop( read_selector );
					}
				};
	
			read_thread.setDaemon( true );
	
			read_thread.start();
			
			Thread write_thread = 
				new AEThread("AEProxy:write.loop")
				{
					public void
					runSupport()
					{
						selectLoop( write_selector );
					}
				};
	
			write_thread.setDaemon( true );
	
			write_thread.start();
			
			Thread accept_thread = 
					new AEThread("AEProxy:accept.loop")
					{
						public void
						runSupport()
						{
							acceptLoop( ssc );
						}
					};
		
			accept_thread.setDaemon( true );
		
			accept_thread.start();									
		
			LGLogger.log( "AEProxy: listener established on port " + port ); 
			
		}catch( Throwable e){
		
			LGLogger.logUnrepeatableAlertUsingResource( 
					LGLogger.AT_ERROR,
					"Tracker.alert.listenfail",
					new String[]{ ""+port });
	
			LGLogger.log( "AEProxy: listener failed on port " + port, e ); 
						
			throw( new AEProxyException( "AEProxy: accept fails: " + e.toString()));
		}			
	}	
	
	protected void
	acceptLoop(
		ServerSocketChannel	ssc )
	{		
		long	successfull_accepts = 0;
		long	failed_accepts		= 0;

		while(true){
			
			try{				
				SocketChannel socket_channel = ssc.accept();
						
				successfull_accepts++;
				
				if ( !socket_channel.socket().getInetAddress().isLoopbackAddress()){
					
					LGLogger.log( "AEProxy: incoming connection from '" + socket_channel.socket().getInetAddress() + "' - closed as not local" );
				
					socket_channel.close();
				}
						
				socket_channel.configureBlocking(false);

				AEProxyConnectionImpl processor = new AEProxyConnectionImpl(this, socket_channel, proxy_handler);
				
				try{
					this_mon.enter();
				
					processors.put( processor, "" );
	
					LGLogger.log( "AEProxy: num processors = " + processors.size());
					
				}finally{
					
					this_mon.exit();
				}
				
				read_selector.register( socket_channel, this, processor );
				
			}catch( Throwable e ){
				
				failed_accepts++;

				LGLogger.log( "AEProxy: listener failed on port " + port, e ); 
			
				if ( failed_accepts > 100 && successfull_accepts == 0 ){

						// looks like its not going to work...
						// some kind of socket problem
									
					LGLogger.logUnrepeatableAlertUsingResource( 
							LGLogger.AT_ERROR,
							"Network.alert.acceptfail",
							new String[]{ ""+port, "TCP" } );
			
					break;
				}			
			}
		}
	}
	
	protected void
	close(
		AEProxyConnectionImpl	processor )
	{
		try{
			this_mon.enter();
			
			processors.remove( processor );
			
		}finally{
		
			this_mon.exit();
		}
	}
	
	protected void
	selectLoop(
		VirtualChannelSelector	selector )
	{
		long	last_time	= 0;
		
		while( true ){
			
			try{
				selector.select(100);
				
					// only use one selector to trigger the timeouts!
				
				if ( selector == read_selector ){
					
					long	now = SystemTime.getCurrentTime();
					
					if ( now < last_time ){
						
						last_time	= now;
						
					}else if ( now - last_time >= 5000 ){
						
						last_time	= now;
						
						checkTimeouts();
					}
				}
			}catch( Throwable e ){
				
				Debug.printStackTrace(e);
			}
		}
	}

	protected void
	checkTimeouts()
	{
		if ( connect_timeout <= 0 && read_timeout <= 0 ){
			
			return;
		}
		
		List	closes = new ArrayList();
		
		try{
			this_mon.enter();
			
			long	now = SystemTime.getCurrentTime();
			
			Iterator	it = processors.keySet().iterator();
			
			while( it.hasNext()){
				
				AEProxyConnectionImpl	processor = (AEProxyConnectionImpl)it.next();
				
				long diff = now - processor.getTimeStamp();
				
				if ( 	connect_timeout > 0 &&
						diff >= connect_timeout && 
						!processor.isConnected()){
					
					closes.add( processor );
				
				}else if (	read_timeout > 0 &&
							diff >= read_timeout &&
							processor.isConnected()){
					
					closes.add( processor );
				}
			}
		}finally{
			
			this_mon.exit();
		}
		
		for (int i=0;i<closes.size();i++){
			
			((AEProxyConnectionImpl)closes.get(i)).failed( new Throwable( "timeout" ));
		}
	}
	
	protected void
	requestWriteSelect(
		AEProxyConnectionImpl	processor,
		SocketChannel 			sc )
	{
		write_selector.register( sc, this, processor );
	}
	
	protected void
	cancelWriteSelect(
		SocketChannel 			sc )
	{
		write_selector.cancel( sc );
	}
	
	protected void
	requestReadSelect(
		AEProxyConnectionImpl	processor,
		SocketChannel 		sc )
	{
		read_selector.register( sc, this, processor );
	}
	
	protected void
	cancelReadSelect(
		SocketChannel 		sc )
	{
		read_selector.cancel( sc );
	}
	
	protected void
	requestConnectSelect(
		AEProxyConnectionImpl	processor,
		SocketChannel 			sc )
	{
		connect_selector.register( sc, this, processor );
	}
	
	protected void
	cancelConnectSelect(
		SocketChannel 		sc )
	{
		connect_selector.cancel( sc );
	}
	
    public void 
	selectSuccess( 
		VirtualChannelSelector	selector, 
		SocketChannel 			sc,
		Object 					attachment )
    {
    	AEProxyConnectionImpl	processor = (AEProxyConnectionImpl)attachment;
    	   	
    	if ( selector == read_selector ){
    		
    		processor.read(sc);
    		
    	}else if ( selector == write_selector ){
    		
    		processor.write(sc);
    		
    	}else{
    		
    		processor.connect(sc);
    	}
    }
    
    public void 
	selectFailure( 
		VirtualChannelSelector	selector, 
		SocketChannel 			sc,
		Object 					attachment,
		Throwable 				msg )
    {
    	AEProxyConnectionImpl	processor = (AEProxyConnectionImpl)attachment;
    	
    	processor.failed( msg );
    }
    
	public int
	getPort()
	{
		return( port );
	}
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -