📄 dhtcontrolimpl.java
字号:
// we can use the results of this to estimate the DHT size
estimateDHTSize( lookup_id, contacts_queried, search_accuracy );
}
}finally{
contacts_to_query_mon.exit();
}
result_handler.closest( closest_res );
}finally{
result_handler.complete( timeout_occurred );
}
}
// Request methods
/**
 * Handles an incoming ping: logs the sender's ID and reports the sender to
 * the router as alive.
 *
 * @param originating_contact the contact that sent the ping
 */
public void
pingRequest(
	DHTTransportContact		originating_contact )
{
	final String log_line = "pingRequest from " + DHTLog.getString( originating_contact.getID());

	DHTLog.log( log_line );

	// the sender managed to reach us, so let the router know it is alive

	final byte[]				sender_id	= originating_contact.getID();
	final DHTControlContactImpl	sender		= new DHTControlContactImpl( originating_contact );

	router.contactAlive( sender_id, sender );
}
/**
 * Handles an incoming key-block request: logs the sender's ID, reports the
 * sender to the router as alive and then passes the request on to the database.
 *
 * @param originating_contact the contact that sent the request
 * @param request             the key-block request bytes, passed through to the database
 * @param sig                 the accompanying signature bytes, passed through to the database
 */
public void
keyBlockRequest(
	DHTTransportContact		originating_contact,
	byte[]					request,
	byte[]					sig )
{
	final String log_line = "keyBlockRequest from " + DHTLog.getString( originating_contact.getID());

	DHTLog.log( log_line );

	final byte[]				sender_id	= originating_contact.getID();
	final DHTControlContactImpl	sender		= new DHTControlContactImpl( originating_contact );

	router.contactAlive( sender_id, sender );

	// the database does the actual processing of the block request

	database.keyBlockRequest( originating_contact, request, sig );
}
/**
 * Handles an incoming store request by storing each key's value set in the
 * database.
 *
 * The sender is first reported to the router as alive. If {@code keys} and
 * {@code value_sets} differ in length the request is reported via
 * {@code Debug.out} and nothing is stored; the reply then carries one
 * {@code DHT.DT_NONE} entry per key. Otherwise each key is stored in turn and
 * the database is asked for key-block details until the first non-null answer.
 *
 * @param originating_contact the contact that sent the request
 * @param keys                the keys to store under
 * @param value_sets          the values to store, {@code value_sets[i]} belonging to {@code keys[i]}
 * @return a reply built from the first key-block details found (its request and
 *         certificate) or, when none of the keys has block details, a reply
 *         holding the per-key results of {@code database.store}
 */
public DHTTransportStoreReply
storeRequest(
	DHTTransportContact		originating_contact,
	byte[][]				keys,
	DHTTransportValue[][]	value_sets )
{
	router.contactAlive( originating_contact.getID(), new DHTControlContactImpl( originating_contact ));

	final int key_count = keys.length;

	DHTLog.log( "storeRequest from " + DHTLog.getString( originating_contact )+ ", keys = " + key_count );

	// one result slot per key, all starting out as "no diversification"

	final byte[] diversifications = new byte[ key_count ];

	Arrays.fill( diversifications, DHT.DT_NONE );

	if ( key_count != value_sets.length ){

		Debug.out( "DHTControl:storeRequest - invalid request received from " + originating_contact.getString() + ", keys and values length mismatch");

		return( new DHTTransportStoreReplyImpl( diversifications ));
	}

	// System.out.println( "storeRequest: received " + originating_contact.getRandomID() + " from " + originating_contact.getAddress());

	DHTStorageBlock first_block = null;

	for ( int k = 0; k < key_count; k++ ){

		final byte[]				raw_key		= keys[k];
		final HashWrapper			wrapped_key	= new HashWrapper( raw_key );
		final DHTTransportValue[]	key_values	= value_sets[k];

		DHTLog.log( " key=" + DHTLog.getString(wrapped_key) + ", value=" + DHTLog.getString(key_values));

		diversifications[k] = database.store( originating_contact, wrapped_key, key_values );

		// only the first set of block details is reported back, so stop asking once we have one

		if ( first_block == null ){

			first_block = database.getKeyBlockDetails( raw_key );
		}
	}

	// fortunately we can get away with this as diversifications are only taken note of by initial, single value stores
	// and not by the multi-value cache forwards...

	if ( first_block != null ){

		return( new DHTTransportStoreReplyImpl( first_block.getRequest(), first_block.getCertificate()));
	}

	return( new DHTTransportStoreReplyImpl( diversifications ));
}
/**
 * Handles an incoming find-node request.
 *
 * Logs the sender's ID, reports the sender to the router as alive, and stores
 * a freshly generated spoof ID on the sender via {@code setRandomID}.
 *
 * @param originating_contact the contact that sent the request
 * @param id                  the ID being searched for
 * @return the contents of {@code getClosestKContactsList( id, false )} when
 *         {@code id} has the same length as the router's own ID, otherwise an
 *         empty array
 */
public DHTTransportContact[]
findNodeRequest(
	DHTTransportContact		originating_contact,
	byte[]					id )
{
	DHTLog.log( "findNodeRequest from " + DHTLog.getString( originating_contact.getID()));

	router.contactAlive( originating_contact.getID(), new DHTControlContactImpl( originating_contact ));

	// answering wrongly-sized IDs with nothing helps both protect against idiot queries
	// and also saves bytes when findNode is used just to get a random ID prior to cache-forwards

	final boolean id_length_matches = id.length == router.getID().length;

	final List closest = id_length_matches ? getClosestKContactsList( id, false ) : new ArrayList();

	// the supplied array is exactly big enough, so toArray fills and returns it

	final DHTTransportContact[] reply =
		(DHTTransportContact[])closest.toArray( new DHTTransportContact[ closest.size() ] );

	originating_contact.setRandomID( generateSpoofID( originating_contact ));

	return( reply );
}
/**
 * Handles an incoming find-value request.
 *
 * When the database lookup yields nothing the request is answered like a
 * find-node request for the same key. Otherwise the sender is reported to the
 * router as alive and the reply carries either the key's block details (request
 * and certificate), if the database has any for the key, or the lookup's
 * diversification type and values.
 *
 * @param originating_contact the contact that sent the request
 * @param key                 the key being looked up
 * @param max_values          passed through to the database lookup
 * @param flags               passed through to the database lookup
 * @return the reply to send back to the sender
 */
public DHTTransportFindValueReply
findValueRequest(
	DHTTransportContact		originating_contact,
	byte[]					key,
	int						max_values,
	byte					flags )
{
	DHTLog.log( "findValueRequest from " + DHTLog.getString( originating_contact.getID()));

	final DHTDBLookupResult lookup =
		database.get( originating_contact, new HashWrapper( key ), max_values, flags, true );

	if ( lookup == null ){

		// nothing found locally: answer with contacts instead (findNodeRequest also marks the sender alive)

		return( new DHTTransportFindValueReplyImpl( findNodeRequest( originating_contact, key )));
	}

	router.contactAlive( originating_contact.getID(), new DHTControlContactImpl( originating_contact ));

	final DHTStorageBlock key_block = database.getKeyBlockDetails( key );

	if ( key_block != null ){

		return( new DHTTransportFindValueReplyImpl( key_block.getRequest(), key_block.getCertificate()));
	}

	return( new DHTTransportFindValueReplyImpl( lookup.getDiversificationType(), lookup.getValues()));
}
/**
 * Handles an incoming stats request by handing back the {@code stats} object.
 *
 * @param contact the contact that sent the request; not used
 * @return the {@code stats} object
 */
public DHTTransportFullStats
statsRequest(
	DHTTransportContact		contact )
{
	return stats;
}
/**
 * Sends a ping to the transport contact attached to the given router contact.
 *
 * A ping reply causes the replying contact to be reported to the router as
 * alive; a failure causes it to be reported as dead.
 *
 * @param contact the router contact to ping; its attachment is cast to
 *                {@code DHTControlContactImpl}
 */
protected void
requestPing(
	DHTRouterContact		contact )
{
	final DHTControlContactImpl	attachment	= (DHTControlContactImpl)contact.getAttachment();
	final DHTTransportContact	target		= attachment.getTransportContact();

	final DHTTransportReplyHandlerAdapter ping_handler =
		new DHTTransportReplyHandlerAdapter()
		{
			public void
			pingReply(
				DHTTransportContact		replier )
			{
				final String log_line = "ping OK " + DHTLog.getString( replier );

				DHTLog.log( log_line );

				router.contactAlive( replier.getID(), new DHTControlContactImpl( replier ));
			}

			public void
			failed(
				DHTTransportContact		unreachable,
				Throwable				cause )
			{
				final String log_line = "ping " + DHTLog.getString( unreachable ) + " -> failed: " + cause.getMessage();

				DHTLog.log( log_line );

				router.contactDead( unreachable.getID(), false );
			}
		};

	target.sendPing( ping_handler );
}
protected void
nodeAddedToRouter(
DHTRouterContact new_contact )
{
// ignore ourselves
if ( router.isID( new_contact.getID())){
return;
}
// when a new node is added we must check to see if we need to transfer
// any of our values to it.
Map keys_to_store = new HashMap();
DHTStorageBlock[] direct_key_blocks = database.getDirectKeyBlocks();
if ( database.isEmpty() && direct_key_blocks.length == 0 ){
// nothing to do, ping it if it isn't known to be alive
if ( !new_contact.hasBeenAlive()){
requestPing( new_contact );
}
return;
}
// see if we're one of the K closest to the new node
List closest_contacts = getClosestKContactsList( new_contact.getID(), false );
boolean close = false;
for (int i=0;i<closest_contacts.size();i++){
if ( router.isID(((DHTTransportContact)closest_contacts.get(i)).getID())){
close = true;
break;
}
}
if ( !close ){
if ( !new_contact.hasBeenAlive()){
requestPing( new_contact );
}
return;
}
// ok, we're close enough to worry about transferring values
Iterator it = database.getKeys();
while( it.hasNext()){
HashWrapper key = (HashWrapper)it.next();
byte[] encoded_key = key.getHash();
if ( database.isKeyBlocked( encoded_key )){
continue;
}
DHTDBLookupResult result = database.get( null, key, 0, (byte)0, false );
if ( result == null ){
// deleted in the meantime
continue;
}
// even if a result has been diversified we continue to maintain the base value set
// until the original publisher picks up the diversification (next publish period) and
// publishes to the correct place
DHTDBValue[] values = result.getValues();
List values_to_store = new ArrayList();
for (int i=0;i<values.length;i++){
DHTDBValue value = values[i];
// we don't consider any cached further away than the initial location, for transfer
// however, we *do* include ones we originate as, if we're the closest, we have to
// take responsibility for xfer (as others won't)
List sorted_contacts = getClosestKContactsList( encoded_key, false );
// if we're closest to the key, or the new node is closest and
// we're second closest, then we take responsibility for storing
// the value
boolean store_it = false;
if ( sorted_contacts.size() > 0 ){
DHTTransportContact first = (DHTTransportContact)sorted_contacts.get(0);
if ( router.isID( first.getID())){
store_it = true;
}else if ( Arrays.equals( first.getID(), new_contact.getID()) && sorted_contacts.size() > 1 ){
store_it = router.isID(((DHTTransportContact)sorted_contacts.get(1)).getID());
}
}
if ( store_it ){
values_to_store.add( value );
}
}
if ( values_to_store.size() > 0 ){
keys_to_store.put( key, values_to_store );
}
}
final DHTTransportContact t_contact = ((DHTControlContactImpl)new_contact.getAttachment()).getTransportContact();
final boolean[] anti_spoof_done = { false };
if ( keys_to_store.size() > 0 ){
it = keys_to_store.entrySet().iterator();
final byte[][] keys = new byte[keys_to_store.size()][];
final DHTTransportValue[][] value_sets = new DHTTransportValue[keys.length][];
int index = 0;
while( it.hasNext()){
Map.Entry entry = (Map.Entry)it.next();
HashWrapper key = (HashWrapper)entry.getKey();
List values = (List)entry.getValue();
keys[index] = key.getHash();
value_sets[index] = new DHTTransportValue[values.size()];
for (int i=0;i<values.size();i++){
value_sets[index][i] = ((DHTDBValue)values.get(i)).getValueForRelay( local_contact );
}
index++;
}
// move to anti-spoof for cache forwards. we gotta do a findNode to update the
// contact's latest random id
t_contact.sendFindNode(
new DHTTransportReplyHandlerAdapter()
{
public void
findNodeReply(
DHTTransportContact contact,
DHTTransportContact[] contacts )
{
// System.out.println( "nodeAdded: pre-store findNode OK" );
anti_spoof_done[0] = true;
t_contact.sendStore(
new DHTTransportReplyHandlerAdapter()
{
public void
storeReply(
DHTTransportContact _contact,
byte[] _diversifications )
{
// System.out.println( "nodeAdded: store OK" );
// don't consider diversifications for node additions as they're not interested
// in getting values from us, they need to get them from nodes 'near' to the
// diversification targets or the originator
DHTLog.log( "add store ok" );
router.contactAlive( _contact.getID(), new DHTControlContactImpl(_contact));
}
public void
failed(
DHTTransportContact _contact,
Throwable _error )
{
// System.out.println( "nodeAdded: store Failed" );
DHTLog.log( "add store failed " + DHTLog.getString( _contact ) + " -> failed: " + _error.getMessage());
router.contactDead( _contact.getID(), false);
}
public void
keyBlockRequest(
DHTTransportContact contact,
byte[] request,
byte[] signature )
{
database.keyBlockRequest( null, request, signature );
}
},
keys,
value_sets );
}
public void
failed(
DHTTransportContact _contact,
Throwable _error )
{
// System.out.println( "nodeAdded: pre-store findNode Failed" );
DHTLog.log( "pre-store findNode failed " + DHTLog.getString( _contact ) + " -> failed: " + _error.getMessage());
router.contactDead( _contact.getID(), false);
}
},
t_contact.getProtocolVersion() >= DHTTransportUDP.PROTOCOL_VERSION_ANTI_SPOOF2?new byte[0]:new byte[20] );
}else{
if ( !new_contact.hasBeenAlive()){
requestPing( new_contact );
}
}
// finally transfer any key-blocks
if ( t_contact.getProtocolVersion() >= DHTTransportUDP.PROTOCOL_VERSION_BLOCK_KEYS ){
for (int i=0;i<direct_key_blocks.length;i++){
final DHTStorageBlock key_block = direct_key_blocks[i];
List contacts = getClosestKContactsList( key_block.getKey(), false );
boolean forward_it = false;
// ensure that the key is close enough to us
for (int j=0;j<contacts.size();j++){
final DHTTransportContact contact = (DHTTransportContact)contacts.get(j);
if ( router.isID( contact.getID())){
forward_it = true;
break;
}
}
if ( !forward_it || key_block.hasBeenSentTo( t_contact )){
continue;
}
final Runnable task =
new Runnable()
{
public void
run()
{
t_contact.sendKeyBlock(
new DHTTransportReplyHandlerAdapter()
{
public void
keyBlockReply(
DHTTransportContact _contact )
{
DHTLog.log( "key block forward ok " + DHTLog.getString( _contact ));
key_block.sentTo( _contact );
}
public void
failed(
DHTTransportContact _contact,
Throwable _error )
{
DHTLog.log( "key block forward failed " + DHTLog.getString( _contact ) + " -> failed: " + _error.getMessage());
}
},
ke
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -