📄 dhttransportudpimpl.java
字号:
// add the initial data for this write request
new_queue.add( req );
// set up the queue processor
new AEThread( "DHTTransportUDP:writeQueueProcessor", true )
{
public void
runSupport()
{
try{
byte[] write_data =
runTransferQueue(
new_queue,
new DHTTransportProgressListener()
{
public void
reportSize(
long size )
{
if ( XFER_TRACE ){
System.out.println( "writeXfer: size=" + size );
}
}
public void
reportActivity(
String str )
{
if ( XFER_TRACE ){
System.out.println( "writeXfer: act=" + str );
}
}
public void
reportCompleteness(
int percent )
{
if ( XFER_TRACE ){
System.out.println( "writeXfer: %=" + percent );
}
}
},
originator,
req.getTransferKey(),
req.getRequestKey(),
60000,
false );
if ( write_data != null ){
// xfer complete, send ack if multi-packet xfer
// (ack already sent below if single packet)
if ( req.getStartPosition() != 0 ||
req.getLength() != req.getTotalLength() ){
sendWriteReply(
req.getConnectionId(),
originator,
req.getTransferKey(),
req.getRequestKey(),
0,
req.getTotalLength());
}
byte[] reply_data = handler.handleWrite( originator, req.getRequestKey(), write_data );
if ( reply_data != null ){
writeTransfer(
new DHTTransportProgressListener()
{
public void
reportSize(
long size )
{
if ( XFER_TRACE ){
System.out.println( "writeXferReply: size=" + size );
}
}
public void
reportActivity(
String str )
{
if ( XFER_TRACE ){
System.out.println( "writeXferReply: act=" + str );
}
}
public void
reportCompleteness(
int percent )
{
if ( XFER_TRACE ){
System.out.println( "writeXferReply: %=" + percent );
}
}
},
originator,
req.getTransferKey(),
req.getRequestKey(),
reply_data,
WRITE_REPLY_TIMEOUT );
}
}
}catch( DHTTransportException e ){
logger.log( "Failed to process transfer queue: " + Debug.getNestedExceptionMessage(e));
}
}
}.start();
// indicate that at least one packet has been received
sendWriteReply(
req.getConnectionId(),
originator,
req.getTransferKey(),
req.getRequestKey(),
req.getStartPosition(),
req.getLength());
}catch( DHTTransportException e ){
logger.log( "Faild to create transfer queue" );
logger.log( e );
}
}
}
}
}
}
/**
 * Reads the value stored under {@code key} from a remote contact.
 *
 * A new connection id is allocated and a transfer queue is created for it
 * against {@code read_transfers}; the actual request/receive/re-request
 * work is delegated to {@link #runTransferQueue} in read mode.
 *
 * @param listener    receives size, activity and completeness callbacks
 * @param target      contact to read from
 * @param handler_key key passed through to the read request as the handler key
 * @param key         key passed through to the read request as the request key
 * @param timeout     overall time budget handed to {@code runTransferQueue}
 * @return the assembled data, or {@code null} if the transfer did not complete
 * @throws DHTTransportException propagated from {@code runTransferQueue}
 */
public byte[] readTransfer(
    DHTTransportProgressListener  listener,
    DHTTransportContact           target,
    byte[]                        handler_key,
    byte[]                        key,
    long                          timeout )

    throws DHTTransportException
{
    final long id = getConnectionID();

    final transferQueue queue = new transferQueue( read_transfers, id );

    // 'true' selects read mode: runTransferQueue then sends the initial
    // read request itself rather than waiting for data already in flight
    byte[] received = runTransferQueue( queue, listener, target, handler_key, key, timeout, true );

    return( received );
}
/**
 * Collects the packets of one transfer from {@code transfer_queue} until the
 * whole payload has been received, or the time budget is used up.
 *
 * For a read transfer the entire payload is requested up front; for a write
 * transfer the data is already being sent by the peer so nothing is requested
 * initially. Each time the queue yields no packet within
 * {@code READ_XFER_REREQUEST_DELAY}, either the whole transfer is re-requested
 * (nothing received yet, at most one re-request) or only the missing byte
 * ranges are re-requested. Packets overlapping an already-held packet are
 * ignored.
 *
 * The queue is always destroyed before this method returns or throws.
 *
 * @param transfer_queue queue delivering the packets for this transfer
 * @param listener       receives size, activity and completeness callbacks
 * @param target         remote contact; must be a DHTTransportUDPContactImpl
 *                       whenever a (re-)request has to be sent
 * @param handler_key    handler key forwarded in read requests
 * @param key            request key forwarded in read requests
 * @param timeout        overall time budget, compared against SystemTime.getCurrentTime() deltas
 * @param read_transfer  true to send the initial "entire transfer" request
 * @return the reassembled payload, or {@code null} on timeout
 * @throws DHTTransportException propagated from the request/receive calls
 */
protected byte[]
runTransferQueue(
    transferQueue                 transfer_queue,
    DHTTransportProgressListener  listener,
    DHTTransportContact           target,
    byte[]                        handler_key,
    byte[]                        key,
    long                          timeout,
    boolean                       read_transfer )

    throws DHTTransportException
{
    // all set-up happens inside the try so that the queue is destroyed even
    // if something as early as resolving the target name fails

    try{
        // packets held so far, ordered by start position

        SortedSet packets =
            new TreeSet(
                new Comparator()
                {
                    public int
                    compare(
                        Object o1,
                        Object o2 )
                    {
                        int s1 = ((DHTUDPPacketData)o1).getStartPosition();
                        int s2 = ((DHTUDPPacketData)o2).getStartPosition();

                        // explicit comparison: subtracting the two positions
                        // can overflow and yield the wrong sign

                        return( s1 < s2 ? -1 : ( s1 == s2 ? 0 : 1 ));
                    }
                });

        int entire_request_count = 0;

        int transfer_size = -1;    // -1 until the first packet tells us the total length
        int transferred   = 0;     // bytes accepted so far (duplicates excluded)

        String target_name = DHTLog.getString2(target.getID());

        long start = SystemTime.getCurrentTime();

        if ( read_transfer ){

            listener.reportActivity( "Requesting entire transfer from " + target_name );

            entire_request_count++;

            sendReadRequest( transfer_queue.getID(), (DHTTransportUDPContactImpl)target, handler_key, key );

        }else{

            // write transfer - data already on its way, no need to request it

            entire_request_count++;
        }

        while( SystemTime.getCurrentTime() - start <= timeout ){

            DHTUDPPacketData reply = transfer_queue.receive( READ_XFER_REREQUEST_DELAY );

            if ( reply != null ){

                if ( transfer_size == -1 ){

                    transfer_size = reply.getTotalLength();

                    listener.reportSize( transfer_size );
                }

                Iterator it = packets.iterator();

                boolean duplicate = false;

                while( it.hasNext()){

                    DHTUDPPacketData p = (DHTUDPPacketData)it.next();

                    // ignore overlaps

                    if ( p.getStartPosition() < reply.getStartPosition() + reply.getLength() &&
                         p.getStartPosition() + p.getLength() > reply.getStartPosition()){

                        duplicate = true;

                        break;
                    }
                }

                if ( !duplicate ){

                    listener.reportActivity( "Received " + reply.getStartPosition() + " to " + (reply.getStartPosition() + reply.getLength()) + " from " + target_name );

                    transferred += reply.getLength();

                    // widen to long before multiplying: 100 * transferred
                    // overflows an int once more than ~21MB has arrived

                    int percent = transfer_size==0?100:(int)(( 100L * transferred ) / transfer_size );

                    listener.reportCompleteness( percent );

                    packets.add( reply );

                    // see if we're done: walk the packets in order and check
                    // they form a gap-free run from 0 up to the total length

                    it = packets.iterator();

                    int pos        = 0;
                    int actual_end = -1;

                    while( it.hasNext()){

                        DHTUDPPacketData p = (DHTUDPPacketData)it.next();

                        if ( actual_end == -1 ){

                            // total length is taken from the first packet in order

                            actual_end = p.getTotalLength();
                        }

                        if ( p.getStartPosition() != pos ){

                            // missing data, give up

                            break;
                        }

                        pos += p.getLength();

                        if ( pos == actual_end ){

                            // huzzah, we got the lot

                            listener.reportActivity( "Complete" );

                            byte[] result = new byte[actual_end];

                            it = packets.iterator();

                            pos = 0;

                            while( it.hasNext()){

                                p = (DHTUDPPacketData)it.next();

                                System.arraycopy( p.getData(), 0, result, pos, p.getLength());

                                pos += p.getLength();
                            }

                            return( result );
                        }
                    }
                }
            }else{

                // timeout, look for missing bits

                if ( packets.size() == 0 ){

                    // nothing at all yet: allow one re-request of the whole
                    // transfer, then give up

                    if ( entire_request_count == 2 ){

                        listener.reportActivity( "Timeout, no replies received" );

                        return( null );
                    }

                    entire_request_count++;

                    listener.reportActivity( "Re-requesting entire transfer from " + target_name );

                    sendReadRequest( transfer_queue.getID(), (DHTTransportUDPContactImpl)target, handler_key, key );

                }else{

                    Iterator it = packets.iterator();

                    int pos        = 0;
                    int actual_end = -1;

                    while( it.hasNext()){

                        DHTUDPPacketData p = (DHTUDPPacketData)it.next();

                        if ( actual_end == -1 ){

                            actual_end = p.getTotalLength();
                        }

                        if ( p.getStartPosition() != pos ){

                            // gap between the previous packet's end and this packet's start

                            listener.reportActivity( "Re-requesting " + pos + " to " + p.getStartPosition() + " from " + target_name );

                            sendReadRequest(
                                transfer_queue.getID(),
                                (DHTTransportUDPContactImpl)target,
                                handler_key,
                                key,
                                pos,
                                p.getStartPosition()-pos );
                        }

                        pos = p.getStartPosition() + p.getLength();
                    }

                    if ( pos != actual_end ){

                        // tail of the transfer is still missing

                        listener.reportActivity( "Re-requesting " + pos + " to " + actual_end + " from " + target_name );

                        sendReadRequest(
                            transfer_queue.getID(),
                            (DHTTransportUDPContactImpl)target,
                            handler_key,
                            key,
                            pos,
                            actual_end - pos );
                    }
                }
            }
        }

        // overall time budget exhausted

        listener.reportActivity(
            "Timeout, " +
                (packets.size()==0?
                    " no replies received":
                    ("" + packets.size() + " packets received but incomplete" )));

        return( null );

    }finally{

        transfer_queue.destroy();
    }
}
/**
 * Sends {@code data} to a remote contact and waits for it to be acknowledged.
 *
 * A transfer queue is created against {@code write_transfers} for a newly
 * allocated connection id. The data is (re)sent via
 * {@code handleTransferRequest} whenever at least
 * {@code WRITE_XFER_RESEND_DELAY} has elapsed since the last send or reply,
 * and the queue is polled for reply packets in between. The transfer counts
 * as successful once a reply covering position 0 for the full length is seen.
 * The queue is destroyed on every exit path.
 *
 * @param listener    receives activity and completeness callbacks
 * @param target      contact to write to; cast to DHTTransportUDPContactImpl when sending
 * @param handler_key handler key forwarded to {@code handleTransferRequest}
 * @param key         request key forwarded to {@code handleTransferRequest}
 * @param data        payload to send
 * @param timeout     overall time budget, compared against SystemTime.getCurrentTime() deltas
 * @throws DHTTransportException with message "Timeout" if no full
 *         acknowledgement arrives in time, or as propagated from the calls made here
 */
public void writeTransfer(
    DHTTransportProgressListener  listener,
    DHTTransportContact           target,
    byte[]                        handler_key,
    byte[]                        key,
    byte[]                        data,
    long                          timeout )

    throws DHTTransportException
{
    transferQueue transfer_queue = null;

    try{
        final long connection_id = getConnectionID();

        transfer_queue = new transferQueue( write_transfers, connection_id );

        boolean acknowledged  = false;
        boolean had_reply     = false;
        int     sends         = 0;
        int     expected_len  = data.length;
        long    began         = SystemTime.getCurrentTime();
        long    last_activity = 0;    // 0 forces a send on the first pass

        while( !acknowledged ){

            final long now = SystemTime.getCurrentTime();

            if ( now < began ){

                // clock has gone backwards - restart the timing from here

                began         = now;
                last_activity = 0;

            }else if ( now - began > timeout ){

                break;
            }

            long idle = now - last_activity;

            if ( idle >= WRITE_XFER_RESEND_DELAY ){

                listener.reportActivity( sends == 0 ? "Sending data" : "Resending data" );

                sends++;

                // final argument: once a reply has been received only the
                // first packet is sent (per the original author's note)

                expected_len =
                    handleTransferRequest(
                        (DHTTransportUDPContactImpl)target,
                        connection_id,
                        handler_key,
                        key,
                        data,
                        -1, -1,
                        true,
                        had_reply );

                last_activity = now;
                idle          = 0;
            }

            // wait for a reply for whatever remains of the resend interval

            DHTUDPPacketData ack = transfer_queue.receive( WRITE_XFER_RESEND_DELAY - idle );

            if ( ack == null ){

                continue;
            }

            last_activity = now;
            had_reply     = true;

            // only a reply spanning the whole payload completes the transfer

            acknowledged = ack.getStartPosition() == 0 && ack.getLength() == expected_len;
        }

        if ( !acknowledged ){

            listener.reportActivity( "Failed, timeout" );

            throw( new DHTTransportException( "Timeout" ));
        }

        listener.reportCompleteness( 100 );

        listener.reportActivity( "Complete" );

    }finally{

        if ( transfer_queue != null ){

            transfer_queue.destroy();
        }
    }
}
public byte[]
writeReadTransfer(
DHTTransportProgressListener listener,
DHTTransportContact target,
byte[] handler_key,
byte[] data,
long timeout )
throws DHTTransportException
{
byte[] call_key = new byte[20];
random.nextBytes( call_key );
AESemaphore call_sem = new AESemaphore( "DHTTransportUDP:calSem" );
HashWrapper wrapped_key = new HashWrapper( call_key );
try{
this_mon.enter();
call_transfers.put( wrapped_key, call_sem );
}finally{
this_mon.exit();
}
writeTransfer( listener, target, handler_key, call_key, data, timeout );
if ( call_sem.reserve( timeout )){
try{
this_mon.enter();
Object res = call_transfers.remove( wrapped_key );
if ( res instanceof byte[] ){
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -