📄 genericmessageconnectionindirect.java
字号:
// Decompiled by Jad v1.5.8e2. Copyright 2001 Pavel Kouznetsov.
// Jad home page: http://kpdus.tripod.com/jad.html
// Decompiler options: packimports(3) fieldsfirst ansi space
// Source File Name: GenericMessageConnectionIndirect.java
package org.gudy.azureus2.pluginsimpl.local.messaging;
import com.aelitis.azureus.core.nat.NATTraverser;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.*;
import org.gudy.azureus2.core3.logging.*;
import org.gudy.azureus2.core3.util.*;
import org.gudy.azureus2.plugins.messaging.MessageException;
import org.gudy.azureus2.plugins.messaging.generic.GenericMessageEndpoint;
import org.gudy.azureus2.plugins.messaging.generic.GenericMessageHandler;
import org.gudy.azureus2.plugins.network.RateLimiter;
import org.gudy.azureus2.plugins.utils.PooledByteBuffer;
// Referenced classes of package org.gudy.azureus2.pluginsimpl.local.messaging:
// GenericMessage, GenericMessageConnectionAdapter, GenericMessageConnectionImpl, GenericMessageEndpointImpl,
// MessageManagerImpl
public class GenericMessageConnectionIndirect
implements GenericMessageConnectionAdapter
{
// Log identifier for this class; it has no initializer here, so it is assigned elsewhere in the file.
private static final LogIDs LOGID;
// Debug switch, fixed to false; its usage is not visible in this section.
private static final boolean TRACE = false;
// Largest message size; the same value is reported by getMaximumMessageSize().
public static final int MAX_MESSAGE_SIZE = 32768;
// Values carried under the "type" key of the message maps exchanged with the peer.
private static final int MESSAGE_TYPE_CONNECT = 1;
private static final int MESSAGE_TYPE_ERROR = 2;
private static final int MESSAGE_TYPE_DATA = 3;
private static final int MESSAGE_TYPE_DISCONNECT = 4;
// Timer-related constants. NOTE(review): presumably milliseconds - confirm where they are used.
private static final int TICK_PERIOD = 5000;
private static final int KEEP_ALIVE_CHECK_PERIOD = 5000;
private static final int KEEP_ALIVE_MIN = 10000;
private static final int STATS_PERIOD = 60000;
// NOTE(review): these look like the periods above divided by TICK_PERIOD (5000/5000, 60000/5000) - confirm.
private static final int KEEP_ALIVE_CHECK_TICKS = 1;
private static final int STATS_TICKS = 12;
// Limits applied by receive() when a peer asks to open a new connection.
private static final int MAX_REMOTE_CONNECTIONS = 1024;
private static final int MAX_REMOTE_CONNECTIONS_PER_IP = 32;
// Next id handed to an accepted remote connection; starts at a random value and is
// incremented by receive() while holding the remote_connections monitor.
private static long connection_id_next = (new Random()).nextLong();
// Connections opened by this side, keyed by Long connection id (filled in by connect()).
// Accesses visible in this section synchronize on the map itself.
private static Map local_connections = new HashMap();
// Connections accepted from peers, keyed by Long connection id (filled in by receive()).
// Accesses visible in this section synchronize on the map itself.
private static Map remote_connections = new HashMap();
// Thread pool named for keep-alive work; its usage is not visible in this section.
private static ThreadPool keep_alive_pool = new ThreadPool("GenericMessageConnectionIndirect:keepAlive", 8, true);
// Set by both constructors.
private MessageManagerImpl message_manager;
private String msg_id;
private String msg_desc;
private GenericMessageEndpoint endpoint;
// Set only by the outgoing constructor; connect() uses it to send the connect request.
private NATTraverser nat_traverser;
// Owning connection, supplied through setOwner(); connect() hands received payloads to it.
private GenericMessageConnectionImpl owner;
// Outgoing connections only: the addresses passed to NATTraverser.sendMessage() by connect().
private InetSocketAddress rendezvous;
private InetSocketAddress target;
// Supplied to the incoming constructor, or read from the "con_id" entry of the connect reply.
private long connection_id;
// True when this instance was created by the incoming (connection id) constructor.
private boolean incoming;
// NOTE(review): presumably backs isClosed(), which is not visible in this section - confirm.
private boolean closed;
// Created by both constructors; usage is not visible in this section.
private LinkedList send_queue;
private AESemaphore send_queue_sem;
// Set by connect() right after the connect request has been sent.
private volatile long last_message_sent;
// Set by the incoming constructor and read (and possibly clamped) by getLastMessageReceivedTime().
private volatile long last_message_received;
// Usage is not visible in this section.
private volatile boolean keep_alive_in_progress;
protected static Map receive(MessageManagerImpl message_manager, InetSocketAddress originator, Map message)
{
int type;
String msg_id;
String msg_desc;
GenericMessageEndpointImpl endpoint;
GenericMessageHandler handler;
if (!message.containsKey("type"))
return null;
type = ((Long)message.get("type")).intValue();
if (type != 1)
break MISSING_BLOCK_LABEL_522;
msg_id = new String((byte[])(byte[])message.get("msg_id"));
msg_desc = new String((byte[])(byte[])message.get("msg_desc"));
endpoint = new GenericMessageEndpointImpl(originator);
endpoint.addUDP(originator);
handler = message_manager.getHandler(msg_id);
if (handler == null)
{
Debug.out((new StringBuilder()).append("No message handler registered for '").append(msg_id).append("'").toString());
return null;
}
Map map2 = remote_connections;
JVM INSTR monitorenter ;
if (remote_connections.size() < 1024)
break MISSING_BLOCK_LABEL_202;
Debug.out((new StringBuilder()).append("Maximum remote connections exceeded - request from ").append(originator).append(" denied [").append(getRemoteConnectionStatus()).append("]").toString());
return null;
int num_from_this_ip;
num_from_this_ip = 0;
Iterator it = remote_connections.values().iterator();
do
{
if (!it.hasNext())
break;
GenericMessageConnectionIndirect con = (GenericMessageConnectionIndirect)it.next();
if (con.getEndpoint().getNotionalAddress().getAddress().equals(originator.getAddress()))
num_from_this_ip++;
} while (true);
if (num_from_this_ip < 32) goto _L2; else goto _L1
_L1:
Debug.out((new StringBuilder()).append("Maximum remote connections per-ip exceeded - request from ").append(originator).append(" denied [").append(getRemoteConnectionStatus()).append("]").toString());
null;
map2;
JVM INSTR monitorexit ;
return;
_L2:
Long con_id = new Long(connection_id_next++);
map2;
JVM INSTR monitorexit ;
goto _L3
Exception exception;
exception;
throw exception;
_L3:
Map reply;
GenericMessageConnectionIndirect indirect_connection = new GenericMessageConnectionIndirect(message_manager, msg_id, msg_desc, endpoint, con_id.longValue());
GenericMessageConnectionImpl new_connection = new GenericMessageConnectionImpl(message_manager, indirect_connection);
if (!handler.accept(new_connection))
break MISSING_BLOCK_LABEL_509;
new_connection.accepted();
synchronized (remote_connections)
{
remote_connections.put(con_id, indirect_connection);
}
List replies = indirect_connection.receive((List)message.get("data"));
reply = new HashMap();
reply.put("type", new Long(1L));
reply.put("con_id", con_id);
reply.put("data", replies);
return reply;
return null;
MessageException e;
e;
Debug.out("Error accepting message", e);
return null;
Long con_id;
GenericMessageConnectionIndirect indirect_connection;
if (type == 3)
{
con_id = (Long)message.get("con_id");
synchronized (remote_connections)
{
indirect_connection = (GenericMessageConnectionIndirect)remote_connections.get(con_id);
}
if (indirect_connection == null)
return null;
Map reply = new HashMap();
if (indirect_connection.isClosed())
{
reply.put("type", new Long(4L));
} else
{
List replies = indirect_connection.receive((List)message.get("data"));
reply.put("type", new Long(3L));
reply.put("data", replies);
if (indirect_connection.receiveIncomplete())
reply.put("more_data", new Long(1L));
}
return reply;
}
con_id = (Long)message.get("con_id");
synchronized (remote_connections)
{
indirect_connection = (GenericMessageConnectionIndirect)remote_connections.get(con_id);
}
if (indirect_connection != null)
try
{
indirect_connection.close(new Throwable("Remote closed connection"));
}
catch (Throwable e)
{
Debug.printStackTrace(e);
}
return null;
}
/**
 * Summarises the connections accepted from peers.
 *
 * @return the result of {@link #getConnectionStatus(Map)} applied to the remote connection map
 */
protected static String getRemoteConnectionStatus()
{
    final String remote_status = getConnectionStatus(remote_connections);
    return remote_status;
}
/**
 * Summarises the connections opened by this side.
 *
 * @return the result of {@link #getConnectionStatus(Map)} applied to the local connection map
 */
protected static String getLocalConnectionStatus()
{
    final String local_status = getConnectionStatus(local_connections);
    return local_status;
}
/**
 * Builds a one-line summary of how many connections exist per remote address.
 *
 * <p>The map is iterated while holding its own monitor. The result is a comma-separated list
 * of {@code address:count} pairs, in the iteration order of the intermediate HashMap, and is
 * the empty string when there are no connections.
 *
 * @param connections map whose values are {@code GenericMessageConnectionIndirect} instances
 * @return the summary string
 */
protected static String getConnectionStatus(Map connections)
{
    Map counts_by_address = new HashMap();

    synchronized (connections)
    {
        Iterator cons = connections.values().iterator();
        while (cons.hasNext())
        {
            GenericMessageConnectionIndirect con = (GenericMessageConnectionIndirect)cons.next();
            InetAddress address = con.getEndpoint().getNotionalAddress().getAddress();
            Integer seen = (Integer)counts_by_address.get(address);
            int updated = (seen == null) ? 1 : seen.intValue() + 1;
            counts_by_address.put(address, Integer.valueOf(updated));
        }
    }

    StringBuilder status = new StringBuilder();
    Iterator entries = counts_by_address.entrySet().iterator();
    while (entries.hasNext())
    {
        Map.Entry entry = (Map.Entry)entries.next();
        if (status.length() != 0)
        {
            status.append(",");
        }
        status.append(entry.getKey()).append(":").append(entry.getValue());
    }
    return status.toString();
}
/**
 * Creates the adapter for an outgoing connection.
 *
 * <p>Stores the arguments, creates the send queue and its semaphore, obtains the NAT
 * traverser from the message manager and logs the notional address of the endpoint.
 *
 * @param _message_manager owning message manager; also the source of the NAT traverser
 * @param _msg_id          message id
 * @param _msg_desc        message description
 * @param _endpoint        endpoint being connected to
 * @param _rendezvous      rendezvous address, later passed to the NAT traverser by connect()
 * @param _target          target address, later passed to the NAT traverser by connect()
 */
protected GenericMessageConnectionIndirect(MessageManagerImpl _message_manager, String _msg_id, String _msg_desc, GenericMessageEndpoint _endpoint, InetSocketAddress _rendezvous, InetSocketAddress _target)
{
    this.send_queue = new LinkedList();
    this.send_queue_sem = new AESemaphore("GenericMessageConnectionIndirect:sendq");

    this.message_manager = _message_manager;
    this.msg_id = _msg_id;
    this.msg_desc = _msg_desc;
    this.endpoint = _endpoint;
    this.rendezvous = _rendezvous;
    this.target = _target;

    this.nat_traverser = this.message_manager.getNATTraverser();

    log("outgoing connection to " + this.endpoint.getNotionalAddress());
}
/**
 * Creates the adapter for an incoming connection.
 *
 * <p>Stores the arguments, creates the send queue and its semaphore, marks the connection as
 * incoming, stamps the last-received time with the current time and logs the notional
 * address of the endpoint.
 *
 * @param _message_manager owning message manager
 * @param _msg_id          message id
 * @param _msg_desc        message description
 * @param _endpoint        endpoint the connection came from
 * @param _connection_id   id assigned to this connection
 */
protected GenericMessageConnectionIndirect(MessageManagerImpl _message_manager, String _msg_id, String _msg_desc, GenericMessageEndpoint _endpoint, long _connection_id)
{
    this.send_queue = new LinkedList();
    this.send_queue_sem = new AESemaphore("GenericMessageConnectionIndirect:sendq");

    this.message_manager = _message_manager;
    this.msg_id = _msg_id;
    this.msg_desc = _msg_desc;
    this.endpoint = _endpoint;
    this.connection_id = _connection_id;
    this.incoming = true;

    this.last_message_received = SystemTime.getCurrentTime();

    log("incoming connection from " + this.endpoint.getNotionalAddress());
}
/**
 * Records the connection object that owns this adapter.
 *
 * @param _owner the owning connection
 */
public void setOwner(GenericMessageConnectionImpl _owner)
{
    this.owner = _owner;
}
/**
 * Reports the largest message size this adapter supports.
 *
 * @return {@code MAX_MESSAGE_SIZE} (32768)
 */
public int getMaximumMessageSize()
{
    return MAX_MESSAGE_SIZE;
}
/**
 * Names the kind of connection this adapter provides.
 *
 * @return the fixed string "Tunnel"
 */
public String getType()
{
return "Tunnel";
}
/**
 * Reports the transport type code of this adapter.
 *
 * @return always 0. NOTE(review): presumably one of the transport-type constants of the
 *         messaging API, inlined by the decompiler - confirm which one
 */
public int getTransportType()
{
return 0;
}
/**
 * Returns the time the last message was received.
 *
 * <p>If the stored time lies after the current time, it is first reset to the current time.
 * NOTE(review): presumably this guards against the system clock being set backwards -
 * confirm.
 *
 * @return the (possibly adjusted) last-received timestamp
 */
public long getLastMessageReceivedTime()
{
    final long current_time = SystemTime.getCurrentTime();

    if (last_message_received > current_time)
    {
        last_message_received = current_time;
    }

    return last_message_received;
}
/**
 * Returns the endpoint supplied when this adapter was constructed.
 *
 * @return the endpoint of this connection
 */
public GenericMessageEndpoint getEndpoint()
{
    return this.endpoint;
}
/**
 * Does nothing: this adapter ignores the supplied inbound rate limiter.
 *
 * @param ratelimiter ignored
 */
public void addInboundRateLimiter(RateLimiter ratelimiter)
{
}
/**
 * Does nothing: this adapter keeps no inbound rate limiters, so there is nothing to remove.
 *
 * @param ratelimiter ignored
 */
public void removeInboundRateLimiter(RateLimiter ratelimiter)
{
}
/**
 * Does nothing: this adapter ignores the supplied outbound rate limiter.
 *
 * @param ratelimiter ignored
 */
public void addOutboundRateLimiter(RateLimiter ratelimiter)
{
}
/**
 * Does nothing: this adapter keeps no outbound rate limiters, so there is nothing to remove.
 *
 * @param ratelimiter ignored
 */
public void removeOutboundRateLimiter(RateLimiter ratelimiter)
{
}
public void connect(ByteBuffer initial_data, GenericMessageConnectionAdapter.ConnectionListener listener)
{
try
{
Map message = new HashMap();
byte initial_data_bytes[] = new byte[initial_data.remaining()];
initial_data.get(initial_data_bytes);
List initial_messages = new ArrayList();
initial_messages.add(initial_data_bytes);
message.put("type", new Long(1L));
message.put("msg_id", msg_id);
message.put("msg_desc", msg_desc);
message.put("data", initial_messages);
Map reply = nat_traverser.sendMessage(message_manager, rendezvous, target, message);
last_message_sent = SystemTime.getCurrentTime();
if (reply == null || !reply.containsKey("type"))
{
listener.connectFailure(new Throwable((new StringBuilder()).append("Indirect connect failed (response=").append(reply).append(")").toString()));
} else
{
int reply_type = ((Long)reply.get("type")).intValue();
if (reply_type == 2)
listener.connectFailure(new Throwable(new String((byte[])(byte[])reply.get("error"))));
else
if (reply_type == 4)
listener.connectFailure(new Throwable("Disconnected"));
else
if (reply_type == 1)
{
connection_id = ((Long)reply.get("con_id")).longValue();
synchronized (local_connections)
{
local_connections.put(new Long(connection_id), this);
}
listener.connectSuccess();
List replies = (List)reply.get("data");
for (int i = 0; i < replies.size(); i++)
owner.receive(new GenericMessage(msg_id, msg_desc, new DirectByteBuffer(ByteBuffer.wrap((byte[])replies.get(i))), false));
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -