// DHTPluginStorageManager.java (partial) — web code-viewer page header removed
return( kb );
}
/**
 * Returns the current copy-on-write snapshot of direct key blocks.
 * The published array is never mutated after assignment (COW convention
 * used throughout this file), so callers may iterate it unsynchronised.
 */
public DHTStorageBlock[]
getDirectKeyBlocks()
{
	return key_blocks_direct_cow;
}
/**
 * Extracts the key portion of a raw key-block request.
 * <p>
 * A key-block request consists of an 8 byte header (byte 0 = add/remove
 * flag, bytes 4-7 = creation time — see {@code keyBlock}) followed by the
 * key itself.
 *
 * @param request raw key-block request bytes
 * @return the key (everything after the 8 byte header), or an empty array
 *         if the request is too short to contain one
 */
public byte[]
getKeyForKeyBlock(
	byte[]		request )
{
	if ( request.length <= 8 ){

		return( new byte[0] );
	}

		// idiomatic replacement for the manual new[] + System.arraycopy copy
		// (fully qualified as the file's import block is not in view here)

	return( java.util.Arrays.copyOfRange( request, 8, request.length ));
}
/**
 * Drops all "direct" key-block entries from the copy-on-write map,
 * rebuilding the published map/array and persisting the result if
 * anything was actually removed. Logged as an ID-change purge.
 * Runs under the key-block monitor.
 */
protected void
purgeDirectKeyBlocks()
{
	try{
		key_block_mon.enter();

		ByteArrayHashMap	retained	= new ByteArrayHashMap();

		boolean	removed_any = false;

		for ( Iterator it = key_block_map_cow.values().iterator(); it.hasNext(); ){

			keyBlock	block = (keyBlock)it.next();

			if ( block.isDirect()){

				removed_any = true;

			}else{

				retained.put( block.getKey(), block );
			}
		}

		if ( removed_any ){

			log.log( "KB: Purged direct entries on ID change" );

				// publish the new COW state, then persist

			key_block_map_cow		= retained;
			key_blocks_direct_cow	= buildKeyBlockDetails( key_block_map_cow );

			writeKeyBlocks();
		}
	}finally{

		key_block_mon.exit();
	}
}
/**
 * Persists an arbitrary named byte[] value into the plugin's "general"
 * storage map (read-modify-write of the whole file, guarded by the
 * storage monitor).
 *
 * @param key	name to store the value under
 * @param data	value bytes to store
 */
public void
setStorageForKey(
	String		key,
	byte[]		data )
{
	try{
		storage_mon.enter();

		Map	general = readMapFromFile( "general" );

		general.put( key, data );

		writeMapToFile( general, "general" );

	}finally{

		storage_mon.exit();
	}
}
/**
 * Retrieves a value previously stored via setStorageForKey from the
 * plugin's "general" storage map (read under the storage monitor).
 *
 * @param key	name the value was stored under
 * @return the stored bytes, or null if no value exists for the key
 */
public byte[]
getStorageForKey(
	String		key )
{
	try{
		storage_mon.enter();

		Map	general = readMapFromFile( "general" );

		return (byte[])general.get( key );

	}finally{

		storage_mon.exit();
	}
}
/**
 * A key-block: a signed request that blocks (or unblocks) a key in the
 * DHT, together with its authorising certificate and local bookkeeping.
 * <p>
 * Request layout (grounded in the accessors below): byte 0 = add flag
 * (0x01 = add), bytes 4-7 = big-endian creation time, bytes 8+ = the key.
 */
protected static class
keyBlock
	implements DHTStorageBlock
{
	private byte[]		request;		// raw signed request (8 byte header + key)
	private byte[]		cert;			// certificate authorising the block
	private int			received;		// reception timestamp — units set by caller; TODO confirm
	private boolean		direct;			// received first-hand rather than via replication

	private BloomFilter	sent_to_bloom;	// approximate set of contacts already sent this block
	private boolean		logged;			// has this block been logged yet

	protected
	keyBlock(
		byte[]		_request,
		byte[]		_cert,
		int			_received,
		boolean		_direct )
	{
		request		= _request;
		cert		= _cert;
		received	= _received;
		direct		= _direct;
	}

	public byte[]
	getRequest()
	{
		return( request );
	}

	public byte[]
	getCertificate()
	{
		return( cert );
	}

	public byte[]
	getKey()
	{
			// key is everything after the 8 byte header; copyOfRange replaces
			// the manual new[] + System.arraycopy idiom (fully qualified as the
			// file's import block is not in view here)

		return( java.util.Arrays.copyOfRange( request, 8, request.length ));
	}

	protected boolean
	isAdd()
	{
			// byte 0 of the request is the add/remove flag

		return( request[0] == 0x01 );
	}

	protected boolean
	getLogged()
	{
		return( logged );
	}

	protected void
	setLogged()
	{
		logged = true;
	}

	protected int
	getCreated()
	{
			// creation time is a big-endian int packed into request bytes 4-7

		int created =
			(request[4]<<24)&0xff000000 |
			(request[5]<<16)&0x00ff0000 |
			(request[6]<< 8)&0x0000ff00 |
			request[7]      &0x000000ff;

		return( created );
	}

	protected int
	getReceived()
	{
		return( received );
	}

	protected boolean
	isDirect()
	{
		return( direct );
	}

	public boolean
	hasBeenSentTo(
		DHTTransportContact		contact )
	{
			// snapshot the field once — it can be replaced by sentTo

		BloomFilter	filter = sent_to_bloom;

		if ( filter == null ){

			return( false );
		}

			// bloom filter: may report false positives (we'd skip a resend),
			// never false negatives

		return( filter.contains( contact.getID()));
	}

	public void
	sentTo(
		DHTTransportContact		contact )
	{
		BloomFilter	filter = sent_to_bloom;

			// lazily create, and reset once >100 entries to bound the false
			// positive rate; worst case after a reset is a redundant resend

		if ( filter == null || filter.getEntryCount() > 100 ){

			filter = BloomFilterFactory.createAddOnly(500);

			sent_to_bloom = filter;
		}

		filter.add( contact.getID());
	}
}
/**
 * Records that a key has been diversified and expands the original key
 * into the set of derived keys used for subsequent puts and gets.
 * <p>
 * Two types (see getKeys): DT_FREQUENCY — the value is cached at all
 * DIV_WIDTH derived locations; DT_SIZE — the value set is fragmented
 * across the derived locations.
 */
protected static class
diversification
{
	private DHTPluginStorageManager		manager;
	private HashWrapper					key;				// the original (undiversified) key
	private byte						type;				// DHT.DT_FREQUENCY or DHT.DT_SIZE
	private long						expiry;				// absolute time at which this diversification lapses
	private int[]						fixed_put_offsets;	// stable subset of offsets used for DT_SIZE puts

		// new diversification: randomised expiry (spreads lapses over time) and
		// a random-but-thereafter-fixed set of DIV_FRAG_GET_SIZE distinct offsets

	protected
	diversification(
		DHTPluginStorageManager		_manager,
		HashWrapper					_key,
		byte						_type )
	{
		manager	= _manager;
		key		= _key;
		type	= _type;

		expiry	= SystemTime.getCurrentTime() + DIV_EXPIRY_MIN + (long)(Math.random() * DIV_EXPIRY_RAND );

		fixed_put_offsets = new int[DIV_FRAG_GET_SIZE];

		int	pos = 0;

			// rejection-sample distinct offsets in [0, DIV_WIDTH); assumes
			// DIV_FRAG_GET_SIZE <= DIV_WIDTH or this would never terminate —
			// TODO confirm against the constant declarations

		while( pos < DIV_FRAG_GET_SIZE ){

			int	i = (int)(Math.random()*DIV_WIDTH);

			boolean	found = false;

			for (int j=0;j<pos;j++){

				if( i == fixed_put_offsets[j] ){

					found = true;

					break;
				}
			}

			if ( !found ){

				fixed_put_offsets[pos++] = i;
			}
		}
	}

		// deserialisation constructor: all state restored from persisted values

	protected
	diversification(
		DHTPluginStorageManager		_manager,
		HashWrapper					_key,
		byte						_type,
		long						_expiry,
		int[]						_fixed_put_offsets )
	{
		manager				= _manager;
		key					= _key;
		type				= _type;
		expiry				= _expiry;
		fixed_put_offsets	= _fixed_put_offsets;
	}

	/**
	 * Serialises this diversification to a bencode-style map
	 * (keys: "key", "type", "exp", "fpo") — inverse of deserialise.
	 */
	protected Map
	serialise()
	{
		Map	map = new HashMap();

		map.put( "key", key.getBytes());
		map.put( "type", new Long(type));
		map.put( "exp", new Long(expiry));

		List	offsets = new ArrayList();

		for (int i=0;i<fixed_put_offsets.length;i++){

			offsets.add( new Long( fixed_put_offsets[i]));
		}

		map.put( "fpo", offsets );

		manager.log.log( "SM: serialised div: " + DHTLog.getString2( key.getBytes()) + ", " + DHT.DT_STRINGS[type] + ", " + formatExpiry(expiry));

		return( map );
	}

	/**
	 * Reconstructs a diversification from a map produced by serialise.
	 */
	protected static diversification
	deserialise(
		DHTPluginStorageManager		_manager,
		Map							_map )
	{
		HashWrapper	key		= new HashWrapper((byte[])_map.get("key"));
		int			type	= ((Long)_map.get("type")).intValue();
		long		exp		= ((Long)_map.get("exp")).longValue();

		List	offsets = (List)_map.get("fpo");

		int[]	fops = new int[offsets.size()];

		for (int i=0;i<fops.length;i++){

			fops[i] = ((Long)offsets.get(i)).intValue();
		}

		_manager.log.log( "SM: deserialised div: " + DHTLog.getString2( key.getBytes()) + ", " + DHT.DT_STRINGS[type] + ", " + formatExpiry(exp));

		return( new diversification( _manager, key, (byte)type, exp, fops ));
	}

	protected HashWrapper
	getKey()
	{
		return( key );
	}

	protected long
	getExpiry()
	{
		return( expiry );
	}

	/**
	 * Expands the original key into the derived keys to use for an
	 * operation.
	 *
	 * @param put		 true for a put (fixed/full key set), false for a get
	 *					 (randomised selection)
	 * @param exhaustive true to widen the selection (include the original key
	 *					 on puts, or all derived keys on DT_SIZE gets)
	 * @return list of HashWrapper keys to operate against
	 */
	protected List
	getKeys(
		boolean		put,
		boolean		exhaustive )
	{
		List	keys = new ArrayList();

		if ( put ){

			if ( type == DHT.DT_FREQUENCY ){

					// put to all keys

				for (int i=0;i<DIV_WIDTH;i++){

					keys.add( diversifyKey( key, i ));
				}

				if ( exhaustive ){

						// include original key

					// System.out.println( "followDivs:put:freq adding original" );

					keys.add( key );
				}
			}else{

					// put to a fixed subset. has to be fixed else over time we'll put to
					// all the fragmented locations and nullify the point of this. gets are
					// randomised so we don't lose out by fixing the puts

				for (int i=0;i<fixed_put_offsets.length;i++){

					keys.add( diversifyKey( key, fixed_put_offsets[i]));
				}

				if ( exhaustive ){

						// include original key

					// System.out.println( "followDivs:put:size adding original" );

					keys.add( key );
				}
			}
		}else{

				// get always returns a randomised selection

			if ( type == DHT.DT_FREQUENCY ){

					// diversification has lead to caching at all 'n' places

				keys.add( diversifyKey( key,(int)(Math.random()*DIV_WIDTH)));

			}else{

					// diversification has fragmented across 'n' places
					// select DIV_FRAG_GET_SIZE distinct ones to search, or all if exhaustive

				if ( exhaustive ){

					for (int i=0;i<DIV_WIDTH;i++){

						keys.add( diversifyKey( key, i ));
					}

					// System.out.println( "followDivs:get:size adding all" );

				}else{

						// rejection-sample distinct random offsets

					List	randoms = new ArrayList();

					while( randoms.size() < DIV_FRAG_GET_SIZE ){

						Integer	i = new Integer((int)(Math.random()*DIV_WIDTH));

						if ( !randoms.contains(i)){

							randoms.add( i );
						}
					}

					for (int i=0;i<DIV_FRAG_GET_SIZE;i++){

						keys.add( diversifyKey( key, ((Integer) randoms.get(i)).intValue()));
					}
				}
			}
		}

		return( keys );
	}

		// derived key = SHA1( original key bytes + 1 trailing offset byte )

	protected HashWrapper
	diversifyKey(
		HashWrapper		key_in,
		int				offset )
	{
		byte[]	old_bytes	= key_in.getBytes();

		byte[]	bytes = new byte[old_bytes.length+1];

		System.arraycopy( old_bytes, 0, bytes, 0, old_bytes.length );

		bytes[old_bytes.length] = (byte)offset;

		return( new HashWrapper( new SHA1Simple().calculateHash( bytes )));
	}
}
/**
 * Per-key storage statistics and local diversification trigger state.
 * Tracks the size, entry count and read rate for one key; when limits are
 * exceeded the key's diversification type flips from DT_NONE to DT_SIZE
 * or DT_FREQUENCY and the change is persisted via the manager.
 */
protected static class
storageKey
	implements DHTStorageKey
{
	private DHTPluginStorageManager		manager;

	private HashWrapper		key;
	private byte			type;				// DHT.DT_NONE until a diversification triggers
	private int				size;				// total bytes stored under this key
	private int				entries;			// number of values stored under this key
	private long			expiry;				// absolute time at which the diversification lapses

	private long			read_count_start;	// start of the current read-rate sampling window
	private short			reads_per_min;		// last computed read rate for this key
	private BloomFilter		ip_bloom_filter;	// approximate set of reader IPs in current window

		// new key: expiry is randomised so diversifications don't all lapse at once

	protected
	storageKey(
		DHTPluginStorageManager		_manager,
		byte						_type,
		HashWrapper					_key )
	{
		manager	= _manager;
		type	= _type;
		key		= _key;

		expiry	= SystemTime.getCurrentTime() + DIV_EXPIRY_MIN + (long)(Math.random() * DIV_EXPIRY_RAND );
	}

		// deserialisation constructor: expiry restored from persisted state

	protected
	storageKey(
		DHTPluginStorageManager		_manager,
		byte						_type,
		HashWrapper					_key,
		long						_expiry )
	{
		manager	= _manager;
		type	= _type;
		key		= _key;
		expiry	= _expiry;
	}

	/**
	 * Serialises this storage key to a map (keys: "key", "type", "exp") —
	 * inverse of deserialise. Read-rate state is intentionally not persisted.
	 */
	protected Map
	serialise()
	{
		Map	map = new HashMap();

		map.put( "key", key.getBytes());
		map.put( "type", new Long(type));
		map.put( "exp", new Long(expiry));

		manager.log.log( "SM: serialised sk: " + DHTLog.getString2( key.getBytes()) + ", " + DHT.DT_STRINGS[type] + ", " + formatExpiry(expiry) );

		return( map );
	}

	/**
	 * Reconstructs a storage key from a map produced by serialise.
	 */
	protected static storageKey
	deserialise(
		DHTPluginStorageManager		_manager,
		Map							map )
	{
		HashWrapper	key		= new HashWrapper((byte[])map.get("key"));
		int			type	= ((Long)map.get("type")).intValue();
		long		exp		= ((Long)map.get("exp")).longValue();

		_manager.log.log( "SM: deserialised sk: " + DHTLog.getString2( key.getBytes()) + ", " + DHT.DT_STRINGS[type] + ", " + formatExpiry(exp));

		return( new storageKey( _manager, (byte)type, key, exp ));
	}

		// stats serialisation is delegated to the manager

	public void
	serialiseStats(
		DataOutputStream	dos )

		throws IOException
	{
		manager.serialiseStats( this, dos );
	}

	protected HashWrapper
	getKey()
	{
		return( key );
	}

	protected long
	getExpiry()
	{
		return( expiry );
	}

	/**
	 * Returns the current diversification type, lazily expiring it: if the
	 * expiry time has passed the type reverts to DT_NONE and the change is
	 * persisted as a side effect of this call.
	 */
	public byte
	getDiversificationType()
	{
		if ( type != DHT.DT_NONE ){

				// trigger timeouts here

			if ( expiry < SystemTime.getCurrentTime()){

				type = DHT.DT_NONE;

				manager.log.log( "SM: sk: " + DHTLog.getString2( getKey().getBytes()) + " expired" );

				manager.writeDiversifications();
			}
		}

		return( type );
	}

	public int
	getReadsPerMinute()
	{
		return( reads_per_min );
	}

	public int
	getSize()
	{
		return( size );
	}

	public int
	getEntryCount()
	{
		return( entries );
	}

	/**
	 * Records a read of this key by a contact and, while undiversified,
	 * samples the read rate over a window of
	 * LOCAL_DIVERSIFICATION_READS_PER_MIN_SAMPLES minutes. If the unique-IP
	 * count in a completed window exceeds the frequency threshold the key
	 * diversifies to DT_FREQUENCY (persisted via the manager).
	 */
	protected void
	read(
		DHTTransportContact	contact )
	{
		// System.out.println( "read: " + DHTLog.getString2( key.getBytes()));

		if ( type == DHT.DT_NONE ){

			long	now = SystemTime.getCurrentTime();

			long	diff = now - read_count_start;

				// has the sampling window (SAMPLES minutes) completed?

			if ( diff > LOCAL_DIVERSIFICATION_READS_PER_MIN_SAMPLES*60*1000 ){

				if ( ip_bloom_filter != null ){

						// bloom entry count approximates unique reader IPs this window

					int	ip_entries = ip_bloom_filter.getEntryCount();

					reads_per_min = (short)( ip_entries / LOCAL_DIVERSIFICATION_READS_PER_MIN_SAMPLES );

					if ( reads_per_min == 0 && ip_entries > 0 ){

							// show at least some activity!

						reads_per_min = 1;
					}

					if ( ip_entries > LOCAL_DIVERSIFICATION_READS_PER_MIN * LOCAL_DIVERSIFICATION_READS_PER_MIN_SAMPLES ){

						type = DHT.DT_FREQUENCY;

						manager.log.log( "SM: sk freq created (" + ip_entries + "reads ) - " + DHTLog.getString2( key.getBytes()));

						manager.writeDiversifications();
					}
				}

				read_count_start	= now;

				ip_bloom_filter		= null;	// just null it and drop this read, doesn't matter
											// and means that we don't bother creating a filter for
											// infrequently accessed data
			}else{

				if ( ip_bloom_filter == null ){

						// we want to hold enough IPs to detect a hit rate of reads_per_min*min
						// with a reasonable accuracy (sized to 10/3 to save space - this gives
						// an average of 100 adds required to detect 90 unique)

					ip_bloom_filter = BloomFilterFactory.createAddOnly(
							( LOCAL_DIVERSIFICATION_READS_PER_MIN * LOCAL_DIVERSIFICATION_READS_PER_MIN_SAMPLES *10 ) / 3 );
				}

				byte[]	address_bytes = contact.getAddress().getAddress().getAddress();

				ip_bloom_filter.add( address_bytes );
			}
		}
	}

	/**
	 * Applies a change in stored entries/bytes for this key (deltas may be
	 * negative; totals are clamped at zero with a diagnostic). While
	 * undiversified, exceeding the size or entry-count limit diversifies the
	 * key to DT_SIZE (persisted via the manager).
	 */
	protected void
	valueChanged(
		int		entries_diff,
		int		size_diff )
	{
		entries += entries_diff;
		size	+= size_diff;

		if ( entries < 0 ){

			Debug.out( "entries negative" );

			entries = 0;
		}

		if ( size < 0 ){

			Debug.out( "size negative" );

			size = 0;
		}

		if ( type == DHT.DT_NONE ){

			if ( size > LOCAL_DIVERSIFICATION_SIZE_LIMIT ){

				type = DHT.DT_SIZE;

				manager.log.log( "SM: sk size total created (size " + size + ") - " + DHTLog.getString2( key.getBytes()));

				manager.writeDiversifications();

			}else if ( entries > LOCAL_DIVERSIFICATION_ENTRIES_LIMIT ){

				type = DHT.DT_SIZE;

				manager.log.log( "SM: sk size entries created (" + entries + " entries) - " + DHTLog.getString2( key.getBytes()));

				manager.writeDiversifications();
			}
		}

		// System.out.println( "value changed: entries = " + entries + "(" + entries_diff + "), size = " + size + "(" + size_diff + ")");
	}
}
}
// (web code-viewer UI residue removed — keyboard-shortcut help text, not part of the source)