📄 dhttransportudpimpl.java
字号:
error(
DHTUDPPacketHandlerException e )
{
stats.findNodeFailed();
handler.failed( contact, e );
}
},
request_timeout, PRUDPPacketHandler.PRIORITY_MEDIUM );
}catch( Throwable e ){
stats.findNodeFailed();
handler.failed( contact, e );
}
}
// FIND VALUE
public void
sendFindValue(
final DHTTransportUDPContactImpl contact,
final DHTTransportReplyHandler handler,
byte[] key,
int max_values,
byte flags )
{
try{
checkAddress( contact );
final long connection_id = getConnectionID();
final DHTUDPPacketRequestFindValue request =
new DHTUDPPacketRequestFindValue( this, connection_id, local_contact, contact );
stats.findValueSent( request );
request.setID( key );
request.setMaximumValues( max_values );
request.setFlags( flags );
requestSendRequestProcessor( contact, request );
packet_handler.sendAndReceive(
request,
contact.getTransportAddress(),
new DHTUDPPacketReceiver()
{
public void
packetReceived(
DHTUDPPacketReply packet,
InetSocketAddress from_address,
long elapsed_time )
{
try{
if ( packet.getConnectionId() != connection_id ){
throw( new Exception( "connection id mismatch" ));
}
contact.setInstanceIDAndVersion( packet.getTargetInstanceID(), packet.getProtocolVersion());
requestSendReplyProcessor( contact, packet, elapsed_time );
DHTUDPPacketReplyFindValue reply = (DHTUDPPacketReplyFindValue)packet;
stats.findValueOK();
DHTTransportValue[] res = reply.getValues();
if ( res != null ){
boolean continuation = reply.hasContinuation();
handler.findValueReply( contact, res, reply.getDiversificationType(), continuation);
}else{
handler.findValueReply( contact, reply.getContacts());
}
}catch( DHTUDPPacketHandlerException e ){
error( e );
}catch( Throwable e ){
Debug.printStackTrace(e);
error( new DHTUDPPacketHandlerException( "findValue failed", e ));
}
}
public void
error(
DHTUDPPacketHandlerException e )
{
stats.findValueFailed();
handler.failed( contact, e );
}
},
request_timeout, PRUDPPacketHandler.PRIORITY_HIGH );
}catch( Throwable e ){
if ( !(e instanceof DHTUDPPacketHandlerException )){
stats.findValueFailed();
handler.failed( contact, e );
}
}
}
protected DHTTransportFullStats
getFullStats(
DHTTransportUDPContactImpl contact )
{
if ( contact == local_contact ){
return( request_handler.statsRequest( contact ));
}
final DHTTransportFullStats[] res = { null };
final AESemaphore sem = new AESemaphore( "DHTTransportUDP:getFullStats");
sendStats( contact,
new DHTTransportReplyHandlerAdapter()
{
public void
statsReply(
DHTTransportContact _contact,
DHTTransportFullStats _stats )
{
res[0] = _stats;
sem.release();
}
public void
failed(
DHTTransportContact _contact,
Throwable _error )
{
sem.release();
}
});
sem.reserve();
return( res[0] );
}
// read request
protected void
sendReadRequest(
long connection_id,
DHTTransportUDPContactImpl contact,
byte[] transfer_key,
byte[] key )
{
sendReadRequest( connection_id, contact, transfer_key, key, 0, 0 );
}
protected void
sendReadRequest(
long connection_id,
DHTTransportUDPContactImpl contact,
byte[] transfer_key,
byte[] key,
int start_pos,
int len )
{
final DHTUDPPacketData request =
new DHTUDPPacketData( this, connection_id, local_contact, contact );
request.setDetails( DHTUDPPacketData.PT_READ_REQUEST, transfer_key, key, new byte[0], start_pos, len, 0 );
try{
checkAddress( contact );
if ( XFER_TRACE ){
logger.log( "Transfer read request: key = " + DHTLog.getFullString( key ) + ", contact = " + contact.getString());
}
stats.dataSent( request );
packet_handler.send(
request,
contact.getTransportAddress());
}catch( Throwable e ){
}
}
protected void
sendReadReply(
long connection_id,
DHTTransportUDPContactImpl contact,
byte[] transfer_key,
byte[] key,
byte[] data,
int start_position,
int length,
int total_length )
{
final DHTUDPPacketData request =
new DHTUDPPacketData( this, connection_id, local_contact, contact );
request.setDetails( DHTUDPPacketData.PT_READ_REPLY, transfer_key, key, data, start_position, length, total_length );
try{
checkAddress( contact );
if ( XFER_TRACE ){
logger.log( "Transfer read reply: key = " + DHTLog.getFullString( key ) + ", contact = " + contact.getString());
}
stats.dataSent( request );
packet_handler.send(
request,
contact.getTransportAddress());
}catch( Throwable e ){
}
}
protected void
sendWriteRequest(
long connection_id,
DHTTransportUDPContactImpl contact,
byte[] transfer_key,
byte[] key,
byte[] data,
int start_position,
int length,
int total_length )
{
final DHTUDPPacketData request =
new DHTUDPPacketData( this, connection_id, local_contact, contact );
request.setDetails( DHTUDPPacketData.PT_WRITE_REQUEST, transfer_key, key, data, start_position, length, total_length );
try{
checkAddress( contact );
if ( XFER_TRACE ){
logger.log( "Transfer write request: key = " + DHTLog.getFullString( key ) + ", contact = " + contact.getString());
}
stats.dataSent( request );
packet_handler.send(
request,
contact.getTransportAddress());
}catch( Throwable e ){
}
}
protected void
sendWriteReply(
long connection_id,
DHTTransportUDPContactImpl contact,
byte[] transfer_key,
byte[] key,
int start_position,
int length )
{
final DHTUDPPacketData request =
new DHTUDPPacketData( this, connection_id, local_contact, contact );
request.setDetails( DHTUDPPacketData.PT_WRITE_REPLY, transfer_key, key, new byte[0], start_position, length, 0 );
try{
checkAddress( contact );
if ( XFER_TRACE ){
logger.log( "Transfer write reply: key = " + DHTLog.getFullString( key ) + ", contact = " + contact.getString());
}
stats.dataSent( request );
packet_handler.send(
request,
contact.getTransportAddress());
}catch( Throwable e ){
}
}
public void
registerTransferHandler(
byte[] handler_key,
DHTTransportTransferHandler handler )
{
transfer_handlers.put(
new HashWrapper( handler_key ),
new transferHandlerInterceptor(
handler ));
}
protected int
handleTransferRequest(
DHTTransportUDPContactImpl target,
long connection_id,
byte[] transfer_key,
byte[] request_key,
byte[] data,
int start,
int length,
boolean write_request,
boolean first_packet_only )
throws DHTTransportException
{
DHTTransportTransferHandler handler = (DHTTransportTransferHandler)transfer_handlers.get(new HashWrapper( transfer_key ));
if ( handler == null ){
logger.log( "No transfer handler registered for key" );
throw( new DHTTransportException( "No transfer handler registered" ));
}
if ( data == null ){
data = handler.handleRead( target, request_key );
}
if ( data == null ){
return( -1 );
}else{
// special case 0 length data
if ( data.length == 0 ){
if ( write_request ){
sendWriteRequest(
connection_id,
target,
transfer_key,
request_key,
data,
0,
0,
0 );
}else{
sendReadReply(
connection_id,
target,
transfer_key,
request_key,
data,
0,
0,
0 );
}
}else{
if ( start < 0 ){
start = 0;
}else if ( start >= data.length ){
logger.log( "dataRequest: invalid start position" );
return( data.length );
}
if ( length <= 0 ){
length = data.length;
}else if ( start + length > data.length ){
logger.log( "dataRequest: invalid length" );
return( data.length );
}
int end = start+length;
while( start < end ){
int chunk = end - start;
if ( chunk > DHTUDPPacketData.MAX_DATA_SIZE ){
chunk = DHTUDPPacketData.MAX_DATA_SIZE;
}
if ( write_request ){
sendWriteRequest(
connection_id,
target,
transfer_key,
request_key,
data,
start,
chunk,
data.length );
if ( first_packet_only ){
break;
}
}else{
sendReadReply(
connection_id,
target,
transfer_key,
request_key,
data,
start,
chunk,
data.length );
}
start += chunk;
}
}
return( data.length );
}
}
protected void
dataRequest(
final DHTTransportUDPContactImpl originator,
final DHTUDPPacketData req )
{
/*
if ((int)(Math.random() * 4 )== 0 ){
System.out.println("dropping request packet:" + req.getString());
return;
}
*/
stats.dataReceived();
// both requests and replies come through here. Currently we only support read
// requests so we can safely use the data.length == 0 test to discriminate between
// a request and a reply to an existing transfer
byte packet_type = req.getPacketType();
if ( packet_type == DHTUDPPacketData.PT_READ_REPLY ){
transferQueue queue = lookupTransferQueue( read_transfers, req.getConnectionId());
// unmatched -> drop it
if ( queue != null ){
queue.add( req );
}
}else if ( packet_type == DHTUDPPacketData.PT_WRITE_REPLY ){
transferQueue queue = lookupTransferQueue( write_transfers, req.getConnectionId());
// unmatched -> drop it
if ( queue != null ){
queue.add( req );
}
}else{
byte[] transfer_key = req.getTransferKey();
if ( packet_type == DHTUDPPacketData.PT_READ_REQUEST ){
try{
handleTransferRequest(
originator,
req.getConnectionId(),
transfer_key,
req.getRequestKey(),
null,
req.getStartPosition(),
req.getLength(),
false, false );
}catch( DHTTransportException e ){
logger.log(e);
}
}else{
// write request
transferQueue old_queue = lookupTransferQueue( read_transfers, req.getConnectionId());
if ( old_queue != null ){
old_queue.add( req );
}else{
final DHTTransportTransferHandler handler = (DHTTransportTransferHandler)transfer_handlers.get(new HashWrapper( transfer_key ));
if ( handler == null ){
logger.log( "No transfer handler registered for key" );
}else{
try{
final transferQueue new_queue = new transferQueue( read_transfers, req.getConnectionId());
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -