📄 dhtcontrolimpl.java
字号:
continue;
}
if ( contacts_queried.get( new HashWrapper( contact.getID())) == null &&
(!contacts_to_query.contains( contact ))){
DHTLog.log( " new contact for query: " + DHTLog.getString( contact ));
contacts_to_query.add( contact );
result_handler.found( contact );
level_map.put( contact, new Integer( search_level+1));
if ( idle_searches[0] > 0 ){
idle_searches[0]--;
search_sem.release();
}
}else{
// DHTLog.log( " already queried: " + DHTLog.getString( contact ));
}
}
}finally{
contacts_to_query_mon.exit();
}
}finally{
try{
contacts_to_query_mon.enter();
active_searches[0]--;
}finally{
contacts_to_query_mon.exit();
}
search_sem.release();
}
}
/**
 * Handles a find-value reply that carries values.
 *
 * Any diversification instruction is passed to the result handler, the
 * replying contact is reported alive to the router, and each value not
 * seen before during this lookup is handed to the result handler. The
 * reply/value counters are then updated under contacts_to_query_mon.
 *
 * When this is the last part of the reply (more_to_come is false) the
 * active search count is decremented and search_sem is released, even if
 * processing the reply threw.
 *
 * @param contact              the contact that sent the reply
 * @param values               the values contained in this reply
 * @param diversification_type diversification instruction, or DHT.DT_NONE
 * @param more_to_come         true if further parts of this reply will follow
 */
public void findValueReply(
    DHTTransportContact     contact,
    DHTTransportValue[]     values,
    byte                    diversification_type,
    boolean                 more_to_come )
{
    DHTLog.log( "findValueReply: " + DHTLog.getString( values ) + ",mtc=" + more_to_come + ", dt=" + diversification_type );

    final boolean last_part = !more_to_come;

    try{
        // act on a diversification instruction, if one was supplied

        if ( diversification_type != DHT.DT_NONE ){

            result_handler.diversify( contact, diversification_type );
        }

        value_reply_received = true;

        router.contactAlive( contact.getID(), new DHTControlContactImpl( contact ));

        int fresh_count = 0;

        for ( int idx = 0; idx < values.length; idx++ ){

            DHTTransportValue candidate = values[idx];

            // the identity of a value is originator id + value bytes: the
            // originator id alone can't be used as this value can be DOSed
            // (see DB code)

            byte[] origin_id = candidate.getOriginator().getID();
            byte[] payload   = candidate.getValue();

            byte[] identity_bytes = new byte[ origin_id.length + payload.length ];

            System.arraycopy( origin_id, 0, identity_bytes, 0, origin_id.length );
            System.arraycopy( payload, 0, identity_bytes, origin_id.length, payload.length );

            HashWrapper identity = new HashWrapper( identity_bytes );

            if ( values_found_set.contains( identity )){

                // already reported during this lookup

                continue;
            }

            fresh_count++;

            values_found_set.add( identity );

            result_handler.read( contact, candidate );
        }

        try{
            contacts_to_query_mon.enter();

            if ( last_part ){

                value_replies[0]++;
            }

            values_found[0] += fresh_count;

        }finally{

            contacts_to_query_mon.exit();
        }
    }finally{

        // only the final part of a reply terminates this search

        if ( last_part ){

            try{
                contacts_to_query_mon.enter();

                active_searches[0]--;

            }finally{

                contacts_to_query_mon.exit();
            }

            search_sem.release();
        }
    }
}
/**
 * Handles a find-value reply that carries contacts rather than values.
 * Such a reply is processed exactly as a find-node reply.
 *
 * @param contact  the contact that sent the reply
 * @param contacts the contacts contained in the reply
 */
public void findValueReply( DHTTransportContact contact, DHTTransportContact[] contacts )
{
    // no values were returned, so delegate to the find-node handling

    findNodeReply( contact, contacts );
}
public void
failed(
DHTTransportContact target_contact,
Throwable error )
{
try{
// if at least one reply has been received then we
// don't treat subsequent failure as indication of
// a contact failure (just packet loss)
if ( !value_reply_received ){
DHTLog.log( "findNode/findValue " + DHTLog.getString( target_contact ) + " -> failed: " + error.getMessage());
router.contactDead( target_contact.getID(), false );
}
}finally{
try{
contacts_to_query_mon.enter();
active_searches[0]--;
}finally{
contacts_to_query_mon.exit();
}
search_sem.release();
}
}
};
router.recordLookup( lookup_id );
if ( value_search ){
int rem = max_values - values_found[0];
if ( rem <= 0 ){
Debug.out( "eh?" );
rem = 1;
}
closest.sendFindValue( handler, lookup_id, rem, flags );
}else{
closest.sendFindNode( handler, lookup_id );
}
}finally{
contacts_to_query_mon.exit();
}
}
// maybe unterminated searches still going on so protect ourselves
// against concurrent modification of result set
List closest_res = null;
try{
contacts_to_query_mon.enter();
if ( DHTLog.isOn()){
DHTLog.log( "lookup complete for " + DHTLog.getString( lookup_id ));
DHTLog.log( " queried = " + DHTLog.getString( contacts_queried ));
DHTLog.log( " to query = " + DHTLog.getString( contacts_to_query ));
DHTLog.log( " ok = " + DHTLog.getString( ok_contacts ));
}
closest_res = new ArrayList( ok_contacts );
// we need to reverse the list as currently closest is at
// the end
Collections.reverse( closest_res );
if ( timeout <= 0 && !value_search ){
// we can use the results of this to estimate the DHT size
estimateDHTSize( lookup_id, contacts_queried, search_accuracy );
}
}finally{
contacts_to_query_mon.exit();
}
result_handler.closest( closest_res );
}finally{
result_handler.complete( timeout_occurred );
}
}
// Request methods
/**
 * Handles an incoming ping: logs it and reports the originator as alive
 * to the router.
 *
 * @param originating_contact the contact that sent the ping
 */
public void pingRequest( DHTTransportContact originating_contact )
{
    DHTLog.log( "pingRequest from " + DHTLog.getString( originating_contact.getID()));

    byte[]                  sender_id  = originating_contact.getID();
    DHTControlContactImpl   attachment = new DHTControlContactImpl( originating_contact );

    router.contactAlive( sender_id, attachment );
}
/**
 * Handles an incoming store request.
 *
 * The originator is reported alive to the router, then each key and its
 * value set is passed to the database. The returned array holds one entry
 * per key: the result of database.store for that key. If keys and
 * value_sets differ in length the request is rejected, nothing is stored,
 * and every entry is DHT.DT_NONE.
 *
 * @param originating_contact the contact that sent the request
 * @param keys                the keys to store under
 * @param value_sets          the values for each key, parallel to keys
 * @return one byte per key, as described above
 */
public byte[] storeRequest(
    DHTTransportContact     originating_contact,
    byte[][]                keys,
    DHTTransportValue[][]   value_sets )
{
    router.contactAlive( originating_contact.getID(), new DHTControlContactImpl( originating_contact ));

    DHTLog.log( "storeRequest from " + DHTLog.getString( originating_contact.getID())+ ", keys = " + keys.length );

    final int key_count = keys.length;

    byte[] outcome = new byte[ key_count ];

    Arrays.fill( outcome, DHT.DT_NONE );

    if ( key_count == value_sets.length ){

        for ( int idx = 0; idx < key_count; idx++ ){

            HashWrapper         wrapped_key = new HashWrapper( keys[idx] );
            DHTTransportValue[] key_values  = value_sets[idx];

            DHTLog.log( " key=" + DHTLog.getString(wrapped_key) + ", value=" + DHTLog.getString(key_values));

            outcome[idx] = database.store( originating_contact, wrapped_key, key_values );
        }
    }else{

        // malformed request: report it and leave the defaults in place

        Debug.out( "DHTControl:storeRequest - invalid request received from " + originating_contact.getString() + ", keys and values length mismatch");
    }

    return( outcome );
}
/**
 * Handles an incoming find-node request.
 *
 * The originator is reported alive to the router. If the requested id has
 * the same length as the router's own id, the result of
 * getClosestKContactsList for that id is returned; otherwise an empty
 * array is returned. In both cases a spoof id is generated for the
 * originator and set on it as its random id.
 *
 * @param originating_contact the contact that sent the request
 * @param id                  the id being searched for
 * @return the contacts to send back to the originator (possibly empty)
 */
public DHTTransportContact[] findNodeRequest(
    DHTTransportContact     originating_contact,
    byte[]                  id )
{
    DHTLog.log( "findNodeRequest from " + DHTLog.getString( originating_contact.getID()));

    router.contactAlive( originating_contact.getID(), new DHTControlContactImpl( originating_contact ));

    List nearest;

    if ( id.length != router.getID().length ){

        // an id of the wrong length gets no contacts: this helps both
        // protect against idiot queries and also saves bytes when findNode
        // is used to just get a random ID prior to cache-forwards

        nearest = new ArrayList();

    }else{

        nearest = getClosestKContactsList( id, false );
    }

    // the supplied array is exactly the right size, so it is the one filled and returned

    final DHTTransportContact[] reply =
        (DHTTransportContact[])nearest.toArray( new DHTTransportContact[ nearest.size() ]);

    originating_contact.setRandomID( generateSpoofID( originating_contact ));

    return( reply );
}
/**
 * Handles an incoming find-value request.
 *
 * The database is queried for the key. If it returns a result, the
 * originator is reported alive to the router and the reply carries the
 * result's diversification type and values. If it returns null, the
 * reply carries the outcome of findNodeRequest for the same key.
 *
 * @param originating_contact the contact that sent the request
 * @param key                 the key being looked up
 * @param max_values          passed through to the database lookup
 * @param flags               request flags (not used by this method)
 * @return a reply holding either values or contacts
 */
public DHTTransportFindValueReply findValueRequest(
    DHTTransportContact     originating_contact,
    byte[]                  key,
    int                     max_values,
    byte                    flags )
{
    DHTLog.log( "findValueRequest from " + DHTLog.getString( originating_contact.getID()));

    DHTDBLookupResult hit = database.get( originating_contact, new HashWrapper( key ), max_values, true );

    if ( hit == null ){

        // nothing from the database: answer as for a find-node request

        return( new DHTTransportFindValueReplyImpl( findNodeRequest( originating_contact, key )));
    }

    router.contactAlive( originating_contact.getID(), new DHTControlContactImpl( originating_contact ));

    return( new DHTTransportFindValueReplyImpl( hit.getDiversificationType(), hit.getValues()));
}
/**
 * Handles an incoming stats request by returning the stats field.
 *
 * @param contact the contact that sent the request (not used)
 * @return the stats field
 */
public DHTTransportFullStats statsRequest( DHTTransportContact contact )
{
    return stats;
}
/**
 * Sends a ping to the transport contact attached to the given router
 * contact. A reply reports the contact alive to the router; a failure
 * reports it dead.
 *
 * @param contact the router contact to ping; its attachment is cast to
 *                DHTControlContactImpl
 */
protected void requestPing( DHTRouterContact contact )
{
    DHTControlContactImpl   attachment = (DHTControlContactImpl)contact.getAttachment();
    DHTTransportContact     transport  = attachment.getTransportContact();

    DHTTransportReplyHandlerAdapter ping_handler =
        new DHTTransportReplyHandlerAdapter()
        {
            public void pingReply( DHTTransportContact replier )
            {
                DHTLog.log( "ping OK " + DHTLog.getString( replier ));

                router.contactAlive( replier.getID(), new DHTControlContactImpl( replier ));
            }

            public void failed( DHTTransportContact target, Throwable cause )
            {
                DHTLog.log( "ping " + DHTLog.getString( target ) + " -> failed: " + cause.getMessage());

                router.contactDead( target.getID(), false );
            }
        };

    transport.sendPing( ping_handler );
}
protected void
nodeAddedToRouter(
DHTRouterContact new_contact )
{
// ignore ourselves
if ( router.isID( new_contact.getID())){
return;
}
// when a new node is added we must check to see if we need to transfer
// any of our values to it.
Map keys_to_store = new HashMap();
if ( database.isEmpty()){
// nothing to do, ping it if it isn't known to be alive
if ( !new_contact.hasBeenAlive()){
requestPing( new_contact );
}
return;
}
// see if we're one of the K closest to the new node
List closest_contacts = getClosestKContactsList( new_contact.getID(), false );
boolean close = false;
for (int i=0;i<closest_contacts.size();i++){
if ( router.isID(((DHTTransportContact)closest_contacts.get(i)).getID())){
close = true;
break;
}
}
if ( !close ){
if ( !new_contact.hasBeenAlive()){
requestPing( new_contact );
}
return;
}
// ok, we're close enough to worry about transferring values
Iterator it = database.getKeys();
while( it.hasNext()){
HashWrapper key = (HashWrapper)it.next();
byte[] encoded_key = key.getHash();
DHTDBLookupResult result = database.get( null, key, 0, false );
if ( result == null ){
// deleted in the meantime
continue;
}
// even if a result has been diversified we continue to maintain the base value set
// until the original publisher picks up the diversification (next publish period) and
// publishes to the correct place
DHTDBValue[] values = result.getValues();
List values_to_store = new ArrayList();
for (int i=0;i<values.length;i++){
DHTDBValue value = values[i];
// we don't consider any cached further away than the initial location, for transfer
// however, we *do* include ones we originate as, if we're the closest, we have to
// take responsibility for xfer (as others won't)
List sorted_contacts = getClosestKContactsList( encoded_key, false );
// if we're closest to the key, or the new node is closest and
// we're second closest, then we take responsibility for storing
// the value
boolean store_it = false;
if ( sorted_contacts.size() > 0 ){
DHTTransportContact first = (DHTTransportContact)sorted_contacts.get(0);
if ( router.isID( first.getID())){
store_it = true;
}else if ( Arrays.equals( first.getID(), new_contact.getID()) && sorted_contacts.size() > 1 ){
store_it = router.isID(((DHTTransportContact)sorted_contacts.get(1)).getID());
}
}
if ( store_it ){
values_to_store.add( value );
}
}
if ( values_to_store.size() > 0 ){
keys_to_store.put( key, values_to_store );
}
}
if ( keys_to_store.size() > 0 ){
it = keys_to_store.entrySet().iterator();
final DHTTransportContact t_contact = ((DHTControlContactImpl)new_contact.getAttachment()).getTransportContact();
final byte[][] keys = new byte[keys_to_store.size()][];
final DHTTransportValue[][] value_sets = new DHTTransportValue[keys.length][];
int index = 0;
while( it.hasNext()){
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -