📄 dhttransportudpimpl.java
字号:
request.setKeys( packet_keys );
request.setValueSets( packet_value_sets );
final int f_packet_count = packet_count;
requestSendRequestProcessor( contact, request );
packet_handler.sendAndReceive(
request,
contact.getTransportAddress(),
new DHTUDPPacketReceiver()
{
public void
packetReceived(
DHTUDPPacketReply packet,
InetSocketAddress from_address,
long elapsed_time )
{
try{
if ( packet.getConnectionId() != connection_id ){
throw( new Exception( "connection id mismatch: sender=" + from_address + ",packet=" + packet.getString()));
}
contact.setInstanceIDAndVersion( packet.getTargetInstanceID(), packet.getProtocolVersion());
requestSendReplyProcessor( contact, handler, packet, elapsed_time );
DHTUDPPacketReplyStore reply = (DHTUDPPacketReplyStore)packet;
stats.storeOK();
if ( f_packet_count == 1 ){
handler.storeReply( contact, reply.getDiversificationTypes());
}
}catch( DHTUDPPacketHandlerException e ){
error( e );
}catch( Throwable e ){
Debug.printStackTrace(e);
error( new DHTUDPPacketHandlerException( "store failed", e ));
}
}
public void
error(
DHTUDPPacketHandlerException e )
{
stats.storeFailed();
if ( f_packet_count == 1 ){
handler.failed( contact, e );
}
}
},
store_timeout,
PRUDPPacketHandler.PRIORITY_LOW );
}
}catch( Throwable e ){
stats.storeFailed();
if ( packet_count <= 1 ){
handler.failed( contact, e );
}
}
}
// FIND NODE
/**
 * Sends a find-node request for <code>nid</code> to <code>contact</code> and
 * reports the outcome through <code>handler</code>.
 *
 * On a reply whose connection id matches the one used for the request, the
 * contact's instance id/version, random id and node status are updated, the
 * reported estimated DHT size is handed to the request handler, and
 * <code>handler.findNodeReply</code> is invoked with the returned contacts.
 * Every failure path (address check, send, reply processing, receiver error)
 * bumps the find-node failure statistic and calls <code>handler.failed</code>.
 *
 * @param contact  remote contact the request is sent to
 * @param handler  receives either the reply contacts or the failure
 * @param nid      id placed in the request via <code>setID</code>
 */
public void
sendFindNode(
    final DHTTransportUDPContactImpl    contact,
    final DHTTransportReplyHandler      handler,
    byte[]                              nid )
{
    try {
        checkAddress(contact);

        final long expected_connection_id = getConnectionID();

        DHTUDPPacketRequestFindNode find_node_request =
            new DHTUDPPacketRequestFindNode(this, expected_connection_id, local_contact, contact);

        stats.findNodeSent(find_node_request);

        find_node_request.setID(nid);

        requestSendRequestProcessor(contact, find_node_request);

        // Built up-front so the send call below stays readable.
        DHTUDPPacketReceiver reply_receiver =
            new DHTUDPPacketReceiver()
            {
                public void
                packetReceived(
                    DHTUDPPacketReply   packet,
                    InetSocketAddress   from_address,
                    long                elapsed_time )
                {
                    try {
                        if (expected_connection_id != packet.getConnectionId()) {
                            throw new Exception("connection id mismatch");
                        }

                        contact.setInstanceIDAndVersion(
                            packet.getTargetInstanceID(),
                            packet.getProtocolVersion());

                        requestSendReplyProcessor(contact, handler, packet, elapsed_time);

                        DHTUDPPacketReplyFindNode node_reply = (DHTUDPPacketReplyFindNode) packet;

                        // Keep the random id on the contact in preparation for a
                        // possible subsequent store operation.
                        contact.setRandomID(node_reply.getRandomID());

                        updateContactStatus(contact, node_reply.getNodeStatus());

                        request_handler.setTransportEstimatedDHTSize(node_reply.getEstimatedDHTSize());

                        stats.findNodeOK();

                        handler.findNodeReply(contact, node_reply.getContacts());

                    } catch (DHTUDPPacketHandlerException handler_failure) {
                        error(handler_failure);

                    } catch (Throwable unexpected) {
                        Debug.printStackTrace(unexpected);

                        error(new DHTUDPPacketHandlerException("findNode failed", unexpected));
                    }
                }

                public void
                error(
                    DHTUDPPacketHandlerException cause )
                {
                    stats.findNodeFailed();

                    handler.failed(contact, cause);
                }
            };

        packet_handler.sendAndReceive(
            find_node_request,
            contact.getTransportAddress(),
            reply_receiver,
            request_timeout,
            PRUDPPacketHandler.PRIORITY_MEDIUM);

    } catch (Throwable failure) {
        // Anything thrown while preparing or dispatching the request counts as a failed lookup.
        stats.findNodeFailed();

        handler.failed(contact, failure);
    }
}
// FIND VALUE
/**
 * Sends a find-value request for <code>key</code> to <code>contact</code> and
 * reports the outcome through <code>handler</code>.
 *
 * On a reply whose connection id matches the one used for the request, the
 * contact's instance id/version is updated and the handler receives either
 * the values (with diversification type and continuation flag) or, when the
 * reply carries no values, the contacts from the reply.
 *
 * @param contact     remote contact the request is sent to
 * @param handler     receives the values/contacts or the failure
 * @param key         id placed in the request via <code>setID</code>
 * @param max_values  passed to the request's <code>setMaximumValues</code>
 * @param flags       passed to the request's <code>setFlags</code>
 */
public void
sendFindValue(
    final DHTTransportUDPContactImpl    contact,
    final DHTTransportReplyHandler      handler,
    byte[]                              key,
    int                                 max_values,
    byte                                flags )
{
    try {
        checkAddress(contact);

        final long expected_connection_id = getConnectionID();

        DHTUDPPacketRequestFindValue find_value_request =
            new DHTUDPPacketRequestFindValue(this, expected_connection_id, local_contact, contact);

        stats.findValueSent(find_value_request);

        find_value_request.setID(key);
        find_value_request.setMaximumValues(max_values);
        find_value_request.setFlags(flags);

        requestSendRequestProcessor(contact, find_value_request);

        // Built up-front so the send call below stays readable.
        DHTUDPPacketReceiver reply_receiver =
            new DHTUDPPacketReceiver()
            {
                public void
                packetReceived(
                    DHTUDPPacketReply   packet,
                    InetSocketAddress   from_address,
                    long                elapsed_time )
                {
                    try {
                        if (expected_connection_id != packet.getConnectionId()) {
                            throw new Exception("connection id mismatch");
                        }

                        contact.setInstanceIDAndVersion(
                            packet.getTargetInstanceID(),
                            packet.getProtocolVersion());

                        requestSendReplyProcessor(contact, handler, packet, elapsed_time);

                        DHTUDPPacketReplyFindValue value_reply = (DHTUDPPacketReplyFindValue) packet;

                        stats.findValueOK();

                        DHTTransportValue[] values = value_reply.getValues();

                        if (values == null) {
                            // No values in the reply: hand over the contacts it carries instead.
                            handler.findValueReply(contact, value_reply.getContacts());
                        } else {
                            boolean more_to_come = value_reply.hasContinuation();

                            handler.findValueReply(
                                contact,
                                values,
                                value_reply.getDiversificationType(),
                                more_to_come);
                        }

                    } catch (DHTUDPPacketHandlerException handler_failure) {
                        error(handler_failure);

                    } catch (Throwable unexpected) {
                        Debug.printStackTrace(unexpected);

                        error(new DHTUDPPacketHandlerException("findValue failed", unexpected));
                    }
                }

                public void
                error(
                    DHTUDPPacketHandlerException cause )
                {
                    stats.findValueFailed();

                    handler.failed(contact, cause);
                }
            };

        packet_handler.sendAndReceive(
            find_value_request,
            contact.getTransportAddress(),
            reply_receiver,
            request_timeout,
            PRUDPPacketHandler.PRIORITY_HIGH);

    } catch (Throwable failure) {
        // A DHTUDPPacketHandlerException reaching this point is neither counted nor
        // reported here. NOTE(review): presumably it has already been delivered
        // through the receiver - confirm against the packet handler.
        if (failure instanceof DHTUDPPacketHandlerException) {
            return;
        }

        stats.findValueFailed();

        handler.failed(contact, failure);
    }
}
/**
 * Obtains the full statistics of <code>contact</code>.
 *
 * For the local contact the request handler is asked directly. For any other
 * contact a stats request is sent and the calling thread waits on a semaphore
 * until either the reply or the failure callback releases it.
 *
 * @param contact  contact whose statistics are wanted
 * @return the statistics from the reply, or <code>null</code> when the
 *         failure callback fired instead
 */
protected DHTTransportFullStats
getFullStats(
    DHTTransportUDPContactImpl contact )
{
    if (contact == local_contact) {
        return request_handler.statsRequest(contact);
    }

    // Single-element holder so the anonymous callback can publish its result.
    final DHTTransportFullStats[] reply_slot = new DHTTransportFullStats[1];

    final AESemaphore reply_signal = new AESemaphore("DHTTransportUDP:getFullStats");

    DHTTransportReplyHandlerAdapter stats_listener =
        new DHTTransportReplyHandlerAdapter()
        {
            public void
            statsReply(
                DHTTransportContact     _contact,
                DHTTransportFullStats   _stats )
            {
                reply_slot[0] = _stats;

                reply_signal.release();
            }

            public void
            failed(
                DHTTransportContact     _contact,
                Throwable               _error )
            {
                // Leave the slot empty; the caller sees null.
                reply_signal.release();
            }
        };

    sendStats(contact, stats_listener);

    // Wait for one of the two callbacks above.
    reply_signal.reserve();

    return reply_slot[0];
}
// read request
/**
 * Convenience overload: sends a transfer read request with a start position
 * of 0 and a length of 0.
 *
 * @param connection_id  connection id placed in the packet
 * @param contact        remote contact the request is sent to
 * @param transfer_key   transfer key placed in the packet
 * @param key            key being requested
 */
protected void
sendReadRequest(
    long                        connection_id,
    DHTTransportUDPContactImpl  contact,
    byte[]                      transfer_key,
    byte[]                      key )
{
    final int start_pos = 0;
    final int len       = 0;

    sendReadRequest(connection_id, contact, transfer_key, key, start_pos, len);
}
/**
 * Builds a <code>PT_READ_REQUEST</code> data packet (empty payload, total
 * length 0) and sends it to <code>contact</code>.
 *
 * Any Throwable raised by the address check, the trace logging, the
 * statistics update or the send itself is swallowed; nothing is reported
 * back to the caller.
 *
 * @param connection_id  connection id placed in the packet
 * @param contact        remote contact the packet is sent to
 * @param transfer_key   transfer key placed in the packet
 * @param key            key being requested
 * @param start_pos      start position placed in the packet
 * @param len            length placed in the packet
 */
protected void
sendReadRequest(
    long                        connection_id,
    DHTTransportUDPContactImpl  contact,
    byte[]                      transfer_key,
    byte[]                      key,
    int                         start_pos,
    int                         len )
{
    DHTUDPPacketData read_request =
        new DHTUDPPacketData(this, connection_id, local_contact, contact);

    byte[] no_payload = new byte[0];

    read_request.setDetails(
        DHTUDPPacketData.PT_READ_REQUEST,
        transfer_key,
        key,
        no_payload,
        start_pos,
        len,
        0);

    try {
        checkAddress(contact);

        if (XFER_TRACE) {
            logger.log(
                "Transfer read request: key = " + DHTLog.getFullString(key)
                    + ", contact = " + contact.getString());
        }

        stats.dataSent(read_request);

        packet_handler.send(read_request, contact.getTransportAddress());

    } catch (Throwable ignored) {
        // Intentionally empty: failures on this path are not propagated.
    }
}
/**
 * Builds a <code>PT_READ_REPLY</code> data packet carrying <code>data</code>
 * and sends it to <code>contact</code>.
 *
 * Any Throwable raised by the address check, the trace logging, the
 * statistics update or the send itself is swallowed; nothing is reported
 * back to the caller.
 *
 * @param connection_id   connection id placed in the packet
 * @param contact         remote contact the packet is sent to
 * @param transfer_key    transfer key placed in the packet
 * @param key             key the data belongs to
 * @param data            payload placed in the packet
 * @param start_position  start position placed in the packet
 * @param length          length placed in the packet
 * @param total_length    total length placed in the packet
 */
protected void
sendReadReply(
    long                        connection_id,
    DHTTransportUDPContactImpl  contact,
    byte[]                      transfer_key,
    byte[]                      key,
    byte[]                      data,
    int                         start_position,
    int                         length,
    int                         total_length )
{
    DHTUDPPacketData read_reply =
        new DHTUDPPacketData(this, connection_id, local_contact, contact);

    read_reply.setDetails(
        DHTUDPPacketData.PT_READ_REPLY,
        transfer_key,
        key,
        data,
        start_position,
        length,
        total_length);

    try {
        checkAddress(contact);

        if (XFER_TRACE) {
            logger.log(
                "Transfer read reply: key = " + DHTLog.getFullString(key)
                    + ", contact = " + contact.getString());
        }

        stats.dataSent(read_reply);

        packet_handler.send(read_reply, contact.getTransportAddress());

    } catch (Throwable ignored) {
        // Intentionally empty: failures on this path are not propagated.
    }
}
/**
 * Builds a <code>PT_WRITE_REQUEST</code> data packet carrying
 * <code>data</code> and sends it to <code>contact</code>.
 *
 * Any Throwable raised by the address check, the trace logging, the
 * statistics update or the send itself is swallowed; nothing is reported
 * back to the caller.
 *
 * @param connection_id   connection id placed in the packet
 * @param contact         remote contact the packet is sent to
 * @param transfer_key    transfer key placed in the packet
 * @param key             key the data belongs to
 * @param data            payload placed in the packet
 * @param start_position  start position placed in the packet
 * @param length          length placed in the packet
 * @param total_length    total length placed in the packet
 */
protected void
sendWriteRequest(
    long                        connection_id,
    DHTTransportUDPContactImpl  contact,
    byte[]                      transfer_key,
    byte[]                      key,
    byte[]                      data,
    int                         start_position,
    int                         length,
    int                         total_length )
{
    DHTUDPPacketData write_request =
        new DHTUDPPacketData(this, connection_id, local_contact, contact);

    write_request.setDetails(
        DHTUDPPacketData.PT_WRITE_REQUEST,
        transfer_key,
        key,
        data,
        start_position,
        length,
        total_length);

    try {
        checkAddress(contact);

        if (XFER_TRACE) {
            logger.log(
                "Transfer write request: key = " + DHTLog.getFullString(key)
                    + ", contact = " + contact.getString());
        }

        stats.dataSent(write_request);

        packet_handler.send(write_request, contact.getTransportAddress());

    } catch (Throwable ignored) {
        // Intentionally empty: failures on this path are not propagated.
    }
}
/**
 * Builds a <code>PT_WRITE_REPLY</code> data packet (empty payload, total
 * length 0) and sends it to <code>contact</code>.
 *
 * Any Throwable raised by the address check, the trace logging, the
 * statistics update or the send itself is swallowed; nothing is reported
 * back to the caller.
 *
 * @param connection_id   connection id placed in the packet
 * @param contact         remote contact the packet is sent to
 * @param transfer_key    transfer key placed in the packet
 * @param key             key the reply refers to
 * @param start_position  start position placed in the packet
 * @param length          length placed in the packet
 */
protected void
sendWriteReply(
    long                        connection_id,
    DHTTransportUDPContactImpl  contact,
    byte[]                      transfer_key,
    byte[]                      key,
    int                         start_position,
    int                         length )
{
    DHTUDPPacketData write_reply =
        new DHTUDPPacketData(this, connection_id, local_contact, contact);

    byte[] no_payload = new byte[0];

    write_reply.setDetails(
        DHTUDPPacketData.PT_WRITE_REPLY,
        transfer_key,
        key,
        no_payload,
        start_position,
        length,
        0);

    try {
        checkAddress(contact);

        if (XFER_TRACE) {
            logger.log(
                "Transfer write reply: key = " + DHTLog.getFullString(key)
                    + ", contact = " + contact.getString());
        }

        stats.dataSent(write_reply);

        packet_handler.send(write_reply, contact.getTransportAddress());

    } catch (Throwable ignored) {
        // Intentionally empty: failures on this path are not propagated.
    }
}
/**
 * Registers <code>handler</code> as the transfer handler for
 * <code>handler_key</code>.
 *
 * The handler is wrapped in a <code>transferHandlerInterceptor</code> and
 * stored in <code>transfer_handlers</code> under a <code>HashWrapper</code>
 * of the key; the registration is also written to the log.
 *
 * @param handler_key  key the handler is registered under
 * @param handler      handler to register
 */
public void
registerTransferHandler(
    byte[]                      handler_key,
    DHTTransportTransferHandler handler )
{
    // The key is quoted on both sides, matching the "No transfer handler registered" message.
    logger.log(
        "Transfer handler (" + handler.getName() + ") registered for key '"
            + ByteFormatter.encodeString( handler_key ) + "'" );

    transfer_handlers.put(
        new HashWrapper( handler_key ),
        new transferHandlerInterceptor( handler ));
}
protected int
handleTransferRequest(
DHTTransportUDPContactImpl target,
long connection_id,
byte[] transfer_key,
byte[] request_key,
byte[] data,
int start,
int length,
boolean write_request,
boolean first_packet_only )
throws DHTTransportException
{
DHTTransportTransferHandler handler = (DHTTransportTransferHandler)transfer_handlers.get(new HashWrapper( transfer_key ));
if ( handler == null ){
logger.log( "No transfer handler registered for key '" + ByteFormatter.encodeString(transfer_key) + "'" );
throw( new DHTTransportException( "No transfer handler registered" ));
}
if ( data == null ){
data = handler.handleRead( target, request_key );
}
if ( data == null ){
return( -1 );
}else{
// special case 0 length data
if ( data.length == 0 ){
if ( write_request ){
sendWriteRequest(
connection_id,
target,
transfer_key,
request_key,
data,
0,
0,
0 );
}else{
sendReadReply(
connection_id,
target,
transfer_key,
request_key,
data,
0,
0,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -