📄 dhtcontrolimpl.java
字号:
}
}
//System.out.println( " after adding live ones = " + to_save.size());
// now add any reserve ones
for (int i=0;i<reserves.size();i++){
DHTRouterContact contact = (DHTRouterContact)reserves.get(i);
if ( !to_save.contains( contact )){
to_save.add( contact );
}
}
//System.out.println( " after adding reserves = " + to_save.size());
// now add in the rest!
for (int i=0;i<contacts.size();i++){
DHTRouterContact contact = (DHTRouterContact)contacts.get(i);
if (!to_save.contains( contact )){
to_save.add( contact );
}
}
// and finally remove the invalid ones
Iterator it = to_save.iterator();
while( it.hasNext()){
DHTRouterContact contact = (DHTRouterContact)it.next();
DHTTransportContact t_contact = ((DHTControlContactImpl)contact.getAttachment()).getTransportContact();
if ( !t_contact.isValid()){
it.remove();
}
}
//System.out.println( " finally = " + to_save.size());
int num_to_write = Math.min( max, to_save.size());
daos.writeInt( num_to_write );
for (int i=0;i<num_to_write;i++){
DHTRouterContact contact = (DHTRouterContact)to_save.get(i);
//System.out.println( "export:" + contact.getString());
daos.writeLong( contact.getTimeAlive());
DHTTransportContact t_contact = ((DHTControlContactImpl)contact.getAttachment()).getTransportContact();
try{
t_contact.exportContact( daos );
}catch( DHTTransportException e ){
// shouldn't fail as for a contact to make it to the router
// it should be valid...
Debug.printStackTrace( e );
throw( new IOException( e.getMessage()));
}
}
daos.flush();
}
public void
importState(
DataInputStream dais )
throws IOException
{
int num = dais.readInt();
for (int i=0;i<num;i++){
try{
long time_alive = dais.readLong();
DHTTransportContact contact = transport.importContact( dais );
imported_state.put( new HashWrapper( contact.getID()), new Object[]{ new Long( time_alive ), contact });
}catch( DHTTransportException e ){
Debug.printStackTrace( e );
}
}
}
public void
seed(
final boolean full_wait )
{
final AESemaphore sem = new AESemaphore( "DHTControl:seed" );
lookup( internal_lookup_pool, false,
router.getID(),
"Seeding DHT",
(byte)0,
false,
0,
search_concurrency*4,
1,
router.getK(),
new lookupResultHandler(new DHTOperationAdapter())
{
public void
diversify(
DHTTransportContact cause,
byte diversification_type )
{
}
public void
closest(
List res )
{
if ( !full_wait ){
sem.release();
}
try{
router.seed();
}finally{
if ( full_wait ){
sem.release();
}
}
}
});
// we always wait at least a minimum amount of time before returning
long start = SystemTime.getCurrentTime();
sem.reserve( INTEGRATION_TIME_MAX );
long now = SystemTime.getCurrentTime();
if ( now < start ){
start = now;
}
long remaining = INTEGRATION_TIME_MAX - ( now - start );
if ( remaining > 500 && !full_wait ){
logger.log( "Initial integration completed, waiting " + remaining + " ms for second phase to start" );
try{
Thread.sleep( remaining );
}catch( Throwable e ){
Debug.out(e);
}
}
}
protected void
poke()
{
long now = SystemTime.getCurrentTime();
if ( now < last_lookup ||
now - last_lookup > RANDOM_QUERY_PERIOD ){
last_lookup = now;
// we don't want this to be blocking as it'll stuff the stats
external_lookup_pool.run(
new task(external_lookup_pool)
{
private byte[] target = {};
public void
runSupport()
{
target = router.refreshRandom();
}
public byte[]
getTarget()
{
return( target );
}
public String
getDescription()
{
return( "Random Query" );
}
});
}
}
public void
put(
byte[] _unencoded_key,
String _description,
byte[] _value,
byte _flags,
DHTOperationListener _listener )
{
// public entry point for explicit publishes
if ( _value.length == 0 ){
// zero length denotes value removal
throw( new RuntimeException( "zero length values not supported"));
}
byte[] encoded_key = encodeKey( _unencoded_key );
DHTLog.log( "put for " + DHTLog.getString( encoded_key ));
DHTDBValue value = database.store( new HashWrapper( encoded_key ), _value, _flags );
put( external_put_pool,
encoded_key,
_description,
value,
0,
true,
new HashSet(),
_listener instanceof DHTOperationListenerDemuxer?
(DHTOperationListenerDemuxer)_listener:
new DHTOperationListenerDemuxer(_listener));
}
public void
putEncodedKey(
byte[] encoded_key,
String description,
DHTTransportValue value,
long timeout,
boolean original_mappings )
{
put( internal_put_pool,
encoded_key,
description,
value,
timeout,
original_mappings,
new HashSet(),
new DHTOperationListenerDemuxer( new DHTOperationAdapter()));
}
protected void
put(
ThreadPool thread_pool,
byte[] initial_encoded_key,
String description,
DHTTransportValue value,
long timeout,
boolean original_mappings,
Set keys_written,
DHTOperationListenerDemuxer listener )
{
put( thread_pool,
initial_encoded_key,
description,
new DHTTransportValue[]{ value },
timeout,
original_mappings,
keys_written,
listener );
}
protected void
put(
final ThreadPool thread_pool,
final byte[] initial_encoded_key,
final String description,
final DHTTransportValue[] values,
final long timeout,
final boolean original_mappings,
final Set keys_written,
final DHTOperationListenerDemuxer listener )
{
// get the initial starting point for the put - may have previously been diversified
byte[][] encoded_keys =
adapter.diversify(
null,
true,
true,
initial_encoded_key,
DHT.DT_NONE,
original_mappings );
// may be > 1 if diversification is replicating (for load balancing)
for (int i=0;i<encoded_keys.length;i++){
final byte[] encoded_key = encoded_keys[i];
HashWrapper hw = new HashWrapper( encoded_key );
if ( keys_written.contains( hw )){
// System.out.println( "put: skipping key as already written" );
continue;
}
keys_written.add( hw );
final String this_description =
Arrays.equals( encoded_key, initial_encoded_key )?
description:
("Diversification of [" + description + "]" );
lookup( thread_pool, false,
encoded_key,
this_description,
(byte)0,
false,
timeout,
search_concurrency,
1,
router.getK(),
new lookupResultHandler(listener)
{
public void
diversify(
DHTTransportContact cause,
byte diversification_type )
{
Debug.out( "Shouldn't get a diversify on a lookup-node" );
}
public void
closest(
List _closest )
{
put( thread_pool,
new byte[][]{ encoded_key },
"Store of [" + this_description + "]",
new DHTTransportValue[][]{ values },
_closest,
timeout,
listener,
true,
keys_written );
}
});
}
}
public void
putDirectEncodedKeys(
byte[][] encoded_keys,
String description,
DHTTransportValue[][] value_sets,
List contacts )
{
// we don't consider diversification for direct puts (these are for republishing
// of cached mappings and we maintain these as normal - its up to the original
// publisher to diversify as required)
put( internal_put_pool,
encoded_keys,
description,
value_sets,
contacts,
0,
new DHTOperationListenerDemuxer( new DHTOperationAdapter()),
false,
new HashSet());
}
protected void
put(
final ThreadPool thread_pool,
byte[][] initial_encoded_keys,
final String description,
final DHTTransportValue[][] initial_value_sets,
final List contacts,
final long timeout,
final DHTOperationListenerDemuxer listener,
final boolean consider_diversification,
final Set keys_written )
{
boolean[] ok = new boolean[initial_encoded_keys.length];
int failed = 0;
for (int i=0;i<initial_encoded_keys.length;i++){
if ( ! (ok[i] = !database.isKeyBlocked( initial_encoded_keys[i]))){
failed++;
}
}
// if all failed then nothing to do
if ( failed == ok.length ){
listener.incrementCompletes();
listener.complete( false );
return;
}
final byte[][] encoded_keys = failed==0?initial_encoded_keys:new byte[ok.length-failed][];
final DHTTransportValue[][] value_sets = failed==0?initial_value_sets:new DHTTransportValue[ok.length-failed][];
if ( failed > 0 ){
int pos = 0;
for (int i=0;i<ok.length;i++){
if ( ok[i] ){
encoded_keys[ pos ] = initial_encoded_keys[i];
value_sets[ pos ] = initial_value_sets[i];
pos++;
}
}
}
// only diversify on one hit as we're storing at closest 'n' so we only need to
// do it once for each key
final boolean[] diversified = new boolean[encoded_keys.length];
for (int i=0;i<contacts.size();i++){
DHTTransportContact contact = (DHTTransportContact)contacts.get(i);
if ( router.isID( contact.getID())){
// don't send to ourselves!
}else{
try{
for (int j=0;j<value_sets.length;j++){
for (int k=0;k<value_sets[j].length;k++){
listener.wrote( contact, value_sets[j][k] );
}
}
// each store is going to report its complete event
listener.incrementCompletes();
contact.sendStore(
new DHTTransportReplyHandlerAdapter()
{
public void
storeReply(
DHTTransportContact _contact,
byte[] _diversifications )
{
try{
DHTLog.log( "Store OK " + DHTLog.getString( _contact ));
router.contactAlive( _contact.getID(), new DHTControlContactImpl(_contact));
// can be null for old protocol versions
if ( consider_diversification && _diversifications != null ){
for (int j=0;j<_diversifications.length;j++){
if ( _diversifications[j] != DHT.DT_NONE && !diversified[j] ){
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -