📄 dhttransportudpimpl.java
字号:
0 );
}
}else{
if ( start < 0 ){
start = 0;
}else if ( start >= data.length ){
logger.log( "dataRequest: invalid start position" );
return( data.length );
}
if ( length <= 0 ){
length = data.length;
}else if ( start + length > data.length ){
logger.log( "dataRequest: invalid length" );
return( data.length );
}
int end = start+length;
while( start < end ){
int chunk = end - start;
if ( chunk > DHTUDPPacketData.MAX_DATA_SIZE ){
chunk = DHTUDPPacketData.MAX_DATA_SIZE;
}
if ( write_request ){
sendWriteRequest(
connection_id,
target,
transfer_key,
request_key,
data,
start,
chunk,
data.length );
if ( first_packet_only ){
break;
}
}else{
sendReadReply(
connection_id,
target,
transfer_key,
request_key,
data,
start,
chunk,
data.length );
}
start += chunk;
}
}
return( data.length );
}
}
protected void
dataRequest(
final DHTTransportUDPContactImpl originator,
final DHTUDPPacketData req )
{
/*
if ((int)(Math.random() * 4 )== 0 ){
System.out.println("dropping request packet:" + req.getString());
return;
}
*/
stats.dataReceived();
// both requests and replies come through here. Currently we only support read
// requests so we can safely use the data.length == 0 test to discriminate between
// a request and a reply to an existing transfer
byte packet_type = req.getPacketType();
if ( packet_type == DHTUDPPacketData.PT_READ_REPLY ){
transferQueue queue = lookupTransferQueue( read_transfers, req.getConnectionId());
// unmatched -> drop it
if ( queue != null ){
queue.add( req );
}
}else if ( packet_type == DHTUDPPacketData.PT_WRITE_REPLY ){
transferQueue queue = lookupTransferQueue( write_transfers, req.getConnectionId());
// unmatched -> drop it
if ( queue != null ){
queue.add( req );
}
}else{
byte[] transfer_key = req.getTransferKey();
if ( packet_type == DHTUDPPacketData.PT_READ_REQUEST ){
try{
handleTransferRequest(
originator,
req.getConnectionId(),
transfer_key,
req.getRequestKey(),
null,
req.getStartPosition(),
req.getLength(),
false, false );
}catch( DHTTransportException e ){
logger.log(e);
}
}else{
// write request
transferQueue old_queue = lookupTransferQueue( read_transfers, req.getConnectionId());
if ( old_queue != null ){
old_queue.add( req );
}else{
final DHTTransportTransferHandler handler = (DHTTransportTransferHandler)transfer_handlers.get(new HashWrapper( transfer_key ));
if ( handler == null ){
logger.log( "No transfer handler registered for key '" + ByteFormatter.encodeString(transfer_key) + "'" );
}else{
try{
final transferQueue new_queue = new transferQueue( read_transfers, req.getConnectionId());
// add the initial data for this write request
new_queue.add( req );
// set up the queue processor
new AEThread( "DHTTransportUDP:writeQueueProcessor", true )
{
public void
runSupport()
{
try{
byte[] write_data =
runTransferQueue(
new_queue,
new DHTTransportProgressListener()
{
public void
reportSize(
long size )
{
if ( XFER_TRACE ){
System.out.println( "writeXfer: size=" + size );
}
}
public void
reportActivity(
String str )
{
if ( XFER_TRACE ){
System.out.println( "writeXfer: act=" + str );
}
}
public void
reportCompleteness(
int percent )
{
if ( XFER_TRACE ){
System.out.println( "writeXfer: %=" + percent );
}
}
},
originator,
req.getTransferKey(),
req.getRequestKey(),
60000,
false );
if ( write_data != null ){
// xfer complete, send ack if multi-packet xfer
// (ack already sent below if single packet)
if ( req.getStartPosition() != 0 ||
req.getLength() != req.getTotalLength() ){
sendWriteReply(
req.getConnectionId(),
originator,
req.getTransferKey(),
req.getRequestKey(),
0,
req.getTotalLength());
}
byte[] reply_data = handler.handleWrite( originator, req.getRequestKey(), write_data );
if ( reply_data != null ){
writeTransfer(
new DHTTransportProgressListener()
{
public void
reportSize(
long size )
{
if ( XFER_TRACE ){
System.out.println( "writeXferReply: size=" + size );
}
}
public void
reportActivity(
String str )
{
if ( XFER_TRACE ){
System.out.println( "writeXferReply: act=" + str );
}
}
public void
reportCompleteness(
int percent )
{
if ( XFER_TRACE ){
System.out.println( "writeXferReply: %=" + percent );
}
}
},
originator,
req.getTransferKey(),
req.getRequestKey(),
reply_data,
WRITE_REPLY_TIMEOUT );
}
}
}catch( DHTTransportException e ){
logger.log( "Failed to process transfer queue: " + Debug.getNestedExceptionMessage(e));
}
}
}.start();
// indicate that at least one packet has been received
sendWriteReply(
req.getConnectionId(),
originator,
req.getTransferKey(),
req.getRequestKey(),
req.getStartPosition(),
req.getLength());
}catch( DHTTransportException e ){
logger.log( "Faild to create transfer queue" );
logger.log( e );
}
}
}
}
}
}
public byte[]
readTransfer(
DHTTransportProgressListener listener,
DHTTransportContact target,
byte[] handler_key,
byte[] key,
long timeout )
throws DHTTransportException
{
long connection_id = getConnectionID();
transferQueue transfer_queue = new transferQueue( read_transfers, connection_id );
return( runTransferQueue( transfer_queue, listener, target, handler_key, key, timeout, true ));
}
protected byte[]
runTransferQueue(
transferQueue transfer_queue,
DHTTransportProgressListener listener,
DHTTransportContact target,
byte[] handler_key,
byte[] key,
long timeout,
boolean read_transfer )
throws DHTTransportException
{
SortedSet packets =
new TreeSet(
new Comparator()
{
public int
compare(
Object o1,
Object o2 )
{
DHTUDPPacketData p1 = (DHTUDPPacketData)o1;
DHTUDPPacketData p2 = (DHTUDPPacketData)o2;
return( p1.getStartPosition() - p2.getStartPosition());
}
});
int entire_request_count = 0;
int transfer_size = -1;
int transferred = 0;
String target_name = DHTLog.getString2(target.getID());
try{
long start = SystemTime.getCurrentTime();
if ( read_transfer ){
listener.reportActivity( getMessageText( "request_all", target_name ));
entire_request_count++;
sendReadRequest( transfer_queue.getID(), (DHTTransportUDPContactImpl)target, handler_key, key );
}else{
// write transfer - data already on its way, no need to request it
entire_request_count++;
}
while( SystemTime.getCurrentTime() - start <= timeout ){
DHTUDPPacketData reply = transfer_queue.receive( READ_XFER_REREQUEST_DELAY );
if ( reply != null ){
if ( transfer_size == -1 ){
transfer_size = reply.getTotalLength();
listener.reportSize( transfer_size );
}
Iterator it = packets.iterator();
boolean duplicate = false;
while( it.hasNext()){
DHTUDPPacketData p = (DHTUDPPacketData)it.next();
// ignore overlaps
if ( p.getStartPosition() < reply.getStartPosition() + reply.getLength() &&
p.getStartPosition() + p.getLength() > reply.getStartPosition()){
duplicate = true;
break;
}
}
if ( !duplicate ){
listener.reportActivity(
getMessageText( "received_bit",
new String[]{
String.valueOf( reply.getStartPosition()),
String.valueOf(reply.getStartPosition() + reply.getLength()),
target_name }));
transferred += reply.getLength();
listener.reportCompleteness( transfer_size==0?100: ( 100 * transferred / transfer_size ));
packets.add( reply );
// see if we're done
it = packets.iterator();
int pos = 0;
int actual_end = -1;
while( it.hasNext()){
DHTUDPPacketData p = (DHTUDPPacketData)it.next();
if ( actual_end == -1 ){
actual_end = p.getTotalLength();
}
if ( p.getStartPosition() != pos ){
// missing data, give up
break;
}
pos += p.getLength();
if ( pos == actual_end ){
// huzzah, we got the lot
listener.reportActivity( getMessageText( "complete" ));
byte[] result = new byte[actual_end];
it = packets.iterator();
pos = 0;
while( it.hasNext()){
p = (DHTUDPPacketData)it.next();
System.arraycopy( p.getData(), 0, result, pos, p.getLength());
pos += p.getLength();
}
return( result );
}
}
}
}else{
// timeout, look for missing bits
if ( packets.size() == 0 ){
if ( entire_request_count == 2 ){
listener.reportActivity( getMessageText( "timeout", target_name ));
return( null );
}
entire_request_count++;
listener.reportActivity( getMessageText( "rerequest_all", target_name ));
sendReadRequest( transfer_queue.getID(), (DHTTransportUDPContactImpl)target, handler_key, key );
}else{
Iterator it = packets.iterator();
int pos = 0;
int actual_end = -1;
while( it.hasNext()){
DHTUDPPacketData p = (DHTUDPPacketData)it.next();
if ( actual_end == -1 ){
actual_end = p.getTotalLength();
}
if ( p.getStartPosition() != pos ){
listener.reportActivity(
getMessageText( "rerequest_bit",
new String[]{
String.valueOf( pos ),
String.valueOf( p.getStartPosition()),
target_name }));
sendReadRequest(
transfer_queue.getID(),
(DHTTransportUDPContactImpl)target,
handler_key,
key,
pos,
p.getStartPositio
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -