📄 dhttransportudpimpl.java
字号:
// don't need to synchronize access to the bloom filter as it works fine
// without protection (especially as its add only)
byte[] addr = contact.getTransportAddress().getAddress().getAddress();
if ( bad_ip_bloom_filter == null ){
bad_ip_bloom_filter = BloomFilterFactory.createAddOnly( BAD_IP_BLOOM_FILTER_SIZE );
}else{
if ( bad_ip_bloom_filter.contains( addr )){
throw( new DHTUDPPacketHandlerException( "IPFilter check fails (repeat)" ));
}
}
if ( ip_filter.isInRange(
contact.getTransportAddress().getAddress(), "DHT",
logger.isEnabled( DHTLogger.LT_IP_FILTER ))){
// don't let an attacker deliberately fill up our filter so we start
// rejecting valid addresses
if ( bad_ip_bloom_filter.getEntryCount() >= BAD_IP_BLOOM_FILTER_SIZE/10 ){
bad_ip_bloom_filter = BloomFilterFactory.createAddOnly( BAD_IP_BLOOM_FILTER_SIZE );
}
bad_ip_bloom_filter.add( addr );
throw( new DHTUDPPacketHandlerException( "IPFilter check fails" ));
}
}
}
protected void
sendPing(
final DHTTransportUDPContactImpl contact,
final DHTTransportReplyHandler handler,
long timeout,
int priority )
{
try{
checkAddress( contact );
final long connection_id = getConnectionID();
final DHTUDPPacketRequestPing request =
new DHTUDPPacketRequestPing( this, connection_id, local_contact, contact );
stats.pingSent( request );
requestSendRequestProcessor( contact, request );
packet_handler.sendAndReceive(
request,
contact.getTransportAddress(),
new DHTUDPPacketReceiver()
{
public void
packetReceived(
DHTUDPPacketReply packet,
InetSocketAddress from_address,
long elapsed_time )
{
try{
if ( packet.getConnectionId() != connection_id ){
throw( new Exception( "connection id mismatch" ));
}
contact.setInstanceIDAndVersion( packet.getTargetInstanceID(), packet.getProtocolVersion());
requestSendReplyProcessor( contact, handler, packet, elapsed_time );
stats.pingOK();
handler.pingReply( contact, (int)elapsed_time );
}catch( DHTUDPPacketHandlerException e ){
error( e );
}catch( Throwable e ){
Debug.printStackTrace(e);
error( new DHTUDPPacketHandlerException( "ping failed", e ));
}
}
public void
error(
DHTUDPPacketHandlerException e )
{
stats.pingFailed();
handler.failed( contact,e );
}
},
timeout, priority );
}catch( Throwable e ){
stats.pingFailed();
handler.failed( contact,e );
}
}
protected void
sendPing(
final DHTTransportUDPContactImpl contact,
final DHTTransportReplyHandler handler )
{
sendPing( contact, handler, request_timeout, PRUDPPacketHandler.PRIORITY_MEDIUM );
}
protected void
sendImmediatePing(
final DHTTransportUDPContactImpl contact,
final DHTTransportReplyHandler handler,
long timeout )
{
sendPing( contact, handler, timeout, PRUDPPacketHandler.PRIORITY_IMMEDIATE );
}
protected void
sendKeyBlockRequest(
final DHTTransportUDPContactImpl contact,
final DHTTransportReplyHandler handler,
byte[] block_request,
byte[] block_signature )
{
try{
checkAddress( contact );
final long connection_id = getConnectionID();
final DHTUDPPacketRequestKeyBlock request =
new DHTUDPPacketRequestKeyBlock( this, connection_id, local_contact, contact );
request.setKeyBlockDetails( block_request, block_signature );
stats.keyBlockSent( request );
request.setRandomID( contact.getRandomID());
requestSendRequestProcessor( contact, request );
packet_handler.sendAndReceive(
request,
contact.getTransportAddress(),
new DHTUDPPacketReceiver()
{
public void
packetReceived(
DHTUDPPacketReply packet,
InetSocketAddress from_address,
long elapsed_time )
{
try{
if ( packet.getConnectionId() != connection_id ){
throw( new Exception( "connection id mismatch" ));
}
contact.setInstanceIDAndVersion( packet.getTargetInstanceID(), packet.getProtocolVersion());
requestSendReplyProcessor( contact, handler, packet, elapsed_time );
stats.keyBlockOK();
handler.keyBlockReply( contact );
}catch( DHTUDPPacketHandlerException e ){
error( e );
}catch( Throwable e ){
Debug.printStackTrace(e);
error( new DHTUDPPacketHandlerException( "send key block failed", e ));
}
}
public void
error(
DHTUDPPacketHandlerException e )
{
stats.keyBlockFailed();
handler.failed( contact,e );
}
},
request_timeout, PRUDPPacketHandler.PRIORITY_MEDIUM );
}catch( Throwable e ){
stats.keyBlockFailed();
handler.failed( contact,e );
}
}
// stats
protected void
sendStats(
final DHTTransportUDPContactImpl contact,
final DHTTransportReplyHandler handler )
{
try{
checkAddress( contact );
final long connection_id = getConnectionID();
final DHTUDPPacketRequestStats request =
new DHTUDPPacketRequestStats( this, connection_id, local_contact, contact );
// request.setStatsType( DHTUDPPacketRequestStats.STATS_TYPE_NP_VER2 );
stats.statsSent( request );
requestSendRequestProcessor( contact, request );
packet_handler.sendAndReceive(
request,
contact.getTransportAddress(),
new DHTUDPPacketReceiver()
{
public void
packetReceived(
DHTUDPPacketReply packet,
InetSocketAddress from_address,
long elapsed_time )
{
try{
if ( packet.getConnectionId() != connection_id ){
throw( new Exception( "connection id mismatch" ));
}
contact.setInstanceIDAndVersion( packet.getTargetInstanceID(), packet.getProtocolVersion());
requestSendReplyProcessor( contact, handler, packet, elapsed_time );
DHTUDPPacketReplyStats reply = (DHTUDPPacketReplyStats)packet;
stats.statsOK();
if ( reply.getStatsType() == DHTUDPPacketRequestStats.STATS_TYPE_ORIGINAL ){
handler.statsReply( contact, reply.getOriginalStats());
}else{
// currently no handler for new stats
System.out.println( "new stats reply:" + reply.getString());
}
}catch( DHTUDPPacketHandlerException e ){
error( e );
}catch( Throwable e ){
Debug.printStackTrace(e);
error( new DHTUDPPacketHandlerException( "stats failed", e ));
}
}
public void
error(
DHTUDPPacketHandlerException e )
{
stats.statsFailed();
handler.failed( contact, e );
}
},
request_timeout, PRUDPPacketHandler.PRIORITY_LOW );
}catch( Throwable e ){
stats.statsFailed();
handler.failed( contact, e );
}
}
// PING for deducing external IP address
protected InetSocketAddress
askContactForExternalAddress(
DHTTransportUDPContactImpl contact )
{
try{
checkAddress( contact );
final long connection_id = getConnectionID();
final DHTUDPPacketRequestPing request =
new DHTUDPPacketRequestPing( this, connection_id, local_contact, contact );
stats.pingSent( request );
final AESemaphore sem = new AESemaphore( "DHTTransUDP:extping" );
final InetSocketAddress[] result = new InetSocketAddress[1];
packet_handler.sendAndReceive(
request,
contact.getTransportAddress(),
new DHTUDPPacketReceiver()
{
public void
packetReceived(
DHTUDPPacketReply _packet,
InetSocketAddress from_address,
long elapsed_time )
{
try{
if ( _packet instanceof DHTUDPPacketReplyPing ){
// ping was OK so current address is OK
result[0] = local_contact.getExternalAddress();
}else if ( _packet instanceof DHTUDPPacketReplyError ){
DHTUDPPacketReplyError packet = (DHTUDPPacketReplyError)_packet;
if ( packet.getErrorType() == DHTUDPPacketReplyError.ET_ORIGINATOR_ADDRESS_WRONG ){
result[0] = packet.getOriginatingAddress();
}
}
}finally{
sem.release();
}
}
public void
error(
DHTUDPPacketHandlerException e )
{
try{
stats.pingFailed();
}finally{
sem.release();
}
}
},
5000, PRUDPPacketHandler.PRIORITY_HIGH );
sem.reserve( 5000 );
return( result[0] );
}catch( Throwable e ){
stats.pingFailed();
return( null );
}
}
// STORE
public void
sendStore(
final DHTTransportUDPContactImpl contact,
final DHTTransportReplyHandler handler,
byte[][] keys,
DHTTransportValue[][] value_sets )
{
final long connection_id = getConnectionID();
if ( false ){
int total_values = 0;
for (int i=0;i<keys.length;i++){
total_values += value_sets[i].length;
}
System.out.println( "store: keys = " + keys.length +", values = " + total_values );
}
// only report to caller the outcome of the first packet
int packet_count = 0;
try{
checkAddress( contact );
int current_key_index = 0;
int current_value_index = 0;
while( current_key_index < keys.length ){
packet_count++;
int space = DHTUDPPacketHelper.PACKET_MAX_BYTES - DHTUDPPacketRequest.DHT_HEADER_SIZE;
List key_list = new ArrayList();
List values_list = new ArrayList();
key_list.add( keys[current_key_index]);
space -= ( keys[current_key_index].length + 1 ); // 1 for length marker
values_list.add( new ArrayList());
while( space > 0 &&
current_key_index < keys.length ){
if ( current_value_index == value_sets[current_key_index].length ){
// all values from the current key have been processed
current_key_index++;
current_value_index = 0;
if ( key_list.size() == DHTUDPPacketRequestStore.MAX_KEYS_PER_PACKET ){
// no more keys allowed in this packet
break;
}
if ( current_key_index == keys.length ){
// no more keys left, job done
break;
}
key_list.add( keys[current_key_index]);
space -= ( keys[current_key_index].length + 1 ); // 1 for length marker
values_list.add( new ArrayList());
}
DHTTransportValue value = value_sets[current_key_index][current_value_index];
int entry_size = DHTUDPUtils.DHTTRANSPORTVALUE_SIZE_WITHOUT_VALUE + value.getValue().length + 1;
List values = (List)values_list.get(values_list.size()-1);
if ( space < entry_size ||
values.size() == DHTUDPPacketRequestStore.MAX_VALUES_PER_KEY ){
// no space left or we've used up our limit on the
// number of values permitted per key
break;
}
values.add( value );
space -= entry_size;
current_value_index++;
}
int packet_entries = key_list.size();
if ( packet_entries > 0 ){
// if last entry has no values then ignore it
if ( ((List)values_list.get( packet_entries-1)).size() == 0 ){
packet_entries--;
}
}
if ( packet_entries == 0 ){
break;
}
byte[][] packet_keys = new byte[packet_entries][];
DHTTransportValue[][] packet_value_sets = new DHTTransportValue[packet_entries][];
//int packet_value_count = 0;
for (int i=0;i<packet_entries;i++){
packet_keys[i] = (byte[])key_list.get(i);
List values = (List)values_list.get(i);
packet_value_sets[i] = new DHTTransportValue[values.size()];
for (int j=0;j<values.size();j++){
packet_value_sets[i][j] = (DHTTransportValue)values.get(j);
//packet_value_count++;
}
}
// System.out.println( " packet " + packet_count + ": keys = " + packet_entries + ", values = " + packet_value_count );
final DHTUDPPacketRequestStore request =
new DHTUDPPacketRequestStore( this, connection_id, local_contact, contact );
stats.storeSent( request );
request.setRandomID( contact.getRandomID());
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -