/*
* Created on 12-Mar-2005
* Created by Paul Gardner
* Copyright (C) 2004, 2005, 2006 Aelitis, All Rights Reserved.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* AELITIS, SAS au capital de 46,603.30 euros
* 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
*
*/
package com.aelitis.azureus.plugins.dht.impl;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.math.BigInteger;
import java.security.KeyFactory;
import java.security.Signature;
import java.security.interfaces.RSAPublicKey;
import java.security.spec.RSAPublicKeySpec;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.gudy.azureus2.core3.util.*;
import com.aelitis.azureus.core.dht.DHT;
import com.aelitis.azureus.core.dht.DHTLogger;
import com.aelitis.azureus.core.dht.DHTStorageAdapter;
import com.aelitis.azureus.core.dht.DHTStorageBlock;
import com.aelitis.azureus.core.dht.DHTStorageKey;
import com.aelitis.azureus.core.dht.DHTStorageKeyStats;
import com.aelitis.azureus.core.dht.impl.DHTLog;
import com.aelitis.azureus.core.dht.transport.DHTTransportContact;
import com.aelitis.azureus.core.dht.transport.DHTTransportValue;
import com.aelitis.azureus.core.util.bloom.BloomFilter;
import com.aelitis.azureus.core.util.bloom.BloomFilterFactory;
/**
* @author parg
*
*/
public class
DHTPluginStorageManager
implements DHTStorageAdapter
{
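// RSA public key components (hex public exponent and modulus) used to build
// key_block_public_key, against which signed key-block requests appear to be verified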
private static final String pub_exp = "10001";
private static final String modulus = "b8a440c76405b2175a24c86d70f2c71929673a31045791d8bd84220a48729998900d227b560e88357074fa534ccccc6944729bfdda5413622f068e7926176a8afc8b75d4ba6cde760096624415b544f73677e8093ddba46723cb973b4d55f61c2003b73f52582894c018e141e8d010bb615cdbbfaeb97a7af6ce1a5a20a62994da81bde6487e8a39e66c8df0cfd9d763c2da4729cbf54278ea4912169edb0a33";
private static final long ADDRESS_EXPIRY = 7*24*60*60*1000L;
private static final int DIV_WIDTH = 10;
private static final int DIV_FRAG_GET_SIZE = 2;
private static final long DIV_EXPIRY_MIN = 2*24*60*60*1000L;
private static final long DIV_EXPIRY_RAND = 1*24*60*60*1000L;
private static final long KEY_BLOCK_TIMEOUT_SECS = 7*24*60*60;
public static final int LOCAL_DIVERSIFICATION_SIZE_LIMIT = 4096;
public static final int LOCAL_DIVERSIFICATION_ENTRIES_LIMIT = 512;
public static final int LOCAL_DIVERSIFICATION_READS_PER_MIN_SAMPLES = 3;
public static final int LOCAL_DIVERSIFICATION_READS_PER_MIN = 30;
public static final int MAX_STORAGE_KEYS = 65536;
private int network;
private DHTLogger log;
private File data_dir;
private AEMonitor address_mon = new AEMonitor( "DHTPluginStorageManager:address" );
private AEMonitor contact_mon = new AEMonitor( "DHTPluginStorageManager:contact" );
private AEMonitor storage_mon = new AEMonitor( "DHTPluginStorageManager:storage" );
private AEMonitor version_mon = new AEMonitor( "DHTPluginStorageManager:version" );
private AEMonitor key_block_mon = new AEMonitor( "DHTPluginStorageManager:block" );
private Map version_map = new HashMap();
private Map recent_addresses = new HashMap();
private Map remote_diversifications = new HashMap();
private Map local_storage_keys = new HashMap();
private volatile ByteArrayHashMap key_block_map_cow = new ByteArrayHashMap();
private volatile DHTStorageBlock[] key_blocks_direct_cow = new DHTStorageBlock[0];
private BloomFilter kb_verify_fail_bloom;
private long kb_verify_fail_bloom_create_time;
private static RSAPublicKey key_block_public_key;
static{
try{
KeyFactory key_factory = KeyFactory.getInstance("RSA");
RSAPublicKeySpec public_key_spec =
new RSAPublicKeySpec( new BigInteger(modulus,16), new BigInteger(pub_exp,16));
key_block_public_key = (RSAPublicKey)key_factory.generatePublic( public_key_spec );
}catch( Throwable e ){
Debug.printStackTrace(e);
}
}
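// Creates the storage manager for the given DHT network, ensuring the data directory
// exists and loading persisted state (recent addresses, diversifications,
// value-version counter and key blocks)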
public
DHTPluginStorageManager(
int _network,
DHTLogger _log,
File _data_dir )
{
network = _network;
log = _log;
data_dir = _data_dir;
FileUtil.mkdirs(data_dir);
readRecentAddresses();
readDiversifications();
readVersionData();
readKeyBlocks();
}
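// Imports previously exported routing-table contacts into the DHT from contacts.dat,
// falling back to contacts.saving if the main file is missing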
protected void
importContacts(
DHT dht )
{
try{
contact_mon.enter();
File target = new File( data_dir, "contacts.dat" );
if ( !target.exists()){
target = new File( data_dir, "contacts.saving" );
}
if ( target.exists()){
DataInputStream dis = new DataInputStream( new FileInputStream( target ));
try{
dht.importState( dis );
}finally{
dis.close();
}
}
}catch( Throwable e ){
Debug.printStackTrace( e );
}finally{
contact_mon.exit();
}
}
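// Exports the DHT's contact state to contacts.saving, syncing the file to disk
// before renaming it over contacts.dat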
protected void
exportContacts(
DHT dht )
{
try{
contact_mon.enter();
File saving = new File( data_dir, "contacts.saving" );
File target = new File( data_dir, "contacts.dat" );
saving.delete();
DataOutputStream dos = null;
boolean ok = false;
try{
FileOutputStream fos = new FileOutputStream( saving );
dos = new DataOutputStream(fos);
dht.exportState( dos, 32 );
dos.flush();
fos.getFD().sync();
ok = true;
}finally{
if ( dos != null ){
dos.close();
if ( ok ){
target.delete();
saving.renameTo( target );
}
}
}
}catch( Throwable e ){
Debug.printStackTrace( e );
}finally{
contact_mon.exit();
}
// this is a good point to save diversifications - useful when they've expired
// as writing isn't triggered at expiry time
writeDiversifications();
}
protected void
readRecentAddresses()
{
try{
address_mon.enter();
recent_addresses = readMapFromFile( "addresses" );
}finally{
address_mon.exit();
}
}
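// Persists the recent-address map, first dropping entries older than ADDRESS_EXPIRY
// (the "most_recent" marker is always kept)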
protected void
writeRecentAddresses()
{
try{
address_mon.enter();
// remove any old crud
Iterator it = recent_addresses.keySet().iterator();
while( it.hasNext()){
String key = (String)it.next();
if ( !key.equals( "most_recent" )){
Long time = (Long)recent_addresses.get(key);
if ( SystemTime.getCurrentTime() - time.longValue() > ADDRESS_EXPIRY ){
it.remove();
}
}
}
writeMapToFile( recent_addresses, "addresses" );
}catch( Throwable e ){
Debug.printStackTrace(e);
}finally{
address_mon.exit();
}
}
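// Records the node's current external address with a timestamp and updates the
// "most_recent" marker before persisting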
protected void
recordCurrentAddress(
String address )
{
try{
address_mon.enter();
recent_addresses.put( address, new Long( SystemTime.getCurrentTime()));
recent_addresses.put( "most_recent", address.getBytes());
writeRecentAddresses();
}finally{
address_mon.exit();
}
}
protected String
getMostRecentAddress()
{
byte[] addr = (byte[])recent_addresses.get( "most_recent" );
if ( addr == null ){
return( null );
}
return( new String( addr ));
}
protected boolean
isRecentAddress(
String address )
{
try{
address_mon.enter();
if ( recent_addresses.containsKey( address )){
return( true );
}
String most_recent = getMostRecentAddress();
return( most_recent != null && most_recent.equals( address ));
}finally{
address_mon.exit();
}
}
protected void
localContactChanged(
DHTTransportContact contact )
{
purgeDirectKeyBlocks();
}
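// Reads a BEncoded map from <prefix>.dat (or <prefix>.saving if the .dat file is
// missing); returns an empty map if neither exists or decoding fails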
protected Map
readMapFromFile(
String file_prefix )
{
try{
File target = new File( data_dir, file_prefix + ".dat" );
if ( !target.exists()){
target = new File( data_dir, file_prefix + ".saving" );
}
if ( target.exists()){
BufferedInputStream is = new BufferedInputStream( new FileInputStream( target ));
try{
return( BDecoder.decode( is ));
}finally{
is.close();
}
}
}catch( Throwable e ){
Debug.printStackTrace( e );
}
return( new HashMap());
}
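// BEncodes the map to <prefix>.saving, syncs it to disk and renames it over
// <prefix>.dat; an empty map simply deletes the target file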
protected void
writeMapToFile(
Map map,
String file_prefix )
{
try{
File saving = new File( data_dir, file_prefix + ".saving" );
File target = new File( data_dir, file_prefix + ".dat" );
saving.delete();
if ( map.size() == 0 ){
target.delete();
}else{
FileOutputStream os = null;
boolean ok = false;
try{
byte[] data = BEncoder.encode( map );
os = new FileOutputStream( saving );
os.write( data );
os.flush();
os.getFD().sync();
os.close();
ok = true;
}finally{
if ( os != null ){
os.close();
if ( ok ){
target.delete();
saving.renameTo( target );
}
}
}
}
}catch( Throwable e ){
Debug.printStackTrace(e);
}
}
protected void
readVersionData()
{
try{
version_mon.enter();
version_map = readMapFromFile( "version" );
}finally{
version_mon.exit();
}
}
protected void
writeVersionData()
{
try{
version_mon.enter();
writeMapToFile( version_map, "version" );
}finally{
version_mon.exit();
}
}
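// Allocates a block of 'num' value-version numbers. Versions are seeded from the
// current time in seconds and persisted so that they keep increasing across restarts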
public int
getNextValueVersions(
int num )
{
try{
version_mon.enter();
Long l_next = (Long)version_map.get( "next" );
int now = (int)(SystemTime.getCurrentTime()/1000);
int next;
if ( l_next == null ){
next = now;
}else{
next = l_next.intValue();
// if "next" is in the future then we live with it to try and ensure increasing
// values (system clock must have changed)
if ( next < now ){
next = now;
}
}
version_map.put( "next", new Long( next+num ));
writeVersionData();
return( next );
}finally{
version_mon.exit();
}
}
// key storage
public DHTStorageKey
keyCreated(
HashWrapper key,
boolean local )
{
//System.out.println( "DHT key created");
try{
storage_mon.enter();
return( getStorageKey( key ));
}finally{
storage_mon.exit();
}
}
public void
keyDeleted(
DHTStorageKey key )
{
//System.out.println( "DHT key deleted" );
try{
storage_mon.enter();
deleteStorageKey((storageKey)key );
}finally{
storage_mon.exit();
}
}
public void
keyRead(
DHTStorageKey key,
DHTTransportContact contact )
{
//System.out.println( "DHT value read" );
try{
storage_mon.enter();
((storageKey)key).read( contact );
}finally{
storage_mon.exit();
}
}
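// Serialises per-key statistics: a version byte followed by entry count, size,
// reads-per-minute and diversification type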
public void
serialiseStats(
storageKey key,
DataOutputStream dos )
throws IOException
{
dos.writeByte( (byte)0 ); // version
dos.writeInt( key.getEntryCount());
dos.writeInt( key.getSize());
dos.writeInt( key.getReadsPerMinute());
dos.writeByte( key.getDiversificationType());
}
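// Deserialises the statistics written by serialiseStats into an immutable
// DHTStorageKeyStats snapshot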
public DHTStorageKeyStats
deserialiseStats(
DataInputStream is )
throws IOException
{
byte version = is.readByte();
final int entry_count = is.readInt();
final int size = is.readInt();
final int reads = is.readInt();
final byte div = is.readByte();
return(
new DHTStorageKeyStats()
{
public int
getEntryCount()
{
return( entry_count );
}
public int
getSize()
{
return( size );
}
public int
getReadsPerMinute()
{
return( reads );
}
public byte
getDiversification()
{
return( div );
}
});
}
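// Value lifecycle callbacks: keep each storage key's entry count and aggregate stored
// size up to date; these figures presumably feed the local diversification limits defined above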
public void
valueAdded(
DHTStorageKey key,
DHTTransportValue value )
{
// System.out.println( network + ": DHT value added: " + DHTLog.getString2( ((storageKey)key).getKey().getBytes()) + " -> " + value.getString());
try{
storage_mon.enter();
((storageKey)key).valueChanged( 1, value.getValue().length);
}finally{
storage_mon.exit();
}
}
public void
valueUpdated(
DHTStorageKey key,
DHTTransportValue old_value,
DHTTransportValue new_value )
{
//System.out.println( "DHT value updated" );
try{
storage_mon.enter();
((storageKey)key).valueChanged( 0, new_value.getValue().length - old_value.getValue().length);
}finally{
storage_mon.exit();
}
}
public void
valueDeleted(
DHTStorageKey key,
DHTTransportValue value )
{
//System.out.println( "DHT value deleted" );
try{
storage_mon.enter();
((storageKey)key).valueChanged( -1, -value.getValue().length);
}finally{
storage_mon.exit();
}
}
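// Returns true if a diversification is currently recorded for the given key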
public boolean
isDiversified(
byte[] key )
{
HashWrapper wrapper = new HashWrapper( key );
try{
storage_mon.enter();
return( lookupDiversification( wrapper ) != null );
}finally{
storage_mon.exit();
}
}
// diversifications returned for put operations must deterministically map to the same
// end points, whereas those returned for get operations should be randomised to load-balance
public byte[][]
getExistingDiversification(
byte[] key,
boolean put_operation,