📄 dhttransportudpimpl.java
字号:
bad_ip_bloom_filter.add( addr );
throw( new DHTUDPPacketHandlerException( "IPFilter check fails" ));
}
}
}
/**
 * Sends a PING request to a contact and reports the outcome to the handler.
 *
 * The contact's address is checked first. The request is then recorded in the
 * stats, passed through requestSendRequestProcessor and sent with the standard
 * request timeout at medium priority.
 *
 * When a reply arrives whose connection id matches the one used for the request,
 * the contact's instance id and protocol version are updated from the reply, the
 * reply is passed to requestSendReplyProcessor, and handler.pingReply is invoked.
 * Every failure path (address check, send, mismatched connection id, or an error
 * reported through the receiver) is counted with stats.pingFailed and delivered
 * to handler.failed.
 *
 * @param contact the contact to ping
 * @param handler receives pingReply on success, failed otherwise
 */
protected void sendPing(
    final DHTTransportUDPContactImpl contact,
    final DHTTransportReplyHandler handler )
{
    try {
        checkAddress( contact );

        final long expected_id = getConnectionID();

        final DHTUDPPacketRequestPing ping =
            new DHTUDPPacketRequestPing( this, expected_id, local_contact, contact );

        stats.pingSent( ping );

        requestSendRequestProcessor( contact, ping );

        final InetSocketAddress target = contact.getTransportAddress();

        final DHTUDPPacketReceiver receiver = new DHTUDPPacketReceiver() {

            public void packetReceived(
                DHTUDPPacketReply received,
                InetSocketAddress from_address,
                long elapsed_time )
            {
                try {
                    // a reply carrying a different connection id is not an answer to this request
                    if ( received.getConnectionId() != expected_id ) {
                        throw new Exception( "connection id mismatch" );
                    }

                    contact.setInstanceIDAndVersion(
                        received.getTargetInstanceID(),
                        received.getProtocolVersion());

                    requestSendReplyProcessor( contact, received, elapsed_time );

                    stats.pingOK();

                    handler.pingReply( contact );

                } catch ( DHTUDPPacketHandlerException e ) {
                    // already the right exception type: hand it straight to error()
                    error( e );

                } catch ( Throwable e ) {
                    // anything else is logged, then wrapped so the handler sees a uniform type
                    Debug.printStackTrace( e );
                    error( new DHTUDPPacketHandlerException( "ping failed", e ));
                }
            }

            public void error( DHTUDPPacketHandlerException e ) {
                stats.pingFailed();
                handler.failed( contact, e );
            }
        };

        packet_handler.sendAndReceive(
            ping,
            target,
            receiver,
            request_timeout,
            PRUDPPacketHandler.PRIORITY_MEDIUM );

    } catch ( Throwable e ) {
        // failure before or during the send itself
        stats.pingFailed();
        handler.failed( contact, e );
    }
}
// stats
/**
 * Sends a STATS request to a contact and reports the outcome to the handler.
 *
 * The contact's address is checked first. The request is then recorded in the
 * stats, passed through requestSendRequestProcessor and sent with the standard
 * request timeout at low priority.
 *
 * When a reply arrives whose connection id matches the one used for the request,
 * the contact's instance id and protocol version are updated from the reply, the
 * reply is passed to requestSendReplyProcessor, and the stats carried by the
 * reply are delivered through handler.statsReply. Every failure path is counted
 * with stats.statsFailed and delivered to handler.failed.
 *
 * @param contact the contact whose stats are requested
 * @param handler receives statsReply on success, failed otherwise
 */
protected void sendStats(
    final DHTTransportUDPContactImpl contact,
    final DHTTransportReplyHandler handler )
{
    try {
        checkAddress( contact );

        final long expected_id = getConnectionID();

        final DHTUDPPacketRequestStats stats_request =
            new DHTUDPPacketRequestStats( this, expected_id, local_contact, contact );

        stats.statsSent( stats_request );

        requestSendRequestProcessor( contact, stats_request );

        final InetSocketAddress target = contact.getTransportAddress();

        final DHTUDPPacketReceiver receiver = new DHTUDPPacketReceiver() {

            public void packetReceived(
                DHTUDPPacketReply received,
                InetSocketAddress from_address,
                long elapsed_time )
            {
                try {
                    // a reply carrying a different connection id is not an answer to this request
                    if ( received.getConnectionId() != expected_id ) {
                        throw new Exception( "connection id mismatch" );
                    }

                    contact.setInstanceIDAndVersion(
                        received.getTargetInstanceID(),
                        received.getProtocolVersion());

                    requestSendReplyProcessor( contact, received, elapsed_time );

                    // the cast is done before the success counter is bumped, so an
                    // unexpected reply type ends up on the failure path only
                    DHTUDPPacketReplyStats stats_reply = (DHTUDPPacketReplyStats) received;

                    stats.statsOK();

                    handler.statsReply( contact, stats_reply.getStats());

                } catch ( DHTUDPPacketHandlerException e ) {
                    // already the right exception type: hand it straight to error()
                    error( e );

                } catch ( Throwable e ) {
                    // anything else is logged, then wrapped so the handler sees a uniform type
                    Debug.printStackTrace( e );
                    error( new DHTUDPPacketHandlerException( "stats failed", e ));
                }
            }

            public void error( DHTUDPPacketHandlerException e ) {
                stats.statsFailed();
                handler.failed( contact, e );
            }
        };

        packet_handler.sendAndReceive(
            stats_request,
            target,
            receiver,
            request_timeout,
            PRUDPPacketHandler.PRIORITY_LOW );

    } catch ( Throwable e ) {
        // failure before or during the send itself
        stats.statsFailed();
        handler.failed( contact, e );
    }
}
// PING for deducing external IP address
/**
 * Pings a contact and blocks until the reply (or an error) arrives, in order to
 * learn which external address should be used for the local contact.
 *
 * Outcomes:
 *   - a ping reply: the local contact's current external address is returned;
 *   - an error reply of type ET_ORIGINATOR_ADDRESS_WRONG: the originating address
 *     carried by that error reply is returned;
 *   - any other reply, an error from the packet handler, no answer within the
 *     wait period, or any exception: null is returned.
 *
 * The request is sent at high priority with a 5000 timeout, and the calling
 * thread waits on a semaphore with the same 5000 value.
 *
 * NOTE(review): unlike the other senders in this file, the reply's connection id
 * is not compared with the request's, requestSendRequestProcessor is not invoked,
 * and no stats "OK" counter is updated on success - confirm this is intentional.
 *
 * @param contact the contact to ask
 * @return the deduced external address, or null if none could be determined
 */
protected InetSocketAddress askContactForExternalAddress(
    DHTTransportUDPContactImpl contact )
{
    try {
        checkAddress( contact );

        final long connection_id = getConnectionID();

        final DHTUDPPacketRequestPing probe =
            new DHTUDPPacketRequestPing( this, connection_id, local_contact, contact );

        stats.pingSent( probe );

        // released by the receiver once an outcome (reply or error) is known
        final AESemaphore done = new AESemaphore( "DHTTransUDP:extping" );

        // single-slot holder so the anonymous receiver can hand back a value
        final InetSocketAddress[] answer = new InetSocketAddress[1];

        // used both as the request timeout and as the semaphore wait
        final int wait_period = 5000;

        packet_handler.sendAndReceive(
            probe,
            contact.getTransportAddress(),
            new DHTUDPPacketReceiver() {

                public void packetReceived(
                    DHTUDPPacketReply _packet,
                    InetSocketAddress from_address,
                    long elapsed_time )
                {
                    try {
                        if ( _packet instanceof DHTUDPPacketReplyPing ) {
                            // ping was OK so current address is OK
                            answer[0] = local_contact.getExternalAddress();
                            return;
                        }

                        if ( !( _packet instanceof DHTUDPPacketReplyError )) {
                            return;
                        }

                        DHTUDPPacketReplyError failure = (DHTUDPPacketReplyError) _packet;

                        if ( failure.getErrorType() == DHTUDPPacketReplyError.ET_ORIGINATOR_ADDRESS_WRONG ) {
                            // the error reply carries the originating address to use instead
                            answer[0] = failure.getOriginatingAddress();
                        }
                    } finally {
                        // always wake the waiting caller, whatever the outcome
                        done.release();
                    }
                }

                public void error( DHTUDPPacketHandlerException e ) {
                    try {
                        stats.pingFailed();
                    } finally {
                        done.release();
                    }
                }
            },
            wait_period,
            PRUDPPacketHandler.PRIORITY_HIGH );

        done.reserve( wait_period );

        return answer[0];

    } catch ( Throwable e ) {
        stats.pingFailed();
        return null;
    }
}
// STORE
/**
 * Stores values on a contact, splitting them over as many STORE request packets
 * as are needed.
 *
 * Packing rules, per packet:
 *   - the byte budget is PACKET_MAX_BYTES minus DHT_HEADER_SIZE;
 *   - each key costs its length plus 1 (length marker);
 *   - each value costs DHTTRANSPORTVALUE_SIZE_WITHOUT_VALUE plus the value's
 *     length plus 1;
 *   - at most MAX_KEYS_PER_PACKET keys and, for each key, at most
 *     MAX_VALUES_PER_KEY values are placed in one packet; whatever does not fit
 *     carries over to the next packet, which starts again with the current key;
 *   - a trailing key that ended up with no values is dropped from the packet.
 *
 * All packets use the same connection id. Only the outcome of the first packet
 * is reported to the handler (storeReply or failed); later packets only update
 * the stats.
 *
 * A key whose value set is empty is stepped over. Previously an empty set on any
 * key other than the first caused an ArrayIndexOutOfBoundsException that aborted
 * the whole store.
 *
 * NOTE(review): if not even one value fits into a fresh packet, packing stops
 * and the remaining keys/values are not sent; when that happens on the first
 * packet (or when keys is empty) the handler is never invoked - confirm callers
 * can live with that.
 *
 * @param contact    the contact to store on
 * @param handler    receives the outcome of the first packet only
 * @param keys       the keys to store under
 * @param value_sets value_sets[i] holds the values for keys[i]
 */
public void
sendStore(
    final DHTTransportUDPContactImpl contact,
    final DHTTransportReplyHandler handler,
    byte[][] keys,
    DHTTransportValue[][] value_sets )
{
    final long connection_id = getConnectionID();

    // only report to caller the outcome of the first packet

    int packet_count = 0;

    try{
        checkAddress( contact );

        int current_key_index = 0;
        int current_value_index = 0;

        while( current_key_index < keys.length ){

            packet_count++;

            int space = DHTUDPPacketHelper.PACKET_MAX_BYTES - DHTUDPPacketRequest.DHT_HEADER_SIZE;

            // key_list.get(i) is a byte[]; values_list.get(i) is the List of values for that key

            List key_list = new ArrayList();
            List values_list = new ArrayList();

            key_list.add( keys[current_key_index]);

            space -= ( keys[current_key_index].length + 1 ); // 1 for length marker

            values_list.add( new ArrayList());

            while( space > 0 &&
                    current_key_index < keys.length ){

                if ( current_value_index == value_sets[current_key_index].length ){

                    // all values from the current key have been processed

                    current_key_index++;

                    current_value_index = 0;

                    if ( key_list.size() == DHTUDPPacketRequestStore.MAX_KEYS_PER_PACKET ){

                        // no more keys allowed in this packet

                        break;
                    }

                    if ( current_key_index == keys.length ){

                        // no more keys left, job done

                        break;
                    }

                    key_list.add( keys[current_key_index]);

                    space -= ( keys[current_key_index].length + 1 ); // 1 for length marker

                    values_list.add( new ArrayList());

                    // re-run the checks for the new key: it may itself have no values,
                    // in which case indexing its value set below would be out of bounds

                    continue;
                }

                DHTTransportValue value = value_sets[current_key_index][current_value_index];

                int entry_size = DHTUDPUtils.DHTTRANSPORTVALUE_SIZE_WITHOUT_VALUE + value.getValue().length + 1;

                List values = (List)values_list.get(values_list.size()-1);

                if ( space < entry_size ||
                        values.size() == DHTUDPPacketRequestStore.MAX_VALUES_PER_KEY ){

                    // no space left or we've used up our limit on the
                    // number of values permitted per key

                    break;
                }

                values.add( value );

                space -= entry_size;

                current_value_index++;
            }

            int packet_entries = key_list.size();

            if ( packet_entries > 0 ){

                // if last entry has no values then ignore it

                if ( ((List)values_list.get( packet_entries-1)).size() == 0 ){

                    packet_entries--;
                }
            }

            if ( packet_entries == 0 ){

                // nothing could be placed in this packet, give up

                break;
            }

            byte[][] packet_keys = new byte[packet_entries][];
            DHTTransportValue[][] packet_value_sets = new DHTTransportValue[packet_entries][];

            for (int i=0;i<packet_entries;i++){

                packet_keys[i] = (byte[])key_list.get(i);

                List values = (List)values_list.get(i);

                packet_value_sets[i] = new DHTTransportValue[values.size()];

                for (int j=0;j<values.size();j++){

                    packet_value_sets[i][j] = (DHTTransportValue)values.get(j);
                }
            }

            final DHTUDPPacketRequestStore request =
                new DHTUDPPacketRequestStore( this, connection_id, local_contact, contact );

            stats.storeSent( request );

            request.setRandomID( contact.getRandomID());

            request.setKeys( packet_keys );

            request.setValueSets( packet_value_sets );

            // captured so the receiver knows whether it belongs to the first packet

            final int f_packet_count = packet_count;

            requestSendRequestProcessor( contact, request );

            packet_handler.sendAndReceive(
                request,
                contact.getTransportAddress(),
                new DHTUDPPacketReceiver()
                {
                    public void
                    packetReceived(
                        DHTUDPPacketReply packet,
                        InetSocketAddress from_address,
                        long elapsed_time )
                    {
                        try{
                            if ( packet.getConnectionId() != connection_id ){

                                throw( new Exception( "connection id mismatch" ));
                            }

                            contact.setInstanceIDAndVersion( packet.getTargetInstanceID(), packet.getProtocolVersion());

                            requestSendReplyProcessor( contact, packet, elapsed_time );

                            DHTUDPPacketReplyStore reply = (DHTUDPPacketReplyStore)packet;

                            stats.storeOK();

                            if ( f_packet_count == 1 ){

                                handler.storeReply( contact, reply.getDiversificationTypes());
                            }

                        }catch( DHTUDPPacketHandlerException e ){

                            error( e );

                        }catch( Throwable e ){

                            Debug.printStackTrace(e);

                            error( new DHTUDPPacketHandlerException( "store failed", e ));
                        }
                    }

                    public void
                    error(
                        DHTUDPPacketHandlerException e )
                    {
                        stats.storeFailed();

                        if ( f_packet_count == 1 ){

                            handler.failed( contact, e );
                        }
                    }
                },
                store_timeout,
                PRUDPPacketHandler.PRIORITY_LOW );
        }
    }catch( Throwable e ){

        stats.storeFailed();

        // report only if the failure happened before or while handling the first packet

        if ( packet_count <= 1 ){

            handler.failed( contact, e );
        }
    }
}
// FIND NODE
public void
sendFindNode(
final DHTTransportUDPContactImpl contact,
final DHTTransportReplyHandler handler,
byte[] nid )
{
try{
checkAddress( contact );
final long connection_id = getConnectionID();
final DHTUDPPacketRequestFindNode request =
new DHTUDPPacketRequestFindNode( this, connection_id, local_contact, contact );
stats.findNodeSent( request );
request.setID( nid );
requestSendRequestProcessor( contact, request );
packet_handler.sendAndReceive(
request,
contact.getTransportAddress(),
new DHTUDPPacketReceiver()
{
public void
packetReceived(
DHTUDPPacketReply packet,
InetSocketAddress from_address,
long elapsed_time )
{
try{
if ( packet.getConnectionId() != connection_id ){
throw( new Exception( "connection id mismatch" ));
}
contact.setInstanceIDAndVersion( packet.getTargetInstanceID(), packet.getProtocolVersion());
requestSendReplyProcessor( contact, packet, elapsed_time );
DHTUDPPacketReplyFindNode reply = (DHTUDPPacketReplyFindNode)packet;
// copy out the random id in preparation for a possible subsequent
// store operation
contact.setRandomID( reply.getRandomID());
updateContactStatus( contact, reply.getNodeStatus());
request_handler.setTransportEstimatedDHTSize( reply.getEstimatedDHTSize());
stats.findNodeOK();
handler.findNodeReply( contact, reply.getContacts());
}catch( DHTUDPPacketHandlerException e ){
error( e );
}catch( Throwable e ){
Debug.printStackTrace(e);
error( new DHTUDPPacketHandlerException( "findNode failed", e ));
}
}
public void
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -