⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 prudppackethandlerimpl.java

📁 基于JXTA开发平台的下载软件开发源代码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/*
 * File    : PRUDPPacketReceiverImpl.java
 * Created : 20-Jan-2004
 * By      : parg
 * 
 * Azureus - a Java Bittorrent client
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details ( see the LICENSE file ).
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */

package com.aelitis.net.udp.uc.impl;

/**
 * @author parg
 *
 */

import java.util.*;
import java.io.*;
import java.net.*;

import org.gudy.azureus2.core3.config.*;
import org.gudy.azureus2.core3.logging.*;
import org.gudy.azureus2.core3.util.*;

import com.aelitis.net.udp.uc.PRUDPPacket;
import com.aelitis.net.udp.uc.PRUDPPacketHandler;
import com.aelitis.net.udp.uc.PRUDPPacketHandlerException;
import com.aelitis.net.udp.uc.PRUDPPacketHandlerStats;
import com.aelitis.net.udp.uc.PRUDPPacketReceiver;
import com.aelitis.net.udp.uc.PRUDPPacketReply;
import com.aelitis.net.udp.uc.PRUDPPacketRequest;
import com.aelitis.net.udp.uc.PRUDPRequestHandler;

import org.bouncycastle.util.encoders.Base64;

public class 
PRUDPPacketHandlerImpl
	implements PRUDPPacketHandler
{	
	private static final LogIDs LOGID = LogIDs.NET;
	
	private boolean			TRACE_REQUESTS	= false;
	
	private static final long	MAX_SEND_QUEUE_DATA_SIZE	= 2*1024*1024;
	private static final long	MAX_RECV_QUEUE_DATA_SIZE	= 1*1024*1024;
	
	private int				port;
	private DatagramSocket	socket;
	
	private PRUDPRequestHandler	request_handler;
	
	private PRUDPPacketHandlerStatsImpl	stats = new PRUDPPacketHandlerStatsImpl( this );
	
	
	private Map			requests = new HashMap();
	private AEMonitor	requests_mon	= new AEMonitor( "PRUDPPH:req" );
	
	
	private AEMonitor	send_queue_mon	= new AEMonitor( "PRUDPPH:sd" );
	private long		send_queue_data_size;
	private List[]		send_queues		= new List[]{ new LinkedList(),new LinkedList(),new LinkedList()};
	private AESemaphore	send_queue_sem	= new AESemaphore( "PRUDPPH:sq" );
	private AEThread	send_thread;
	
	private AEMonitor	recv_queue_mon	= new AEMonitor( "PRUDPPH:rq" );
	private long		recv_queue_data_size;
	private List		recv_queue		= new ArrayList();
	private AESemaphore	recv_queue_sem	= new AESemaphore( "PRUDPPH:rq" );
	private AEThread	recv_thread;
	
	private int			send_delay				= 0;
	private int			receive_delay			= 0;
	private int			queued_request_timeout	= 0;
	
	private long		total_requests_received;
	private long		total_requests_processed;
	private long		total_replies;
	private long		last_error_report;
	
	protected
	PRUDPPacketHandlerImpl(
		int		_port )
	{
		port		= _port;
		
		final AESemaphore init_sem = new AESemaphore("PRUDPPacketHandler");
		
		Thread t = new AEThread( "PRUDPPacketReciever:".concat(String.valueOf(port)))
			{
				public void
				runSupport()
				{
					receiveLoop(init_sem);
				}
			};
		
		t.setDaemon(true);
		
		t.start();
		
		SimpleTimer.addPeriodicEvent(
				5000,
				new TimerEventPerformer()
				{
					public void
					perform(
						TimerEvent	event )
					{
						checkTimeouts();
					}
				});
		
		init_sem.reserve();
	}
	
	public void
	setRequestHandler(
		PRUDPRequestHandler		_request_handler )
	{
		if ( request_handler != null ){
		
			if ( _request_handler != null ){
				
					// if we need to support this then the handler will have to be associated
					// with a message type map, or we chain together and give each handler
					// a bite at processing the message
				
				throw( new RuntimeException( "Multiple handlers per endpoint not supported" ));
			}
			
		}else{
		
			request_handler	= _request_handler;
		}
	}
	
	public PRUDPRequestHandler
	getRequestHandler()
	{
		return( request_handler );
	}
	
	public int
	getPort()
	{
		return( port );
	}
	
	protected void
	receiveLoop(
		AESemaphore	init_sem )
	{
		try{
			String bind_ip = COConfigurationManager.getStringParameter("Bind IP", "");
			
			InetSocketAddress	address;
			
			if ( bind_ip.length() == 0 ){
				
				address = new InetSocketAddress("127.0.0.1",port);
				
				socket = new DatagramSocket( port );
				
			}else{
				
				address = new InetSocketAddress(InetAddress.getByName(bind_ip), port);
				
				socket = new DatagramSocket( address );		
			}
					
			socket.setReuseAddress(true);
			
			socket.setSoTimeout( PRUDPPacket.DEFAULT_UDP_TIMEOUT );
			
			init_sem.release();
			
			if (Logger.isEnabled())
				Logger.log(new LogEvent(LOGID,
						"PRUDPPacketReceiver: receiver established on port " + port)); 
	
			byte[] buffer = new byte[PRUDPPacket.MAX_PACKET_SIZE];
			
			long	successful_accepts 	= 0;
			long	failed_accepts		= 0;
			
			while(true){
				
				try{
						
					DatagramPacket packet = new DatagramPacket( buffer, buffer.length, address );
					
					socket.receive( packet );
					
					successful_accepts++;
					
					failed_accepts = 0;
					
					process( packet );
				
				}catch( SocketTimeoutException e ){
										
				}catch( Throwable e ){
						
					failed_accepts++;
					
					if (Logger.isEnabled())
						Logger.log(new LogEvent(LOGID,
								"PRUDPPacketReceiver: receive failed on port " + port, e)); 

					if (( failed_accepts > 100 && successful_accepts == 0 ) || failed_accepts > 1000 ){						
		
						Logger.logTextResource(new LogAlert(LogAlert.UNREPEATABLE,
								LogAlert.AT_ERROR, "Network.alert.acceptfail"), new String[] {
								"" + port, "UDP" });
										
							// break, sometimes get a screaming loop. e.g.
						/*
						[2:01:55]  DEBUG::Tue Dec 07 02:01:55 EST 2004
						[2:01:55]    java.net.SocketException: Socket operation on nonsocket: timeout in datagram socket peek
						[2:01:55]  	at java.net.PlainDatagramSocketImpl.peekData(Native Method)
						[2:01:55]  	at java.net.DatagramSocket.receive(Unknown Source)
						[2:01:55]  	at org.gudy.azureus2.core3.tracker.server.impl.udp.TRTrackerServerUDP.recvLoop(TRTrackerServerUDP.java:118)
						[2:01:55]  	at org.gudy.azureus2.core3.tracker.server.impl.udp.TRTrackerServerUDP$1.runSupport(TRTrackerServerUDP.java:90)
						[2:01:55]  	at org.gudy.azureus2.core3.util.AEThread.run(AEThread.java:45)
						*/
						
						break;
					}					
				}
			}
		}catch( Throwable e ){
			Logger.logTextResource(new LogAlert(LogAlert.UNREPEATABLE,
					LogAlert.AT_ERROR, "Tracker.alert.listenfail"), new String[] { "UDP:"
					+ port });
			
			Logger.log(new LogEvent(LOGID, "PRUDPPacketReceiver: "
					+ "DatagramSocket bind failed on port " + port, e));
			
		}finally{
			
			init_sem.release();
		}
	}
	
	protected void
	checkTimeouts()
	{
		long	now = SystemTime.getCurrentTime();
			
		List	timed_out = new ArrayList();
		
		try{
			requests_mon.enter();
			
			Iterator it = requests.values().iterator();
			
			while( it.hasNext()){
				
				PRUDPPacketHandlerRequestImpl	request = (PRUDPPacketHandlerRequestImpl)it.next();
				
				long	sent_time = request.getSendTime();
				
				if ( 	sent_time != 0 &&
						now - sent_time >= request.getTimeout()){
				
					it.remove();

					stats.requestTimedOut();
					
					timed_out.add( request );
				}
			}
		}finally{
			
			requests_mon.exit();
		}
		
		for (int i=0;i<timed_out.size();i++){
			
			PRUDPPacketHandlerRequestImpl	request = (PRUDPPacketHandlerRequestImpl)timed_out.get(i);
			
			if ( TRACE_REQUESTS ){
				if (Logger.isEnabled())
					Logger.log(new LogEvent(LOGID, LogEvent.LT_ERROR,
							"PRUDPPacketHandler: request timeout")); 
			}
				// don't change the text of this message, it's used elsewhere
			
			try{
				request.setException(new PRUDPPacketHandlerException("timed out"));
				
			}catch( Throwable e ){
				
				Debug.printStackTrace(e);
			}
		}
	}
	
	protected void
	process(
		DatagramPacket	dg_packet )
	{
		try{
				// HACK alert. Due to the form of the tracker UDP protocol (no common
				// header for requests and replies) we enforce a rule. All connection ids
				// must have their MSB set. As requests always start with the action, which
				// always has the MSB clear, we can use this to differentiate. 
			
			byte[]	packet_data = dg_packet.getData();
			int		packet_len	= dg_packet.getLength();
			
			// System.out.println( "received:" + packet_len );
			
			PRUDPPacket packet;
			
			boolean	request_packet;
						
			stats.packetReceived(packet_len);
			
			if ( ( packet_data[0]&0x80 ) == 0 ){
				
				request_packet	= false;
				
				packet = PRUDPPacketReply.deserialiseReply( 
					this,
					new DataInputStream(new ByteArrayInputStream( packet_data, 0, packet_len)));
				
			}else{
				
				request_packet	= true;
				
				packet = PRUDPPacketRequest.deserialiseRequest( 
						this,
						new DataInputStream(new ByteArrayInputStream( packet_data, 0, packet_len)));
		
			}
			
			packet.setSerialisedSize( packet_len );
			
			packet.setAddress( (InetSocketAddress)dg_packet.getSocketAddress());
			
			if ( request_packet ){
					
				total_requests_received++;
				
				// System.out.println( "Incoming from " + dg_packet.getAddress());
				
				if ( TRACE_REQUESTS ){
					Logger.log(new LogEvent(LOGID,
							"PRUDPPacketHandler: request packet received: "
									+ packet.getString())); 
				}
				
				if ( receive_delay > 0 ){
					
						// we take the processing offline so that these incoming requests don't
						// interfere with replies to outgoing requests
					
					try{
						recv_queue_mon.enter();
						
						if ( recv_queue_data_size > MAX_RECV_QUEUE_DATA_SIZE ){
							
							long	now = SystemTime.getCurrentTime();
							
							if ( now - last_error_report > 30000 ){
								
								last_error_report	= now;
								
								Debug.out( "Receive queue size limit exceeded (" + 
											MAX_RECV_QUEUE_DATA_SIZE + "), dropping request packet [" +
											total_requests_received + "/" + total_requests_processed + ":" + total_replies + "]");
							}
							
						}else if ( receive_delay * recv_queue.size() > queued_request_timeout ){
							
								// by the time this request gets processed it'll have timed out
								// in the caller anyway, so discard it
							
							long	now = SystemTime.getCurrentTime();
							
							if ( now - last_error_report > 30000 ){
								
								last_error_report	= now;

								Debug.out( "Receive queue entry limit exceeded (" + 
											recv_queue.size() + "), dropping request packet ]" +
											total_requests_received + "/" + total_requests_processed + ":" + total_replies + "]");
							}
							
						}else{
							
							recv_queue.add( new Object[]{ packet, new Integer( dg_packet.getLength()) });
											
							recv_queue_data_size	+= dg_packet.getLength();
														
							recv_queue_sem.release();
					
							if ( recv_thread == null ){
								
								recv_thread = 
									new AEThread( "PRUDPPacketHandler:receiver" )
									{
										public void
										runSupport()
										{
											while( true ){
												
												try{
													recv_queue_sem.reserve();
													
													Object[]	data;										
													
													try{
														recv_queue_mon.enter();
													
														data = (Object[])recv_queue.remove(0);
														
														total_requests_processed++;
														
													}finally{
														
														recv_queue_mon.exit();
													}
													
													PRUDPPacketRequest	p = (PRUDPPacketRequest)data[0];
													
													recv_queue_data_size -= ((Integer)data[1]).intValue();
													
													PRUDPRequestHandler	handler = request_handler;
													
													if ( handler != null ){
														
														handler.process( p );
													
														Thread.sleep( receive_delay );
													}
													
												}catch( Throwable e ){
													
													Debug.printStackTrace(e);
												}
											}
										}
									};
								
								recv_thread.setDaemon( true );
								
								recv_thread.start();
							}
						}
					}finally{
						
						recv_queue_mon.exit();
					}
				}else{
				
					PRUDPRequestHandler	handler = request_handler;
					
					if ( handler != null ){
						
						handler.process( (PRUDPPacketRequest)packet );
					}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -