📄 prudppackethandlerimpl.java
字号:
/*
* File : PRUDPPacketHandlerImpl.java
* Created : 20-Jan-2004
* By : parg
*
* Azureus - a Java Bittorrent client
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details ( see the LICENSE file ).
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
package com.aelitis.net.udp.uc.impl;
/**
* @author parg
*
*/
import java.util.*;
import java.io.*;
import java.net.*;
import org.gudy.azureus2.core3.logging.*;
import org.gudy.azureus2.core3.util.*;
import com.aelitis.azureus.core.networkmanager.admin.NetworkAdmin;
import com.aelitis.azureus.core.networkmanager.admin.NetworkAdminPropertyChangeListener;
import com.aelitis.net.udp.uc.PRUDPPacket;
import com.aelitis.net.udp.uc.PRUDPPacketHandler;
import com.aelitis.net.udp.uc.PRUDPPacketHandlerException;
import com.aelitis.net.udp.uc.PRUDPPacketHandlerStats;
import com.aelitis.net.udp.uc.PRUDPPacketReceiver;
import com.aelitis.net.udp.uc.PRUDPPacketReply;
import com.aelitis.net.udp.uc.PRUDPPacketRequest;
import com.aelitis.net.udp.uc.PRUDPPrimordialHandler;
import com.aelitis.net.udp.uc.PRUDPRequestHandler;
import org.bouncycastle.util.encoders.Base64;
public class
PRUDPPacketHandlerImpl
implements PRUDPPacketHandler
{
// Log category passed to the Logger events emitted by this class.
private static final LogIDs LOGID = LogIDs.NET;
// NOTE(review): presumably enables per-request trace logging; no use is visible in this part of the file.
private boolean TRACE_REQUESTS = false;
// 2 MiB and 1 MiB respectively.
// NOTE(review): by name these cap the bytes held in the send/receive queues - confirm where they are enforced.
private static final long MAX_SEND_QUEUE_DATA_SIZE = 2*1024*1024;
private static final long MAX_RECV_QUEUE_DATA_SIZE = 1*1024*1024;
// UDP port the receive socket is created on.
private int port;
// Socket currently used for receiving; receiveLoop closes and replaces it each time it (re)binds.
private DatagramSocket socket;
// Offered every received packet before normal processing; may be null.
private PRUDPPrimordialHandler primordial_handler;
// The single request handler for this endpoint (see setRequestHandler); may be null.
private PRUDPRequestHandler request_handler;
// Statistics collector bound to this handler instance.
private PRUDPPacketHandlerStatsImpl stats = new PRUDPPacketHandlerStatsImpl( this );
// NOTE(review): presumably outstanding requests awaiting replies, guarded by requests_mon - usage not visible here.
private Map requests = new HashMap();
private AEMonitor requests_mon = new AEMonitor( "PRUDPPH:req" );
// Send-side state: a monitor, a running byte count, three queues, a semaphore and a worker thread.
// NOTE(review): the three queues are presumably one per priority level - confirm against the send code.
private AEMonitor send_queue_mon = new AEMonitor( "PRUDPPH:sd" );
private long send_queue_data_size;
private List[] send_queues = new List[]{ new LinkedList(),new LinkedList(),new LinkedList()};
private AESemaphore send_queue_sem = new AESemaphore( "PRUDPPH:sq" );
private AEThread send_thread;
// Receive-side state: a monitor, a running byte count, one queue, a semaphore and a worker thread.
private AEMonitor recv_queue_mon = new AEMonitor( "PRUDPPH:rq" );
private long recv_queue_data_size;
private List recv_queue = new ArrayList();
private AESemaphore recv_queue_sem = new AESemaphore( "PRUDPPH:rq" );
private AEThread recv_thread;
// NOTE(review): delays/timeouts default to 0; units and the meaning of 0 are not visible here - confirm.
private int send_delay = 0;
private int receive_delay = 0;
private int queued_request_timeout = 0;
// NOTE(review): counters, presumably maintained by the request/reply processing code outside this view.
private long total_requests_received;
private long total_requests_processed;
private long total_replies;
private long last_error_report;
// Guards updates to default_bind_ip / explicit_bind_ip and the recalculation of target_bind_ip.
private AEMonitor bind_address_mon = new AEMonitor( "PRUDPPH:bind" );
// Default bind address obtained from NetworkAdmin; updated through setDefaultBindAddress.
private InetAddress default_bind_ip;
// Caller-supplied bind address; when non-null it takes precedence over default_bind_ip (see calcBind).
private InetAddress explicit_bind_ip;
// Bind address in effect for the current socket; written by receiveLoop once the socket is configured.
private volatile InetAddress current_bind_ip;
// Bind address we want; when it differs from current_bind_ip the receive loop leaves its inner loop and rebinds.
private volatile InetAddress target_bind_ip;
// Either flag being set stops the receive loop.
private volatile boolean failed;
private volatile boolean destroyed;
// NOTE(review): presumably signalled when teardown completes - usage not visible here.
private AESemaphore destroy_sem = new AESemaphore("PRUDPPacketHandler:destroy");
// NOTE(review): presumably records a failure raised while establishing the socket - usage not visible here.
private Throwable init_error;
/**
 * Creates a handler for the given UDP port and starts its background machinery.
 *
 * The constructor records the bind addresses, launches a daemon thread running
 * receiveLoop, registers a periodic "PRUDP:timeouts" timer (every 5000 ms) that
 * calls checkTimeouts, and then blocks until the receive loop releases the
 * initialisation semaphore.
 *
 * @param _port    UDP port to receive on
 * @param _bind_ip explicit bind address, or null to use the NetworkAdmin default
 */
protected PRUDPPacketHandlerImpl(int _port, InetAddress _bind_ip) {
    port = _port;
    explicit_bind_ip = _bind_ip;
    default_bind_ip = NetworkAdmin.getSingleton().getDefaultBindAddress();

    calcBind();

    // released by the receive loop once it has set up its socket
    final AESemaphore ready = new AESemaphore("PRUDPPacketHandler:init");

    Thread receiver = new AEThread("PRUDPPacketReciever:" + port) {
        public void runSupport() {
            receiveLoop(ready);
        }
    };

    receiver.setDaemon(true);
    receiver.start();

    // one-element holder so the performer can reach the timer event that owns it
    final TimerEventPeriodic[] timer_holder = new TimerEventPeriodic[1];

    timer_holder[0] = SimpleTimer.addPeriodicEvent(
        "PRUDP:timeouts",
        5000,
        new TimerEventPerformer() {
            public void perform(TimerEvent event) {
                if (destroyed) {
                    // the holder may not have been filled in yet on a very early tick
                    TimerEventPeriodic self = timer_holder[0];

                    if (self != null) {
                        self.cancel();
                    }
                }

                checkTimeouts();
            }
        });

    // wait for the receive thread to signal that it is up
    ready.reserve();
}
/**
 * Installs (or, with null, removes) the primordial handler, which is offered
 * each received packet before normal processing.
 *
 * Replacing one non-null handler with another is allowed but reported via Debug.out.
 *
 * @param handler the new primordial handler, or null to clear it
 */
public void setPrimordialHandler(PRUDPPrimordialHandler handler) {
    boolean replacing_existing = handler != null && primordial_handler != null;

    if (replacing_existing) {
        Debug.out("Primordial handler replaced!");
    }

    primordial_handler = handler;
}
/**
 * Installs (or, with null, removes) the request handler for this endpoint.
 *
 * Only one handler per endpoint is supported. Supporting more would need a
 * per-message-type handler map, or a chain in which every handler gets a chance
 * to process the message.
 *
 * @param _request_handler the handler to install, or null to clear the current one
 * @throws RuntimeException if a handler is already installed and a non-null replacement is supplied
 */
public void setRequestHandler(PRUDPRequestHandler _request_handler) {
    if (request_handler != null && _request_handler != null) {
        throw new RuntimeException("Multiple handlers per endpoint not supported");
    }

    request_handler = _request_handler;
}
/**
 * @return the currently installed request handler, or null if none is set
 */
public PRUDPRequestHandler getRequestHandler() {
    return request_handler;
}
/**
 * @return the UDP port this handler was created for
 */
public int getPort() {
    return port;
}
/**
 * Updates the default bind address and recomputes the target bind address,
 * all under bind_address_mon.
 *
 * @param address the new default bind address (may be null)
 */
protected void setDefaultBindAddress(InetAddress address) {
    try {
        bind_address_mon.enter();

        default_bind_ip = address;

        // the explicit address, if any, still wins - calcBind decides
        calcBind();

    } finally {
        bind_address_mon.exit();
    }
}
/**
 * Sets the explicit bind address, recomputes the target bind address, and then
 * waits for the receive loop to rebind.
 *
 * The wait polls every 50 ms, for at most 100 polls, until current_bind_ip is
 * the same reference as target_bind_ip. It ends early if the handler has
 * failed or been destroyed. If the calling thread is interrupted while waiting,
 * the wait is abandoned and the thread's interrupt status is restored so the
 * caller can still observe it.
 *
 * @param address the explicit bind address, or null to fall back to the default
 */
public void setExplicitBindAddress(InetAddress address) {
    try {
        bind_address_mon.enter();

        explicit_bind_ip = address;

        calcBind();

    } finally {
        bind_address_mon.exit();
    }

    int loops = 0;

    // reference comparison is intentional: the receive loop copies target_bind_ip
    // into current_bind_ip once the new socket is in place
    while (current_bind_ip != target_bind_ip && !(failed || destroyed)) {

        if (loops >= 100) {
            Debug.out("Giving up on wait for bind ip change to take effect");
            break;
        }

        try {
            Thread.sleep(50);

            loops++;

        } catch (InterruptedException e) {
            // stop waiting, but do not lose the interrupt request
            Thread.currentThread().interrupt();
            break;
        }
    }
}
/**
 * Recomputes target_bind_ip: the explicit bind address when one is set,
 * otherwise the default bind address.
 */
protected void calcBind() {
    target_bind_ip = (explicit_bind_ip != null) ? explicit_bind_ip : default_bind_ip;
}
protected void
receiveLoop(
AESemaphore init_sem )
{
NetworkAdminPropertyChangeListener prop_listener =
new NetworkAdminPropertyChangeListener()
{
public void
propertyChanged(
String property )
{
if ( property == NetworkAdmin.PR_DEFAULT_BIND_ADDRESS ){
setDefaultBindAddress( NetworkAdmin.getSingleton().getDefaultBindAddress());
}
}
};
NetworkAdmin.getSingleton().addPropertyChangeListener( prop_listener );
try{
// outter loop picks up bind-ip changes
while( !( failed || destroyed )){
if ( socket != null ){
try{
socket.close();
}catch( Throwable e ){
Debug.printStackTrace(e);
}
}
InetSocketAddress address;
DatagramSocket new_socket;
if ( target_bind_ip == null ){
address = new InetSocketAddress("127.0.0.1",port);
new_socket = new DatagramSocket( port );
}else{
address = new InetSocketAddress( target_bind_ip, port );
new_socket = new DatagramSocket( address );
}
new_socket.setReuseAddress(true);
// short timeout on receive so that we can interrupt a receive fairly quickly
new_socket.setSoTimeout( 1000 );
// only make the socket public once fully configured
socket = new_socket;
current_bind_ip = target_bind_ip;
init_sem.release();
if (Logger.isEnabled())
Logger.log(new LogEvent(LOGID,
"PRUDPPacketReceiver: receiver established on port " + port + (current_bind_ip==null?"":(", bound to " + current_bind_ip ))));
byte[] buffer = null;
long successful_accepts = 0;
long failed_accepts = 0;
while( !( failed || destroyed )){
if ( current_bind_ip != target_bind_ip ){
break;
}
try{
if ( buffer == null ){
buffer = new byte[PRUDPPacket.MAX_PACKET_SIZE];
}
DatagramPacket packet = new DatagramPacket( buffer, buffer.length, address );
socket.receive( packet );
long receive_time = SystemTime.getCurrentTime();
successful_accepts++;
failed_accepts = 0;
PRUDPPrimordialHandler prim_hand = primordial_handler;
if ( prim_hand != null ){
if ( prim_hand.packetReceived( packet )){
// primordial handlers get their own buffer as we can't guarantee
// that they don't need to hang onto the data
buffer = null;
stats.primordialPacketReceived( packet.getLength());
}
}
if ( buffer != null ){
process( packet, receive_time );
}
}catch( SocketTimeoutException e ){
}catch( Throwable e ){
failed_accepts++;
if (Logger.isEnabled())
Logger.log(new LogEvent(LOGID,
"PRUDPPacketReceiver: receive failed on port " + port, e));
if (( failed_accepts > 100 && successful_accepts == 0 ) || failed_accepts > 1000 ){
Logger.logTextResource(new LogAlert(LogAlert.UNREPEATABLE,
LogAlert.AT_ERROR, "Network.alert.acceptfail"), new String[] {
"" + port, "UDP" });
// break, sometimes get a screaming loop. e.g.
/*
[2:01:55] DEBUG::Tue Dec 07 02:01:55 EST 2004
[2:01:55] java.net.SocketException: Socket operation on nonsocket: timeout in datagram socket peek
[2:01:55] at java.net.PlainDatagramSocketImpl.peekData(Native Method)
[2:01:55] at java.net.DatagramSocket.receive(Unknown Source)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -