📄 prudppackethandlerimpl.java
字号:
[2:01:55] at org.gudy.azureus2.core3.tracker.server.impl.udp.TRTrackerServerUDP.recvLoop(TRTrackerServerUDP.java:118)
[2:01:55] at org.gudy.azureus2.core3.tracker.server.impl.udp.TRTrackerServerUDP$1.runSupport(TRTrackerServerUDP.java:90)
[2:01:55] at org.gudy.azureus2.core3.util.AEThread.run(AEThread.java:45)
*/
init_error = e;
failed = true;
}
}
}
}
}catch( Throwable e ){
init_error = e;
Logger.logTextResource(new LogAlert(LogAlert.UNREPEATABLE,
LogAlert.AT_ERROR, "Tracker.alert.listenfail"), new String[] { "UDP:"
+ port });
Logger.log(new LogEvent(LOGID, "PRUDPPacketReceiver: "
+ "DatagramSocket bind failed on port " + port, e));
}finally{
init_sem.release();
destroy_sem.releaseForever();
if ( socket != null ){
try{
socket.close();
}catch( Throwable e ){
Debug.printStackTrace(e);
}
}
NetworkAdmin.getSingleton().removePropertyChangeListener( prop_listener );
}
}
/**
 * Expires outstanding requests whose timeout has elapsed.
 *
 * Works in two phases: first, while holding requests_mon, every request that
 * has a non-zero send time and has been outstanding for at least its timeout
 * is removed from the requests map and counted in the stats; second, after the
 * monitor has been released, each expired request is failed with a
 * "timed out" exception. A failure while notifying one request is printed and
 * does not prevent the remaining requests from being notified.
 */
protected void
checkTimeouts()
{
    long    now = SystemTime.getCurrentTime();

    List    expired = new ArrayList();

    try{
        requests_mon.enter();

        for ( Iterator pending = requests.values().iterator(); pending.hasNext(); ){

            PRUDPPacketHandlerRequestImpl   candidate = (PRUDPPacketHandlerRequestImpl)pending.next();

            long    sent_time = candidate.getSendTime();

            if ( sent_time == 0 ){

                // a send time of zero is never treated as timed out

                continue;
            }

            if ( now - sent_time < candidate.getTimeout()){

                continue;
            }

            pending.remove();

            stats.requestTimedOut();

            expired.add( candidate );
        }
    }finally{

        requests_mon.exit();
    }

    // notification happens outside the monitor

    for ( Iterator failures = expired.iterator(); failures.hasNext(); ){

        PRUDPPacketHandlerRequestImpl   victim = (PRUDPPacketHandlerRequestImpl)failures.next();

        if ( TRACE_REQUESTS && Logger.isEnabled()){

            Logger.log(
                new LogEvent(
                    LOGID,
                    LogEvent.LT_ERROR,
                    "PRUDPPacketHandler: request timeout"));
        }

        // don't change the text of this message, it's used elsewhere

        try{
            victim.setException( new PRUDPPacketHandlerException( "timed out" ));

        }catch( Throwable e ){

            Debug.printStackTrace(e);
        }
    }
}
/**
 * Decodes one received datagram and dispatches it as a request or a reply.
 *
 * The first byte of the payload decides the direction: MSB clear means the
 * packet is deserialised as a reply, MSB set means it is deserialised as a
 * request.
 *
 * Requests: when receive_delay is greater than zero the request is queued for
 * a lazily-created daemon receiver thread, which handles one queued request at
 * a time and sleeps receive_delay after each one it hands to the request
 * handler. A request is dropped (with a diagnostic at most every 30 seconds)
 * when the queued data size exceeds MAX_RECV_QUEUE_DATA_SIZE, or when
 * receive_delay multiplied by the queue length exceeds queued_request_timeout.
 * When receive_delay is not positive the request handler, if any, is invoked
 * directly on the calling thread.
 *
 * Replies: the pending request with the same transaction id is looked up under
 * requests_mon (and removed unless the reply announces a continuation) and is
 * handed the reply. Unmatched replies are discarded.
 *
 * Nothing is thrown to the caller: IOExceptions are ignored and every other
 * Throwable is logged.
 *
 * @param dg_packet     the received datagram; the payload is read from index 0
 *                      of its buffer for getLength() bytes
 * @param receive_time  forwarded unchanged to the matched request's setReply
 */
protected void
process(
    DatagramPacket  dg_packet,
    long            receive_time )
{
    try{
        // HACK alert. Due to the form of the tracker UDP protocol (no common
        // header for requests and replies) we enforce a rule. All connection ids
        // must have their MSB set. As replies always start with the action, which
        // always has the MSB clear, we can use this to differentiate: a set MSB
        // in the first byte identifies a request.

        byte[]  packet_data = dg_packet.getData();
        int     packet_len  = dg_packet.getLength();

        PRUDPPacket packet;

        boolean request_packet;

        stats.packetReceived(packet_len);

        if ( ( packet_data[0]&0x80 ) == 0 ){

            request_packet = false;

            packet = PRUDPPacketReply.deserialiseReply(
                this,
                new DataInputStream(new ByteArrayInputStream( packet_data, 0, packet_len)));

        }else{

            request_packet = true;

            packet = PRUDPPacketRequest.deserialiseRequest(
                this,
                new DataInputStream(new ByteArrayInputStream( packet_data, 0, packet_len)));
        }

        packet.setSerialisedSize( packet_len );

        packet.setAddress( (InetSocketAddress)dg_packet.getSocketAddress());

        if ( request_packet ){

            total_requests_received++;

            if ( TRACE_REQUESTS ){
                Logger.log(new LogEvent(LOGID,
                        "PRUDPPacketHandler: request packet received: "
                                + packet.getString()));
            }

            if ( receive_delay > 0 ){

                // we take the processing offline so that these incoming requests don't
                // interfere with replies to outgoing requests

                try{
                    recv_queue_mon.enter();

                    if ( recv_queue_data_size > MAX_RECV_QUEUE_DATA_SIZE ){

                        long    now = SystemTime.getCurrentTime();

                        // rate-limit the diagnostic to one per 30 seconds

                        if ( now - last_error_report > 30000 ){

                            last_error_report = now;

                            Debug.out( "Receive queue size limit exceeded (" +
                                    MAX_RECV_QUEUE_DATA_SIZE + "), dropping request packet [" +
                                    total_requests_received + "/" + total_requests_processed + ":" + total_replies + "]");
                        }

                    }else if ( receive_delay * recv_queue.size() > queued_request_timeout ){

                        // by the time this request gets processed it'll have timed out
                        // in the caller anyway, so discard it

                        long    now = SystemTime.getCurrentTime();

                        if ( now - last_error_report > 30000 ){

                            last_error_report = now;

                            // opening bracket fixed (was "]") so the counters read [recv/processed:replies]

                            Debug.out( "Receive queue entry limit exceeded (" +
                                    recv_queue.size() + "), dropping request packet [" +
                                    total_requests_received + "/" + total_requests_processed + ":" + total_replies + "]");
                        }

                    }else{

                        recv_queue.add( new Object[]{ packet, new Integer( dg_packet.getLength()) });

                        recv_queue_data_size += dg_packet.getLength();

                        recv_queue_sem.release();

                        if ( recv_thread == null ){

                            // first queued request: start the receiver thread

                            recv_thread =
                                new AEThread( "PRUDPPacketHandler:receiver" )
                                {
                                    public void
                                    runSupport()
                                    {
                                        while( true ){

                                            try{
                                                recv_queue_sem.reserve();

                                                Object[]    data;

                                                try{
                                                    recv_queue_mon.enter();

                                                    data = (Object[])recv_queue.remove(0);

                                                    // adjust the queued-size counter under the same
                                                    // monitor that guards its increment and its
                                                    // limit check, so concurrent updates can't be lost

                                                    recv_queue_data_size -= ((Integer)data[1]).intValue();

                                                    total_requests_processed++;

                                                }finally{

                                                    recv_queue_mon.exit();
                                                }

                                                PRUDPPacketRequest  p = (PRUDPPacketRequest)data[0];

                                                PRUDPRequestHandler handler = request_handler;

                                                if ( handler != null ){

                                                    handler.process( p );

                                                    Thread.sleep( receive_delay );
                                                }

                                            }catch( Throwable e ){

                                                Debug.printStackTrace(e);
                                            }
                                        }
                                    }
                                };

                            recv_thread.setDaemon( true );

                            recv_thread.start();
                        }
                    }
                }finally{

                    recv_queue_mon.exit();
                }
            }else{

                // snapshot the handler field so the null check and the call use the same reference

                PRUDPRequestHandler handler = request_handler;

                if ( handler != null ){

                    handler.process( (PRUDPPacketRequest)packet );
                }
            }

        }else{

            total_replies++;

            if ( TRACE_REQUESTS ){
                Logger.log(new LogEvent(LOGID,
                        "PRUDPPacketHandler: reply packet received: "
                                + packet.getString()));
            }

            PRUDPPacketHandlerRequestImpl   request;

            try{
                requests_mon.enter();

                if ( packet.hasContinuation()){

                    // don't remove the request if there are more replies to come

                    request = (PRUDPPacketHandlerRequestImpl)requests.get(new Integer(packet.getTransactionId()));

                }else{

                    request = (PRUDPPacketHandlerRequestImpl)requests.remove(new Integer(packet.getTransactionId()));
                }
            }finally{

                requests_mon.exit();
            }

            if ( request == null ){

                if ( TRACE_REQUESTS ){
                    Logger.log(new LogEvent(LOGID, LogEvent.LT_ERROR,
                            "PRUDPPacketReceiver: unmatched reply received, discarding:"
                                    + packet.getString()));
                }

            }else{

                request.setReply( packet, (InetSocketAddress)dg_packet.getSocketAddress(), receive_time );
            }
        }
    }catch( Throwable e ){

        // if someone's sending us junk we just log and continue

        if ( e instanceof IOException ){

            // generally uninteresting

        }else{

            Logger.log(new LogEvent(LOGID, "", e));
        }
    }
}
/**
 * Sends a request without authentication details.
 *
 * Delegates to the overload that accepts a PasswordAuthentication, passing
 * null for it.
 *
 * @param request_packet       the packet to send
 * @param destination_address  where to send it
 * @return whatever the delegated overload returns
 * @throws PRUDPPacketHandlerException propagated from the delegated overload
 */
public PRUDPPacket
sendAndReceive(
    PRUDPPacket         request_packet,
    InetSocketAddress   destination_address )

    throws PRUDPPacketHandlerException
{
    // null == no authentication

    return sendAndReceive(
                null,
                request_packet,
                destination_address );
}
/**
 * Sends a request using the default timeout.
 *
 * Delegates to the overload that takes an explicit timeout, supplying
 * PRUDPPacket.DEFAULT_UDP_TIMEOUT.
 *
 * @param auth                 authentication details, forwarded unchanged
 * @param request_packet       the packet to send
 * @param destination_address  where to send it
 * @return whatever the delegated overload returns
 * @throws PRUDPPacketHandlerException propagated from the delegated overload
 */
public PRUDPPacket
sendAndReceive(
    PasswordAuthentication  auth,
    PRUDPPacket             request_packet,
    InetSocketAddress       destination_address )

    throws PRUDPPacketHandlerException
{
    return sendAndReceive(
                auth,
                request_packet,
                destination_address,
                PRUDPPacket.DEFAULT_UDP_TIMEOUT );
}
/**
 * Sends a request with an explicit timeout and returns its reply.
 *
 * Delegates to the six-argument overload with a null receiver and
 * PRUDPPacketHandler.PRIORITY_MEDIUM, then returns getReply() of the request
 * object that overload hands back.
 *
 * @param auth                 authentication details, forwarded unchanged
 * @param request_packet       the packet to send
 * @param destination_address  where to send it
 * @param timeout              forwarded unchanged to the delegated overload
 * @return the result of getReply() on the returned request object
 * @throws PRUDPPacketHandlerException propagated from the delegated call or from getReply()
 */
public PRUDPPacket
sendAndReceive(
    PasswordAuthentication  auth,
    PRUDPPacket             request_packet,
    InetSocketAddress       destination_address,
    long                    timeout )

    throws PRUDPPacketHandlerException
{
    // null receiver: the caller collects the reply from the returned request

    return sendAndReceive(
                auth,
                request_packet,
                destination_address,
                null,
                timeout,
                PRUDPPacketHandler.PRIORITY_MEDIUM ).getReply();
}
/**
 * Sends a request without authentication details, supplying a receiver.
 *
 * Delegates to the six-argument overload with null authentication; the request
 * object that overload returns is discarded.
 *
 * @param request_packet       the packet to send
 * @param destination_address  where to send it
 * @param receiver             forwarded unchanged to the delegated overload
 * @param timeout              forwarded unchanged to the delegated overload
 * @param priority             forwarded unchanged to the delegated overload
 * @throws PRUDPPacketHandlerException propagated from the delegated overload
 */
public void
sendAndReceive(
    PRUDPPacket             request_packet,
    InetSocketAddress       destination_address,
    PRUDPPacketReceiver     receiver,
    long                    timeout,
    int                     priority )

    throws PRUDPPacketHandlerException
{
    // null == no authentication

    sendAndReceive(
        null,
        request_packet,
        destination_address,
        receiver,
        timeout,
        priority );
}
public PRUDPPacketHandlerRequestImpl
sendAndReceive(
PasswordAuthentication auth,
PRUDPPacket request_packet,
InetSocketAddress destination_address,
PRUDPPacketReceiver receiver,
long timeout,
int priority )
throws PRUDPPacketHandlerException
{
if ( socket == null ){
if ( init_error != null ){
throw( new PRUDPPacketHandlerException( "Transport unavailable", init_error ));
}
throw( new PRUDPPacketHandlerException( "Transport unavailable" ));
}
try{
checkTargetAddress( destination_address );
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream os = new DataOutputStream( baos );
request_packet.serialise(os);
byte[] buffer = baos.toByteArray();
request_packet.setSerialisedSize( buffer.length );
if ( auth != null ){
//<parg_home> so <new_packet> = <old_packet> + <user_padded_to_8_bytes> + <hash>
//<parg_home> where <hash> = first 8 bytes of sha1(<old_packet> + <user_padded_to_8> + sha1(pass))
//<XTF> Yes
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -