⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 genericmessageconnectionindirect.java

📁 这是一个基于java编写的torrent的P2P源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/*
 * Created on 10 Aug 2006
 * Created by Paul Gardner
 * Copyright (C) 2006 Aelitis, All Rights Reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 * 
 * AELITIS, SAS au capital de 46,603.30 euros
 * 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
 *
 */

package org.gudy.azureus2.pluginsimpl.local.messaging;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.*;

import org.gudy.azureus2.core3.logging.LogEvent;
import org.gudy.azureus2.core3.logging.LogIDs;
import org.gudy.azureus2.core3.logging.Logger;
import org.gudy.azureus2.core3.util.AERunnable;
import org.gudy.azureus2.core3.util.AESemaphore;
import org.gudy.azureus2.core3.util.Debug;
import org.gudy.azureus2.core3.util.DelayedEvent;
import org.gudy.azureus2.core3.util.DirectByteBuffer;
import org.gudy.azureus2.core3.util.SimpleTimer;
import org.gudy.azureus2.core3.util.SystemTime;
import org.gudy.azureus2.core3.util.ThreadPool;
import org.gudy.azureus2.core3.util.TimerEvent;
import org.gudy.azureus2.core3.util.TimerEventPerformer;
import org.gudy.azureus2.plugins.messaging.MessageException;
import org.gudy.azureus2.plugins.messaging.generic.GenericMessageEndpoint;
import org.gudy.azureus2.plugins.messaging.generic.GenericMessageHandler;
import org.gudy.azureus2.plugins.utils.PooledByteBuffer;

import com.aelitis.azureus.core.nat.NATTraverser;


public class 
GenericMessageConnectionIndirect 
	implements GenericMessageConnectionAdapter
{
		// log id under which this class logs (the NET category)
	
	private static final LogIDs LOGID = LogIDs.NET;

		// compile-time debug switch: when true a banner is printed on class load and
		// every message passed to the static receive method is echoed to stdout
	
	private static final boolean	TRACE	= false;
	
	static{

		if ( TRACE ){
			System.out.println( "**** GenericMessageConnectionIndirect: TRACE on **** " );
		}
	}
	
		// 32KB limit; where it is enforced is not visible in this part of the file
	
	public static final int MAX_MESSAGE_SIZE	= 32*1024; 
		
		// values carried in the "type" entry of the message maps exchanged with the peer.
		// The static receive method treats anything other than CONNECT and DATA as a
		// request to close the connection
	
	private static final int MESSAGE_TYPE_CONNECT		= 1;
	private static final int MESSAGE_TYPE_ERROR			= 2;
	private static final int MESSAGE_TYPE_DATA			= 3;
	private static final int MESSAGE_TYPE_DISCONNECT	= 4;
	
		// period handed to SimpleTimer.addPeriodicEvent for the housekeeping timer
		// (presumably milliseconds - confirm against SimpleTimer)
	
	private static final int TICK_PERIOD 				= 5000;

		// KEEP_ALIVE_MIN * 3 is the idle threshold after which the housekeeping timer
		// closes a remote connection; the other two periods are only used below to
		// derive tick counts
	
	private static final int KEEP_ALIVE_CHECK_PERIOD	= 5000;
	private static final int KEEP_ALIVE_MIN				= 10000;
	private static final int STATS_PERIOD				= 60000; 

		// the periods above expressed as a number of timer ticks (integer division)
	
	private static final int KEEP_ALIVE_CHECK_TICKS	= KEEP_ALIVE_CHECK_PERIOD / TICK_PERIOD;
	private static final int STATS_TICKS			= STATS_PERIOD / TICK_PERIOD;
	
		// limits applied by the static receive method when accepting a connect request:
		// overall, and per originating IP address
	
	private static final int MAX_REMOTE_CONNECTIONS			= 1024;
	private static final int MAX_REMOTE_CONNECTIONS_PER_IP	= 32;
	
		// next connection id to hand out; randomly seeded, and incremented while
		// holding the remote_connections monitor when an incoming connect is accepted
	
	private static long	connection_id_next	= new Random().nextLong();
	
	
		// registries of live connections whose values are GenericMessageConnectionIndirect
		// instances. remote_connections is keyed by the Long connection id; the key type of
		// local_connections is not visible in this part of the file. All access in view is
		// performed while synchronized on the map itself
	
	private static Map	local_connections 	= new HashMap();
	private static Map	remote_connections 	= new HashMap();
	
		// pool on which the housekeeping timer runs keepAlive() for local connections
		// so that the timer thread itself does not execute them
	
	private static ThreadPool	keep_alive_pool = new ThreadPool( "GenericMessageConnectionIndirect:keepAlive", 8, true );
	
	static{
		
			// Registers the periodic housekeeping timer. There are two reasons for timers
			//     1) to check for dead connections (send keepalive/check timeouts)
			//     2) the connection is one-sided so if the responder sends an unsolicited message it 
			//        is queued and only picked up on a periodic ping by the initiator
				
		SimpleTimer.addPeriodicEvent(
			"DDBTorrent:timeout2",
			TICK_PERIOD,
			new TimerEventPerformer()
			{		
					// number of times the timer has fired since class load
				
				private int	tick_count = 0;
				
				public void
				perform(
					TimerEvent	event )
				{
					tick_count++;
					
						// every STATS_TICKS ticks: log connection totals, but only when
						// logging is enabled and there is something to report
					
					if ( tick_count % STATS_TICKS == 0 ){
					
						if ( Logger.isEnabled()){
	
							int	local_total;
							int remote_total;
							
							synchronized( local_connections ){
	
								local_total = local_connections.size();
							}
								
							synchronized( remote_connections ){
	
								remote_total = remote_connections.size();
							}
							
							if  ( local_total + remote_total > 0 ){
								
								log( "local=" + local_total + " [" + getLocalConnectionStatus() + "], remote=" + remote_total + " [" + getRemoteConnectionStatus() + "]" );
							}
						}
					}

					if ( tick_count % KEEP_ALIVE_CHECK_TICKS == 0 ){
						
							// local connections: hand each connection that reports it is due
							// a keep-alive to the pool so the timer thread is not blocked
												
						synchronized( local_connections ){
						
							Iterator	it = local_connections.values().iterator();
							
							while( it.hasNext()){
								
								final GenericMessageConnectionIndirect con = (GenericMessageConnectionIndirect)it.next();
								
								if ( con.prepareForKeepAlive( false )){
									
									keep_alive_pool.run(
										new AERunnable()
										{
											public void
											runSupport()
											{
												con.keepAlive();
											}
										});
								}
							}
						}
						
						long	now = SystemTime.getCurrentTime();
						
							// remote connections: close any that have been silent for more
							// than three times the minimum keep-alive period
						
						synchronized( remote_connections ){
							
								// Iterate over a snapshot rather than the live view. close() is
								// not visible here, but if it removes the connection from
								// remote_connections (presumably it does) then continuing to
								// iterate the live view would fail with a
								// ConcurrentModificationException and abandon the sweep
							
							Iterator	it = new ArrayList( remote_connections.values()).iterator();
							
							while( it.hasNext()){
								
								GenericMessageConnectionIndirect con = (GenericMessageConnectionIndirect)it.next();
						
								long	last_receive = con.getLastMessageReceivedTime();
								
								if ( now - last_receive > KEEP_ALIVE_MIN * 3 ){
									
									try{
										con.close( new Throwable( "Timeout" ));
										
									}catch( Throwable e ){
										
											// one failing close must not stop the remaining
											// connections from being checked
										
										Debug.printStackTrace(e);
									}
								}
							}
						}
					}
				}
			});
	}
	
	/**
	 * Handles a message map received from a remote peer and produces the reply map.
	 * <p>
	 * The message content originates from the network, so every field is type-checked
	 * before use; a malformed message is rejected by returning null rather than by
	 * letting a ClassCastException/NullPointerException escape.
	 * <ul>
	 * <li>no "type" entry: treated as a pure NAT traversal request, returns null</li>
	 * <li>MESSAGE_TYPE_CONNECT: subject to the overall and per-IP connection limits and to
	 *     the registered handler accepting it, creates and registers a new remote
	 *     connection and replies with type CONNECT, the new "con_id" and any reply "data"</li>
	 * <li>MESSAGE_TYPE_DATA: passes "data" to the connection identified by "con_id" and
	 *     replies with type DATA (plus "more_data" when the connection reports an incomplete
	 *     receive), or with type DISCONNECT when the connection is already closed</li>
	 * <li>any other type: closes the connection identified by "con_id", returns null</li>
	 * </ul>
	 *
	 * @param message_manager	used to look up the handler for the requested message id
	 * @param originator		address the message came from
	 * @param message			decoded message map supplied by the remote peer
	 * @return	the reply map, or null when there is no reply (request rejected, unknown
	 * 			connection, malformed message, or error/disconnect processing)
	 */
	protected static Map
	receive(
		MessageManagerImpl		message_manager,
		InetSocketAddress		originator,
		Map						message )
	{
		if (TRACE ){
			System.out.println( "receive:" + originator + "/" + message );
		}
			// if this purely a NAT traversal request then bail out 
		
		if ( !message.containsKey( "type" )){
			
			return( null );
		}
		
		Object	raw_type = message.get( "type" );
		
		if ( !( raw_type instanceof Long )){
			
			Debug.out( "Invalid message type received from " + originator );
			
			return( null );
		}
		
		int	type = ((Long)raw_type).intValue();
		
		if ( type == MESSAGE_TYPE_CONNECT ){
		
				// validate everything we are going to cast before creating any state
			
			Object	raw_id		= message.get( "msg_id" );
			Object	raw_desc	= message.get( "msg_desc" );
			Object	raw_data	= message.get( "data" );
			
			if ( 	!( raw_id instanceof byte[] ) ||
					!( raw_desc instanceof byte[] ) ||
					!( raw_data == null || raw_data instanceof List )){
				
				Debug.out( "Malformed connect request received from " + originator );
				
				return( null );
			}
			
			String	msg_id 		= new String((byte[])raw_id );
			String	msg_desc 	= new String((byte[])raw_desc );

			GenericMessageEndpointImpl	endpoint = new GenericMessageEndpointImpl( originator );
			
			endpoint.addUDP( originator );
					
			GenericMessageHandler	handler = message_manager.getHandler( msg_id );
			
			if ( handler == null ){
				
				Debug.out( "No message handler registered for '" + msg_id + "'" );
				
				return( null );
			}
			
			try{
				Long	con_id;

					// NOTE(review): the limits are checked here but the connection is only
					// registered further down under a separate lock acquisition, so
					// concurrent connect requests can briefly exceed the limits
				
				synchronized( remote_connections ){
					
					if ( remote_connections.size() >= MAX_REMOTE_CONNECTIONS ){
						
						Debug.out( "Maximum remote connections exceeded - request from " + originator + " denied [" + getRemoteConnectionStatus() + "]" );
						
						return( null );
					}
					
					int	num_from_this_ip = 0;
									
					Iterator	it = remote_connections.values().iterator();
					
					while( it.hasNext()){
						
						GenericMessageConnectionIndirect con = (GenericMessageConnectionIndirect)it.next();
						
						if ( con.getEndpoint().getNotionalAddress().getAddress().equals( originator.getAddress())){
							
							num_from_this_ip++;
						}
					}
					
					if ( num_from_this_ip >= MAX_REMOTE_CONNECTIONS_PER_IP ){
						
						Debug.out( "Maximum remote connections per-ip exceeded - request from " + originator + " denied [" + getRemoteConnectionStatus() + "]" );
						
						return( null );

					}
					
						// ids are allocated under the remote_connections monitor
					
					con_id = new Long( connection_id_next++ );
				}
				
				GenericMessageConnectionIndirect indirect_connection = 
					new GenericMessageConnectionIndirect( 
							message_manager, msg_id, msg_desc, endpoint, con_id.longValue());

				GenericMessageConnectionImpl new_connection = new GenericMessageConnectionImpl( message_manager, indirect_connection );

					// the handler decides whether the connection is wanted; it is only
					// registered (and therefore reachable by later DATA messages) if so
				
				if ( handler.accept( new_connection )){
					
					new_connection.accepted();
	
					synchronized( remote_connections ){
												
						remote_connections.put( con_id, indirect_connection );
					}
					
					List	replies = indirect_connection.receive((List)raw_data );
				
					Map	reply = new HashMap();
					
					reply.put( "type", new Long( MESSAGE_TYPE_CONNECT ));
					reply.put( "con_id", con_id );
					reply.put( "data", replies );
					
					return( reply );
					
				}else{
					
					return( null );
				}	
			
			}catch( MessageException e ){
				
				Debug.out( "Error accepting message", e);
				
				return( null );
			}

		}else if ( type == MESSAGE_TYPE_DATA ){
			
				// the id is only used as a lookup key so no cast is needed: a value of
				// the wrong type simply fails to match any registered connection
			
			Object	con_id = message.get( "con_id" );
			
			GenericMessageConnectionIndirect indirect_connection;
			
			synchronized( remote_connections ){
				
				indirect_connection = (GenericMessageConnectionIndirect)remote_connections.get( con_id );
			}
			
			if ( indirect_connection == null ){
				
				return( null );
			}
			
			Map	reply = new HashMap();

			if ( indirect_connection.isClosed()){
				
				reply.put( "type", new Long( MESSAGE_TYPE_DISCONNECT ));

			}else{
				
				Object	raw_data = message.get( "data" );
				
				if ( raw_data != null && !( raw_data instanceof List )){
					
					Debug.out( "Malformed data message received from " + originator );
					
					return( null );
				}
				
				List replies = indirect_connection.receive((List)raw_data );
				
				reply.put( "type", new Long( MESSAGE_TYPE_DATA ));
				reply.put( "data", replies );	
								
				if ( indirect_connection.receiveIncomplete()){
					
					reply.put( "more_data", new Long(1));
				}
			}
			
			return( reply );
			
		}else{
			
				// error or disconnect		
			
			Object	con_id = message.get( "con_id" );
			
			GenericMessageConnectionIndirect indirect_connection;
			
			synchronized( remote_connections ){
				
				indirect_connection = (GenericMessageConnectionIndirect)remote_connections.get( con_id );
			}
			
			if ( indirect_connection != null ){
				
				try{
					indirect_connection.close( new Throwable( "Remote closed connection" ) );
					
				}catch( Throwable e ){
					
					Debug.printStackTrace(e);
				}
			}
			
			return( null );
		}
	}
	
	/**
	 * Summarises the connections currently held in the remote connection registry.
	 * 
	 * @return	the per-address summary produced by getConnectionStatus for remote_connections
	 */
	protected static String
	getRemoteConnectionStatus()
	{
		final Map	registry = remote_connections;
		
		return( getConnectionStatus( registry ));
	}
	
	/**
	 * Summarises the connections currently held in the local connection registry.
	 * 
	 * @return	the per-address summary produced by getConnectionStatus for local_connections
	 */
	protected static String
	getLocalConnectionStatus()
	{
		final Map	registry = local_connections;
		
		return( getConnectionStatus( registry ));
	}
	
	/**
	 * Produces a comma separated "address:count" summary of how many connections
	 * in the supplied registry share each notional endpoint address.
	 * 
	 * @param connections	registry whose values are GenericMessageConnectionIndirect
	 * 						instances; it is locked while being counted
	 * @return	the summary, or an empty string when the registry is empty
	 */
	protected static String
	getConnectionStatus(
		Map		connections )
	{
		Map per_address = new HashMap();
	
			// count connections per address while holding the registry's monitor
		
		synchronized( connections ){
			
			for ( Iterator con_it = connections.values().iterator(); con_it.hasNext(); ){
				
				GenericMessageConnectionIndirect connection = (GenericMessageConnectionIndirect)con_it.next();
	
				InetAddress	address = connection.getEndpoint().getNotionalAddress().getAddress();
				
				Integer	existing = (Integer)per_address.get( address );
				
				int	count = existing == null ? 1 : existing.intValue() + 1;
				
				per_address.put( address, new Integer( count ));
			}
		}
		
			// formatting works on the private tally so needs no lock
		
		StringBuffer	summary = new StringBuffer();
		
		for ( Iterator entry_it = per_address.entrySet().iterator(); entry_it.hasNext(); ){
			
			Map.Entry tally = (Map.Entry)entry_it.next();
			
			if ( summary.length() > 0 ){
				
				summary.append( "," );
			}
			
			summary.append( tally.getKey()).append( ":" ).append( tally.getValue());
		}
		
		return( summary.toString());
	}
	
	
	
	
		// identity of the connection: owning manager, message id/description and the
		// endpoint of the peer; all supplied to the constructor
	
	private MessageManagerImpl			message_manager;
	private String						msg_id;
	private String						msg_desc;
	private GenericMessageEndpoint		endpoint;
	
		// nat_traverser is obtained from message_manager by the outgoing constructor;
		// owner is not assigned in the part of the file visible here
	
	private NATTraverser					nat_traverser;
	private GenericMessageConnectionImpl	owner;
	
		// addresses supplied to the outgoing constructor
	
	private	InetSocketAddress		rendezvous;
	private InetSocketAddress		target;
	
		// connection_id: presumably the id allocated by the static receive method for
		// incoming connections - the constructor that sets it is not visible here.
		// incoming/closed: state flags, not assigned in the part of the file visible here
	
	private long					connection_id;
	private boolean					incoming;
	private boolean					closed;
	
		// outbound queue and its semaphore; their use is not visible here
	
	private LinkedList	send_queue		= new LinkedList();
	private AESemaphore	send_queue_sem	= new AESemaphore( "GenericMessageConnectionIndirect:sendq" );
	
		// activity tracking, volatile so that other threads see up-to-date values.
		// NOTE(review): last_message_received is presumably what getLastMessageReceivedTime()
		// reports to the housekeeping timer - confirm
	
	private volatile long		last_message_sent;
	private volatile long		last_message_received;
	private volatile boolean	keep_alive_in_progress;
	
	/**
	 * Creates the outgoing (initiating) side of an indirect connection. The
	 * arguments are stored, the NAT traverser is obtained from the message
	 * manager and the creation is logged.
	 * 
	 * @param _message_manager	manager that owns the connection and supplies the NAT traverser
	 * @param _msg_id			message id of the connection
	 * @param _msg_desc			message description of the connection
	 * @param _endpoint			endpoint of the peer being connected to
	 * @param _rendezvous		rendezvous address
	 * @param _target			target address
	 */
	protected 
	GenericMessageConnectionIndirect(
		MessageManagerImpl			_message_manager,
		String						_msg_id,
		String						_msg_desc,
		GenericMessageEndpoint		_endpoint,
		InetSocketAddress			_rendezvous,
		InetSocketAddress			_target )
	{
			// outgoing
		
		this.message_manager	= _message_manager;
		this.nat_traverser		= _message_manager.getNATTraverser();
		
		this.msg_id				= _msg_id;
		this.msg_desc			= _msg_desc;
		
		this.endpoint			= _endpoint;
		this.rendezvous			= _rendezvous;
		this.target				= _target;
		
		log( "outgoing connection to " + _endpoint.getNotionalAddress());
	}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -