📄 prudppackethandlerimpl.java
字号:
/*
* File : PRUDPPacketHandlerImpl.java
* Created : 20-Jan-2004
* By : parg
*
* Azureus - a Java Bittorrent client
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details ( see the LICENSE file ).
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
package com.aelitis.net.udp.uc.impl;
/**
* @author parg
*
*/
import java.util.*;
import java.io.*;
import java.net.*;
import org.gudy.azureus2.core3.config.*;
import org.gudy.azureus2.core3.logging.*;
import org.gudy.azureus2.core3.util.*;
import com.aelitis.net.udp.uc.PRUDPPacket;
import com.aelitis.net.udp.uc.PRUDPPacketHandler;
import com.aelitis.net.udp.uc.PRUDPPacketHandlerException;
import com.aelitis.net.udp.uc.PRUDPPacketHandlerStats;
import com.aelitis.net.udp.uc.PRUDPPacketReceiver;
import com.aelitis.net.udp.uc.PRUDPPacketReply;
import com.aelitis.net.udp.uc.PRUDPPacketRequest;
import com.aelitis.net.udp.uc.PRUDPRequestHandler;
import org.bouncycastle.util.encoders.Base64;
public class
PRUDPPacketHandlerImpl
implements PRUDPPacketHandler
{
// Log category attached to every LogEvent created by this class.
private static final LogIDs LOGID = LogIDs.NET;
// When true, incoming request packets and request timeouts are additionally logged.
private boolean TRACE_REQUESTS = false;
// Byte limit for the send queues; its use is outside the visible part of the file
// (presumably compared against send_queue_data_size - TODO confirm).
private static final long MAX_SEND_QUEUE_DATA_SIZE = 2*1024*1024;
// Once recv_queue_data_size exceeds this many bytes, further incoming requests are dropped.
private static final long MAX_RECV_QUEUE_DATA_SIZE = 1*1024*1024;
// UDP port this handler is bound to; set once in the constructor.
private int port;
// Created and bound by receiveLoop(); stays null if the bind fails.
private DatagramSocket socket;
// Receiver of incoming request packets; null until setRequestHandler() installs one.
private PRUDPRequestHandler request_handler;
// Statistics sink (notified of received packets and timed-out requests).
private PRUDPPacketHandlerStatsImpl stats = new PRUDPPacketHandlerStatsImpl( this );
// Outstanding requests; values are PRUDPPacketHandlerRequestImpl instances.
// The key type is not visible here (presumably a connection/transaction id - TODO confirm).
private Map requests = new HashMap();
// Guards 'requests'.
private AEMonitor requests_mon = new AEMonitor( "PRUDPPH:req" );
// Send-side queue state. The code that uses these five fields is outside the visible part
// of the file; by name they mirror the receive-side fields below - TODO confirm.
private AEMonitor send_queue_mon = new AEMonitor( "PRUDPPH:sd" );
private long send_queue_data_size;
private List[] send_queues = new List[]{ new LinkedList(),new LinkedList(),new LinkedList()};
private AESemaphore send_queue_sem = new AESemaphore( "PRUDPPH:sq" );
private AEThread send_thread;
// Guards recv_queue (and the increment of recv_queue_data_size).
private AEMonitor recv_queue_mon = new AEMonitor( "PRUDPPH:rq" );
// Total bytes of the datagrams currently held in recv_queue.
// NOTE(review): the receiver thread decrements this after leaving recv_queue_mon - confirm that is intended.
private long recv_queue_data_size;
// Deferred incoming requests; each entry is an Object[]{ packet, Integer datagram length }.
private List recv_queue = new ArrayList();
// Released once per entry added to recv_queue; the receiver thread reserves it before removing one.
private AESemaphore recv_queue_sem = new AESemaphore( "PRUDPPH:rq" );
// Created lazily when the first request is queued; drains recv_queue.
private AEThread recv_thread;
// Not used in the visible part of the file (presumably the send-side counterpart of receive_delay - TODO confirm).
private int send_delay = 0;
// When > 0, incoming requests are queued instead of being handled inline, and the receiver
// thread sleeps this long (passed to Thread.sleep, so milliseconds) after each handled request.
private int receive_delay = 0;
// A request is dropped instead of queued when receive_delay * recv_queue.size() exceeds this value.
private int queued_request_timeout = 0;
// Counters reported in the queue-overflow diagnostics.
private long total_requests_received;
private long total_requests_processed;
private long total_replies;
// Time of the last queue-overflow diagnostic; used to limit those messages to one per 30000 ms.
private long last_error_report;
/**
 * Creates a handler for the given UDP port.
 *
 * Starts a daemon thread that runs {@link #receiveLoop(AESemaphore)}, registers a
 * periodic timer (every 5000 ms) that calls {@link #checkTimeouts()}, and then blocks
 * until the receive thread releases the initialisation semaphore.
 *
 * @param _port UDP port to listen on
 */
protected
PRUDPPacketHandlerImpl(
	int		_port )
{
	port = _port;

	final AESemaphore init_sem = new AESemaphore( "PRUDPPacketHandler" );

	Thread receiver =
		new AEThread( "PRUDPPacketReciever:" + port )
		{
			public void
			runSupport()
			{
				receiveLoop( init_sem );
			}
		};

	receiver.setDaemon( true );

	receiver.start();

	TimerEventPerformer timeout_checker =
		new TimerEventPerformer()
		{
			public void
			perform(
				TimerEvent	event )
			{
				checkTimeouts();
			}
		};

	SimpleTimer.addPeriodicEvent( 5000, timeout_checker );

		// wait for the receive thread to report that socket set-up has finished (or failed)

	init_sem.reserve();
}
/**
 * Installs the handler that receives incoming request packets.
 *
 * Only one handler per endpoint is supported. When a handler is already installed,
 * passing {@code null} leaves it in place and passing a non-null handler is rejected.
 *
 * @param _request_handler handler to install
 * @throws RuntimeException if a handler is already installed and the argument is non-null
 */
public void
setRequestHandler(
	PRUDPRequestHandler		_request_handler )
{
	if ( request_handler == null ){

			// nothing installed yet: accept whatever we are given

		request_handler = _request_handler;

		return;
	}

	if ( _request_handler == null ){

			// existing handler is left untouched

		return;
	}

		// supporting this would need either a message-type -> handler map, or a chain
		// in which every handler gets a chance to process the message

	throw new RuntimeException( "Multiple handlers per endpoint not supported" );
}
/**
 * @return the currently installed request handler, or {@code null} if none has been set
 */
public PRUDPRequestHandler
getRequestHandler()
{
	return request_handler;
}
/**
 * @return the UDP port this handler was constructed with
 */
public int
getPort()
{
	return port;
}
/**
 * Body of the receive thread: binds the UDP socket for {@code port}, signals
 * {@code init_sem}, then receives datagrams forever and passes each one to process().
 * The loop only ends when receive failures pass the thresholds checked below.
 * Any failure during socket set-up is reported via the logger rather than thrown.
 *
 * @param init_sem semaphore released once set-up has completed or failed; the
 *                 constructor blocks on it
 */
protected void
receiveLoop(
AESemaphore init_sem )
{
try{
// An empty "Bind IP" setting means no explicit local address was configured.
String bind_ip = COConfigurationManager.getStringParameter("Bind IP", "");
InetSocketAddress address;
if ( bind_ip.length() == 0 ){
// 'address' is only used below to construct the receive DatagramPacket;
// the socket itself is bound by port alone.
address = new InetSocketAddress("127.0.0.1",port);
socket = new DatagramSocket( port );
}else{
address = new InetSocketAddress(InetAddress.getByName(bind_ip), port);
socket = new DatagramSocket( address );
}
// NOTE(review): both constructors above have already bound the socket; the
// java.net.DatagramSocket documentation leaves the effect of changing
// SO_REUSEADDR after binding undefined - confirm this call does what is intended.
socket.setReuseAddress(true);
// receive() will throw SocketTimeoutException after this interval; that case is ignored below.
socket.setSoTimeout( PRUDPPacket.DEFAULT_UDP_TIMEOUT );
// Set-up succeeded: let the waiting constructor continue.
init_sem.release();
if (Logger.isEnabled())
Logger.log(new LogEvent(LOGID,
"PRUDPPacketReceiver: receiver established on port " + port));
// One buffer is reused for every received datagram.
byte[] buffer = new byte[PRUDPPacket.MAX_PACKET_SIZE];
long successful_accepts = 0;
// Reset on every successful receive, so this counts consecutive failures.
long failed_accepts = 0;
while(true){
try{
DatagramPacket packet = new DatagramPacket( buffer, buffer.length, address );
socket.receive( packet );
successful_accepts++;
failed_accepts = 0;
process( packet );
}catch( SocketTimeoutException e ){
// receive timed out with nothing to read: just try again
}catch( Throwable e ){
failed_accepts++;
if (Logger.isEnabled())
Logger.log(new LogEvent(LOGID,
"PRUDPPacketReceiver: receive failed on port " + port, e));
// Give up after more than 100 failures with no success ever, or more than 1000 consecutive failures.
if (( failed_accepts > 100 && successful_accepts == 0 ) || failed_accepts > 1000 ){
Logger.logTextResource(new LogAlert(LogAlert.UNREPEATABLE,
LogAlert.AT_ERROR, "Network.alert.acceptfail"), new String[] {
"" + port, "UDP" });
// break, sometimes get a screaming loop. e.g.
/*
[2:01:55] DEBUG::Tue Dec 07 02:01:55 EST 2004
[2:01:55] java.net.SocketException: Socket operation on nonsocket: timeout in datagram socket peek
[2:01:55] at java.net.PlainDatagramSocketImpl.peekData(Native Method)
[2:01:55] at java.net.DatagramSocket.receive(Unknown Source)
[2:01:55] at org.gudy.azureus2.core3.tracker.server.impl.udp.TRTrackerServerUDP.recvLoop(TRTrackerServerUDP.java:118)
[2:01:55] at org.gudy.azureus2.core3.tracker.server.impl.udp.TRTrackerServerUDP$1.runSupport(TRTrackerServerUDP.java:90)
[2:01:55] at org.gudy.azureus2.core3.util.AEThread.run(AEThread.java:45)
*/
break;
}
}
}
}catch( Throwable e ){
// Reached when socket creation/configuration fails (exceptions inside the loop are caught there).
Logger.logTextResource(new LogAlert(LogAlert.UNREPEATABLE,
LogAlert.AT_ERROR, "Tracker.alert.listenfail"), new String[] { "UDP:"
+ port });
Logger.log(new LogEvent(LOGID, "PRUDPPacketReceiver: "
+ "DatagramSocket bind failed on port " + port, e));
}finally{
// Ensures the constructor is unblocked even when set-up failed.
// NOTE(review): on the success path this is a second release of init_sem.
init_sem.release();
}
}
/**
 * Expires outstanding requests whose timeout has elapsed.
 *
 * Under {@code requests_mon}, every request that has been sent (send time != 0) and
 * whose age is at least its timeout is removed from {@code requests} and counted in
 * the stats. After the monitor is released, each expired request is failed with a
 * "timed out" exception.
 */
protected void
checkTimeouts()
{
	long	now = SystemTime.getCurrentTime();

	List	expired = new ArrayList();

	try{
		requests_mon.enter();

		for ( Iterator it = requests.values().iterator(); it.hasNext(); ){

			PRUDPPacketHandlerRequestImpl	candidate = (PRUDPPacketHandlerRequestImpl)it.next();

			long	sent = candidate.getSendTime();

				// skip requests that have not been sent yet or are still within their timeout

			if ( sent == 0 || now - sent < candidate.getTimeout()){

				continue;
			}

			it.remove();

			stats.requestTimedOut();

			expired.add( candidate );
		}
	}finally{

		requests_mon.exit();
	}

		// notify outside the monitor

	Iterator	expired_it = expired.iterator();

	while( expired_it.hasNext()){

		PRUDPPacketHandlerRequestImpl	victim = (PRUDPPacketHandlerRequestImpl)expired_it.next();

		if ( TRACE_REQUESTS && Logger.isEnabled()){

			Logger.log(
				new LogEvent(
					LOGID,
					LogEvent.LT_ERROR,
					"PRUDPPacketHandler: request timeout" ));
		}

		try{
				// the message text below is relied on elsewhere - do not alter it

			victim.setException( new PRUDPPacketHandlerException( "timed out" ));

		}catch( Throwable e ){

			Debug.printStackTrace( e );
		}
	}
}
protected void
process(
DatagramPacket dg_packet )
{
try{
// HACK alert. Due to the form of the tracker UDP protocol (no common
// header for requests and replies) we enforce a rule. All connection ids
// must have their MSB set. As requests always start with the action, which
// always has the MSB clear, we can use this to differentiate.
byte[] packet_data = dg_packet.getData();
int packet_len = dg_packet.getLength();
// System.out.println( "received:" + packet_len );
PRUDPPacket packet;
boolean request_packet;
stats.packetReceived(packet_len);
if ( ( packet_data[0]&0x80 ) == 0 ){
request_packet = false;
packet = PRUDPPacketReply.deserialiseReply(
this,
new DataInputStream(new ByteArrayInputStream( packet_data, 0, packet_len)));
}else{
request_packet = true;
packet = PRUDPPacketRequest.deserialiseRequest(
this,
new DataInputStream(new ByteArrayInputStream( packet_data, 0, packet_len)));
}
packet.setSerialisedSize( packet_len );
packet.setAddress( (InetSocketAddress)dg_packet.getSocketAddress());
if ( request_packet ){
total_requests_received++;
// System.out.println( "Incoming from " + dg_packet.getAddress());
if ( TRACE_REQUESTS ){
Logger.log(new LogEvent(LOGID,
"PRUDPPacketHandler: request packet received: "
+ packet.getString()));
}
if ( receive_delay > 0 ){
// we take the processing offline so that these incoming requests don't
// interfere with replies to outgoing requests
try{
recv_queue_mon.enter();
if ( recv_queue_data_size > MAX_RECV_QUEUE_DATA_SIZE ){
long now = SystemTime.getCurrentTime();
if ( now - last_error_report > 30000 ){
last_error_report = now;
Debug.out( "Receive queue size limit exceeded (" +
MAX_RECV_QUEUE_DATA_SIZE + "), dropping request packet [" +
total_requests_received + "/" + total_requests_processed + ":" + total_replies + "]");
}
}else if ( receive_delay * recv_queue.size() > queued_request_timeout ){
// by the time this request gets processed it'll have timed out
// in the caller anyway, so discard it
long now = SystemTime.getCurrentTime();
if ( now - last_error_report > 30000 ){
last_error_report = now;
Debug.out( "Receive queue entry limit exceeded (" +
recv_queue.size() + "), dropping request packet ]" +
total_requests_received + "/" + total_requests_processed + ":" + total_replies + "]");
}
}else{
recv_queue.add( new Object[]{ packet, new Integer( dg_packet.getLength()) });
recv_queue_data_size += dg_packet.getLength();
recv_queue_sem.release();
if ( recv_thread == null ){
recv_thread =
new AEThread( "PRUDPPacketHandler:receiver" )
{
public void
runSupport()
{
while( true ){
try{
recv_queue_sem.reserve();
Object[] data;
try{
recv_queue_mon.enter();
data = (Object[])recv_queue.remove(0);
total_requests_processed++;
}finally{
recv_queue_mon.exit();
}
PRUDPPacketRequest p = (PRUDPPacketRequest)data[0];
recv_queue_data_size -= ((Integer)data[1]).intValue();
PRUDPRequestHandler handler = request_handler;
if ( handler != null ){
handler.process( p );
Thread.sleep( receive_delay );
}
}catch( Throwable e ){
Debug.printStackTrace(e);
}
}
}
};
recv_thread.setDaemon( true );
recv_thread.start();
}
}
}finally{
recv_queue_mon.exit();
}
}else{
PRUDPRequestHandler handler = request_handler;
if ( handler != null ){
handler.process( (PRUDPPacketRequest)packet );
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -