⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 prudppackethandlerimpl.java

📁 这是一个基于java编写的torrent的P2P源码
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
/*
 * File    : PRUDPPacketReceiverImpl.java
 * Created : 20-Jan-2004
 * By      : parg
 * 
 * Azureus - a Java Bittorrent client
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details ( see the LICENSE file ).
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */

package com.aelitis.net.udp.uc.impl;

/**
 * @author parg
 *
 */

import java.util.*;
import java.io.*;
import java.net.*;

import org.gudy.azureus2.core3.logging.*;
import org.gudy.azureus2.core3.util.*;

import com.aelitis.azureus.core.networkmanager.admin.NetworkAdmin;
import com.aelitis.azureus.core.networkmanager.admin.NetworkAdminPropertyChangeListener;
import com.aelitis.net.udp.uc.PRUDPPacket;
import com.aelitis.net.udp.uc.PRUDPPacketHandler;
import com.aelitis.net.udp.uc.PRUDPPacketHandlerException;
import com.aelitis.net.udp.uc.PRUDPPacketHandlerStats;
import com.aelitis.net.udp.uc.PRUDPPacketReceiver;
import com.aelitis.net.udp.uc.PRUDPPacketReply;
import com.aelitis.net.udp.uc.PRUDPPacketRequest;
import com.aelitis.net.udp.uc.PRUDPPrimordialHandler;
import com.aelitis.net.udp.uc.PRUDPRequestHandler;

import org.bouncycastle.util.encoders.Base64;

public class 
PRUDPPacketHandlerImpl
	implements PRUDPPacketHandler
{	
	private static final LogIDs LOGID = LogIDs.NET;
	
	private boolean			TRACE_REQUESTS	= false;
	
	private static final long	MAX_SEND_QUEUE_DATA_SIZE	= 2*1024*1024;
	private static final long	MAX_RECV_QUEUE_DATA_SIZE	= 1*1024*1024;
	
	private int				port;
	private DatagramSocket	socket;
	
	private PRUDPPrimordialHandler	primordial_handler;
	private PRUDPRequestHandler		request_handler;
	
	private PRUDPPacketHandlerStatsImpl	stats = new PRUDPPacketHandlerStatsImpl( this );
	
	
	private Map			requests = new HashMap();
	private AEMonitor	requests_mon	= new AEMonitor( "PRUDPPH:req" );
	
	
	private AEMonitor	send_queue_mon	= new AEMonitor( "PRUDPPH:sd" );
	private long		send_queue_data_size;
	private List[]		send_queues		= new List[]{ new LinkedList(),new LinkedList(),new LinkedList()};
	private AESemaphore	send_queue_sem	= new AESemaphore( "PRUDPPH:sq" );
	private AEThread	send_thread;
	
	private AEMonitor	recv_queue_mon	= new AEMonitor( "PRUDPPH:rq" );
	private long		recv_queue_data_size;
	private List		recv_queue		= new ArrayList();
	private AESemaphore	recv_queue_sem	= new AESemaphore( "PRUDPPH:rq" );
	private AEThread	recv_thread;
	
	private int			send_delay				= 0;
	private int			receive_delay			= 0;
	private int			queued_request_timeout	= 0;
	
	private long		total_requests_received;
	private long		total_requests_processed;
	private long		total_replies;
	private long		last_error_report;
	
	private AEMonitor	bind_address_mon	= new AEMonitor( "PRUDPPH:bind" );

	private InetAddress				default_bind_ip;
	private InetAddress				explicit_bind_ip;
	
	private volatile InetAddress				current_bind_ip;
	private volatile InetAddress				target_bind_ip;
	
	private volatile boolean		failed;
	private volatile boolean		destroyed;
	private AESemaphore destroy_sem = new AESemaphore("PRUDPPacketHandler:destroy");

	private Throwable 	init_error;
	
	protected
	PRUDPPacketHandlerImpl(
		int				_port,
		InetAddress		_bind_ip )
	{
		port				= _port;
		explicit_bind_ip	= _bind_ip;
		
		default_bind_ip = NetworkAdmin.getSingleton().getDefaultBindAddress();
		
		calcBind();
		
		final AESemaphore init_sem = new AESemaphore("PRUDPPacketHandler:init");
		
		Thread t = new AEThread( "PRUDPPacketReciever:".concat(String.valueOf(port)))
			{
				public void
				runSupport()
				{
					receiveLoop(init_sem);
				}
			};
		
		t.setDaemon(true);
		
		t.start();
		
		final TimerEventPeriodic[]	f_ev = {null};
		
		TimerEventPeriodic ev = 
			SimpleTimer.addPeriodicEvent(
				"PRUDP:timeouts",
				5000,
				new TimerEventPerformer()
				{
					public void
					perform(
						TimerEvent	event )
					{
						if ( destroyed && f_ev[0] != null ){
							
							f_ev[0].cancel();
						}
						checkTimeouts();
					}
				});
		
		f_ev[0] = ev;
		
		init_sem.reserve();
	}
	
	public void
	setPrimordialHandler(
		PRUDPPrimordialHandler	handler )
	{
		if ( primordial_handler != null && handler != null ){
			
			Debug.out( "Primordial handler replaced!" );
		}
		
		primordial_handler	= handler;
	}
	
	public void
	setRequestHandler(
		PRUDPRequestHandler		_request_handler )
	{
		if ( request_handler != null ){
		
			if ( _request_handler != null ){
				
					// if we need to support this then the handler will have to be associated
					// with a message type map, or we chain together and give each handler
					// a bite at processing the message
				
				throw( new RuntimeException( "Multiple handlers per endpoint not supported" ));
			}
		}
		
		request_handler	= _request_handler;
	}
	
	public PRUDPRequestHandler
	getRequestHandler()
	{
		return( request_handler );
	}
	
	public int
	getPort()
	{
		return( port );
	}
	
	protected void
	setDefaultBindAddress(
		InetAddress	address )
	{
		try{
			bind_address_mon.enter();
			
			default_bind_ip	= address;
			
			calcBind();
			
		}finally{
			
			bind_address_mon.exit();
		}
	}
	
	public void
	setExplicitBindAddress(
		InetAddress	address )
	{
		try{
			bind_address_mon.enter();
			
			explicit_bind_ip	= address;
			
			calcBind();
			
		}finally{
			
			bind_address_mon.exit();
		}
		
		int	loops = 0;
		
		while( current_bind_ip != target_bind_ip && !(failed || destroyed)){
			
			if ( loops >= 100 ){
				
				Debug.out( "Giving up on wait for bind ip change to take effect" );
				
				break;
			}
			
			try{
				Thread.sleep(50);
				
				loops++;
				
			}catch( Throwable e ){
				
				break;
			}
		}
	}
	
	protected void
	calcBind()
	{
		if ( explicit_bind_ip != null ){
			
			target_bind_ip = explicit_bind_ip;
			
		}else{
			
			target_bind_ip = default_bind_ip;
		}
	}
	
	protected void
	receiveLoop(
		AESemaphore	init_sem )
	{
		NetworkAdminPropertyChangeListener prop_listener = 
			new NetworkAdminPropertyChangeListener()
	    	{
	    		public void
	    		propertyChanged(
	    			String		property )
	    		{
	    			if ( property == NetworkAdmin.PR_DEFAULT_BIND_ADDRESS ){
	    				
	    				setDefaultBindAddress( NetworkAdmin.getSingleton().getDefaultBindAddress());
	    			}
	    		}
	    	};
    	
	    NetworkAdmin.getSingleton().addPropertyChangeListener( prop_listener );

		try{
				// outter loop picks up bind-ip changes
			
			while( !( failed || destroyed )){
				
				if ( socket != null ){
					
					try{
						socket.close();
						
					}catch( Throwable e ){
						
						Debug.printStackTrace(e);
					}
				}
								
				InetSocketAddress	address;
				
				DatagramSocket	new_socket;
				
				if ( target_bind_ip == null ){
					
					address = new InetSocketAddress("127.0.0.1",port);
					
					new_socket = new DatagramSocket( port );
					
				}else{
					
					address = new InetSocketAddress( target_bind_ip, port );
					
					new_socket = new DatagramSocket( address );		
				}
						
				new_socket.setReuseAddress(true);
				
					// short timeout on receive so that we can interrupt a receive fairly quickly
				
				new_socket.setSoTimeout( 1000 );
				
					// only make the socket public once fully configured
								
				socket = new_socket;
				
				current_bind_ip	= target_bind_ip;
								
				init_sem.release();
				
				if (Logger.isEnabled())
					Logger.log(new LogEvent(LOGID,
							"PRUDPPacketReceiver: receiver established on port " + port + (current_bind_ip==null?"":(", bound to " + current_bind_ip )))); 
		
				byte[] buffer = null;
				
				long	successful_accepts 	= 0;
				long	failed_accepts		= 0;
				
				while( !( failed || destroyed )){
					
					if ( current_bind_ip != target_bind_ip ){
						
						break;
					}
					
					try{
						
						if ( buffer == null ){
							
							buffer = new byte[PRUDPPacket.MAX_PACKET_SIZE];
						}
	
						DatagramPacket packet = new DatagramPacket( buffer, buffer.length, address );
						
						socket.receive( packet );
						
						long	receive_time = SystemTime.getCurrentTime();
						
						successful_accepts++;
						
						failed_accepts = 0;
						
						PRUDPPrimordialHandler prim_hand = primordial_handler;
						
						if ( prim_hand != null ){
							
							if ( prim_hand.packetReceived( packet )){
						
									// primordial handlers get their own buffer as we can't guarantee
									// that they don't need to hang onto the data
								
								buffer	= null;
								
								stats.primordialPacketReceived( packet.getLength());
							}
						}
						
						if ( buffer != null ){
							
							process( packet, receive_time );
						}
					
					}catch( SocketTimeoutException e ){
											
					}catch( Throwable e ){
							
						failed_accepts++;
						
						if (Logger.isEnabled())
							Logger.log(new LogEvent(LOGID,
									"PRUDPPacketReceiver: receive failed on port " + port, e)); 
	
						if (( failed_accepts > 100 && successful_accepts == 0 ) || failed_accepts > 1000 ){						
			
							Logger.logTextResource(new LogAlert(LogAlert.UNREPEATABLE,
									LogAlert.AT_ERROR, "Network.alert.acceptfail"), new String[] {
									"" + port, "UDP" });
											
								// break, sometimes get a screaming loop. e.g.
							/*
							[2:01:55]  DEBUG::Tue Dec 07 02:01:55 EST 2004
							[2:01:55]    java.net.SocketException: Socket operation on nonsocket: timeout in datagram socket peek
							[2:01:55]  	at java.net.PlainDatagramSocketImpl.peekData(Native Method)
							[2:01:55]  	at java.net.DatagramSocket.receive(Unknown Source)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -