⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dhtpluginstoragemanager.java

📁 这是一个基于java编写的torrent的P2P源码
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
/*
 * Created on 12-Mar-2005
 * Created by Paul Gardner
 * Copyright (C) 2004, 2005, 2006 Aelitis, All Rights Reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 * 
 * AELITIS, SAS au capital de 46,603.30 euros
 * 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
 *
 */

package com.aelitis.azureus.plugins.dht.impl;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.math.BigInteger;
import java.security.KeyFactory;
import java.security.Signature;
import java.security.interfaces.RSAPublicKey;
import java.security.spec.RSAPublicKeySpec;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.gudy.azureus2.core3.util.*;

import com.aelitis.azureus.core.dht.DHT;
import com.aelitis.azureus.core.dht.DHTLogger;
import com.aelitis.azureus.core.dht.DHTStorageAdapter;
import com.aelitis.azureus.core.dht.DHTStorageBlock;
import com.aelitis.azureus.core.dht.DHTStorageKey;
import com.aelitis.azureus.core.dht.DHTStorageKeyStats;
import com.aelitis.azureus.core.dht.impl.DHTLog;
import com.aelitis.azureus.core.dht.transport.DHTTransportContact;
import com.aelitis.azureus.core.dht.transport.DHTTransportValue;
import com.aelitis.azureus.core.util.bloom.BloomFilter;
import com.aelitis.azureus.core.util.bloom.BloomFilterFactory;

/**
 * @author parg
 *
 */

public class 
DHTPluginStorageManager 
	implements DHTStorageAdapter
{
		// Hex-encoded RSA public exponent and modulus. Both are parsed with radix 16 in the
		// static initialiser to build key_block_public_key.

	private static final String	pub_exp = "10001";
	private static final String	modulus	= "b8a440c76405b2175a24c86d70f2c71929673a31045791d8bd84220a48729998900d227b560e88357074fa534ccccc6944729bfdda5413622f068e7926176a8afc8b75d4ba6cde760096624415b544f73677e8093ddba46723cb973b4d55f61c2003b73f52582894c018e141e8d010bb615cdbbfaeb97a7af6ce1a5a20a62994da81bde6487e8a39e66c8df0cfd9d763c2da4729cbf54278ea4912169edb0a33";
	
		// ADDRESS_EXPIRY: maximum age of a recorded address before writeRecentAddresses drops it.
		// It is compared against differences of SystemTime.getCurrentTime() - presumably 
		// milliseconds, i.e. 7 days; confirm against SystemTime.
		// The DIV_* and KEY_BLOCK_TIMEOUT_SECS values are not referenced in the portion of the
		// file visible here; NOTE(review): the names suggest diversification tuning and a key-block
		// lifetime in seconds - confirm at the use sites.

	private static final long		ADDRESS_EXPIRY			= 7*24*60*60*1000L; 
	private static final int		DIV_WIDTH				= 10;
	private static final int		DIV_FRAG_GET_SIZE		= 2;
	private static final long		DIV_EXPIRY_MIN			= 2*24*60*60*1000L;
	private static final long		DIV_EXPIRY_RAND			= 1*24*60*60*1000L;
	private static final long		KEY_BLOCK_TIMEOUT_SECS	= 7*24*60*60;
	
		// Public limits; NOTE(review): presumably the thresholds at which a locally stored key
		// is diversified (size, entry count, read rate) - their use is outside the visible code.

	public static final int			LOCAL_DIVERSIFICATION_SIZE_LIMIT			= 4096;
	public static final int			LOCAL_DIVERSIFICATION_ENTRIES_LIMIT			= 512;
	public static final int			LOCAL_DIVERSIFICATION_READS_PER_MIN_SAMPLES	= 3;
	public static final int			LOCAL_DIVERSIFICATION_READS_PER_MIN			= 30;
	
		// NOTE(review): presumably an upper bound on tracked storage keys - use not visible here

	public static final int			MAX_STORAGE_KEYS	= 65536;
	
		// Values supplied to the constructor. data_dir is the directory in which all of the
		// ".dat" / ".saving" state files handled by this class live.

	private int				network;
	private DHTLogger		log;
	private File			data_dir;
	
		// Monitors: address_mon is held around accesses to recent_addresses, contact_mon around
		// contact import/export, storage_mon around the storage-key callbacks and version_mon
		// around version_map. key_block_mon is not used in the portion of the file visible here.

	private AEMonitor	address_mon	= new AEMonitor( "DHTPluginStorageManager:address" );
	private AEMonitor	contact_mon	= new AEMonitor( "DHTPluginStorageManager:contact" );
	private AEMonitor	storage_mon	= new AEMonitor( "DHTPluginStorageManager:storage" );
	private AEMonitor	version_mon	= new AEMonitor( "DHTPluginStorageManager:version" );
	private AEMonitor	key_block_mon	= new AEMonitor( "DHTPluginStorageManager:block" );
	
		// version_map: holds the "next" value-version counter, persisted under the "version" prefix.
		// recent_addresses: address -> Long timestamp, plus the special "most_recent" -> byte[]
		// entry; persisted under the "addresses" prefix.

	private Map					version_map			= new HashMap();
	private Map					recent_addresses	= new HashMap();
	
		// NOTE(review): contents of these two maps are established outside the visible code

	private Map					remote_diversifications	= new HashMap();
	private Map					local_storage_keys		= new HashMap();
	
		// Key-block state. NOTE(review): the "_cow" suffix presumably means the volatile references
		// are replaced wholesale rather than mutated in place - confirm where they are updated.

	private volatile ByteArrayHashMap	key_block_map_cow		= new ByteArrayHashMap();
	private volatile DHTStorageBlock[]	key_blocks_direct_cow	= new DHTStorageBlock[0];
	private BloomFilter					kb_verify_fail_bloom;
	private long						kb_verify_fail_bloom_create_time;
	
		// RSA public key built by the static initialiser; left null if that construction fails

	private static RSAPublicKey key_block_public_key;
	
	static{
		try{
			KeyFactory key_factory = KeyFactory.getInstance("RSA");
			
			RSAPublicKeySpec 	public_key_spec = 
				new RSAPublicKeySpec( new BigInteger(modulus,16), new BigInteger(pub_exp,16));
	
			key_block_public_key 	= (RSAPublicKey)key_factory.generatePublic( public_key_spec );

		}catch( Throwable e ){
			
			Debug.printStackTrace(e);
		}
	}
	
	/**
	 * Creates the storage manager, makes sure the data directory exists and then loads
	 * the persisted state: recent addresses, diversifications, version data and key blocks.
	 *
	 * @param _network	network identifier, stored as-is
	 * @param _log		logger, stored as-is
	 * @param _data_dir	directory holding this manager's state files; created if required
	 */
	public
	DHTPluginStorageManager(
		int					_network,
		DHTLogger			_log,
		File				_data_dir )
	{	
		network		= _network;
		log			= _log;
		data_dir	= _data_dir;
		
			// the directory must exist before any of the read methods below look for files in it

		FileUtil.mkdirs(data_dir);
		
		readRecentAddresses();
		
		readDiversifications();
		
		readVersionData();
		
		readKeyBlocks();
	}
	
	/**
	 * Loads previously exported contact state into the supplied DHT.
	 * Reads "contacts.dat" from the data directory, falling back to "contacts.saving" when
	 * the former is absent. Does nothing if neither file exists. Failures are logged and
	 * not propagated.
	 *
	 * @param dht	DHT that receives the state via importState
	 */
	protected void
	importContacts(
		DHT		dht )
	{
		try{
			contact_mon.enter();

			File	source = new File( data_dir, "contacts.dat" );

			if ( !source.exists()){

					// fall back to the file an interrupted export may have left behind

				source = new File( data_dir, "contacts.saving" );
			}

			if ( !source.exists()){

				return;
			}

			DataInputStream	in = new DataInputStream( new FileInputStream( source ));

			try{
				dht.importState( in );

			}finally{

				in.close();
			}
		}catch( Throwable e ){

			Debug.printStackTrace( e );

		}finally{

			contact_mon.exit();
		}
	}
	
	/**
	 * Writes the DHT's contact state to disk and then persists diversifications.
	 * The state is written to "contacts.saving", flushed and synced, and only after a fully
	 * successful write is it renamed over "contacts.dat". Failures are logged and not
	 * propagated; diversifications are written regardless of the outcome.
	 *
	 * @param dht	DHT whose state is exported via exportState
	 */
	protected void
	exportContacts(
		DHT		dht )
	{
		try{
			contact_mon.enter();

			File	tmp_file	= new File( data_dir, "contacts.saving" );
			File	final_file	= new File( data_dir, "contacts.dat" );

			tmp_file.delete();

			DataOutputStream	out		= null;
			boolean				written	= false;

			try{
				FileOutputStream raw = new FileOutputStream( tmp_file );

				out = new DataOutputStream( raw );

					// NOTE(review): 32 is handed straight to exportState - presumably a cap on the
					// number of contacts exported; confirm against DHT.exportState

				dht.exportState( out, 32 );

				out.flush();

				raw.getFD().sync();

				written = true;

			}finally{

				if ( out != null ){

					out.close();

						// only replace the previous file once the new one is complete

					if ( written ){

						final_file.delete();

						tmp_file.renameTo( final_file );
					}
				}
			}
		}catch( Throwable e ){

			Debug.printStackTrace( e );

		}finally{

			contact_mon.exit();
		}

			// diversifications are not written when they expire, so take this opportunity
			// to save them

		writeDiversifications();
	}
	
	/**
	 * Replaces recent_addresses with the map stored under the "addresses" file prefix
	 * (an empty map when readMapFromFile finds or decodes nothing), holding address_mon
	 * while doing so.
	 */
	protected void
	readRecentAddresses()
	{
		try{
			address_mon.enter();
			
			recent_addresses = readMapFromFile( "addresses" );
	
		}finally{
			
			address_mon.exit();
		}
	}
	
	/**
	 * Drops address entries older than ADDRESS_EXPIRY and then persists recent_addresses
	 * under the "addresses" file prefix. The special "most_recent" entry is never expired.
	 * Runs under address_mon; failures are logged and not propagated.
	 */
	protected void
	writeRecentAddresses()
	{
		try{
			address_mon.enter();

				// expire stale entries first

			Iterator	entries = recent_addresses.entrySet().iterator();

			while( entries.hasNext()){

				Map.Entry	entry = (Map.Entry)entries.next();

				String	name = (String)entry.getKey();

				if ( name.equals( "most_recent" )){

						// this entry holds address bytes rather than a timestamp

					continue;
				}

				Long	recorded = (Long)entry.getValue();

				long	age = SystemTime.getCurrentTime() - recorded.longValue();

				if ( age > ADDRESS_EXPIRY ){

					entries.remove();
				}
			}

			writeMapToFile( recent_addresses, "addresses" );

		}catch( Throwable e ){

			Debug.printStackTrace(e);

		}finally{

			address_mon.exit();
		}
	}
	
	/**
	 * Records the given address as seen now and as the most recent one, then persists
	 * the address map. Runs under address_mon.
	 *
	 * @param address	address to record; stored both as a key with the current time and,
	 * 					as bytes, under "most_recent"
	 */
	protected void
	recordCurrentAddress(
		String		address )
	{
		try{
			address_mon.enter();

			Long	seen_at = new Long( SystemTime.getCurrentTime());

			recent_addresses.put( address, seen_at );

				// kept as bytes, which is the form getMostRecentAddress expects to read back

			byte[]	address_bytes = address.getBytes();

			recent_addresses.put( "most_recent", address_bytes );

			writeRecentAddresses();

		}finally{

			address_mon.exit();
		}
	}
	
	/**
	 * Returns the address stored under "most_recent", or null if none has been recorded.
	 * 
	 * The lookup is performed under address_mon, like every other access to
	 * recent_addresses in this class: the map is a plain Map that is mutated by
	 * recordCurrentAddress/writeRecentAddresses and replaced by readRecentAddresses, so an
	 * unguarded read could observe it mid-update. Nested acquisition (e.g. when called from
	 * isRecentAddress) follows the existing recordCurrentAddress -> writeRecentAddresses
	 * pattern, which already re-enters address_mon.
	 *
	 * @return the most recently recorded address, decoded with the platform default charset
	 *         (matching the getBytes() call used when it was stored), or null
	 */
	protected String
	getMostRecentAddress()
	{
		try{
			address_mon.enter();
			
			byte[]	addr = (byte[])recent_addresses.get( "most_recent" );
			
			if ( addr == null ){
				
				return( null );
			}
			
			return( new String( addr ));
			
		}finally{
			
			address_mon.exit();
		}
	}
	
	/**
	 * Tests whether an address has been recorded recently.
	 *
	 * @param address	address to test
	 * @return true if the address is a key of recent_addresses or equals the value returned
	 *         by getMostRecentAddress
	 */
	protected boolean
	isRecentAddress(
		String		address )
	{
		try{
			address_mon.enter();

			boolean	recent = recent_addresses.containsKey( address );

			if ( !recent ){

				String	latest = getMostRecentAddress();

				recent = latest != null && latest.equals( address );
			}

			return( recent );

		}finally{

			address_mon.exit();
		}
	}
	
	/**
	 * Invoked when the local contact changes; purges the direct key blocks.
	 *
	 * @param contact	the new local contact (not used by this method)
	 */
	protected void
	localContactChanged(
		DHTTransportContact	contact )
	{
		purgeDirectKeyBlocks();
	}
			
	
	/**
	 * Decodes a map from a file in the data directory.
	 * Reads "&lt;prefix&gt;.dat", falling back to "&lt;prefix&gt;.saving" when the former is absent.
	 *
	 * @param file_prefix	file name without extension
	 * @return the result of BDecoder.decode on the file, or a new empty HashMap when neither
	 *         file exists or anything goes wrong (failures are logged)
	 */
	protected Map
	readMapFromFile(
		String		file_prefix )
	{
		try{
			File source = new File( data_dir, file_prefix + ".dat" );

			if ( !source.exists()){

				source = new File( data_dir, file_prefix + ".saving" );
			}

			if ( !source.exists()){

				return( new HashMap());
			}

			BufferedInputStream	in = new BufferedInputStream( new FileInputStream( source ));

			try{
				return( BDecoder.decode( in ));

			}finally{

				in.close();
			}
		}catch( Throwable e ){

			Debug.printStackTrace( e );

			return( new HashMap());
		}
	}
	
	/**
	 * Persists a map to "&lt;prefix&gt;.dat" in the data directory.
	 * An empty map simply removes the target file. Otherwise the encoded map is written to
	 * "&lt;prefix&gt;.saving", flushed and synced, and only after a fully successful write and
	 * close is it renamed over the target. Failures are logged and not propagated.
	 *
	 * @param map			map to encode with BEncoder
	 * @param file_prefix	file name without extension
	 */
	protected void
	writeMapToFile(
		Map			map,
		String		file_prefix )
	{
		try{
			File	tmp_file	= new File( data_dir, file_prefix + ".saving" );
			File	final_file	= new File( data_dir, file_prefix + ".dat" );

			tmp_file.delete();

			if ( map.size() == 0 ){

					// nothing to store: the absence of the file represents the empty map

				final_file.delete();

				return;
			}

			FileOutputStream	out		= null;
			boolean				written	= false;

			try{
				byte[]	encoded = BEncoder.encode( map );

				out = new FileOutputStream( tmp_file );

				out.write( encoded );

				out.flush();

				out.getFD().sync();

				written = true;

			}finally{

				if ( out != null ){

						// a failing close aborts here, before the rename, so a partially
						// written file never replaces the target

					out.close();

					if ( written ){

						final_file.delete();

						tmp_file.renameTo( final_file );
					}
				}
			}
		}catch( Throwable e ){

			Debug.printStackTrace(e);
		}
	}
	
	/**
	 * Replaces version_map with the map stored under the "version" file prefix
	 * (an empty map when readMapFromFile finds or decodes nothing), holding version_mon
	 * while doing so.
	 */
	protected void
	readVersionData()
	{
		try{
			version_mon.enter();
			
			version_map = readMapFromFile( "version" );
	
		}finally{
			
			version_mon.exit();
		}
	}
	
	/**
	 * Persists version_map under the "version" file prefix, holding version_mon
	 * while doing so.
	 */
	protected void
	writeVersionData()
	{
		try{
			version_mon.enter();
			
			writeMapToFile( version_map, "version" );
	
		}finally{
			
			version_mon.exit();
		}
	}
	/**
	 * Reserves a run of value-version numbers and returns the first of them.
	 * The run starts at the larger of the persisted "next" counter and
	 * SystemTime.getCurrentTime()/1000 (truncated to int); the counter is then advanced by
	 * num and written to disk. Runs under version_mon.
	 *
	 * @param num	how many version numbers to reserve
	 * @return the first reserved version number
	 */
	public int
	getNextValueVersions(
		int		num )
	{
		try{
			version_mon.enter();

			Long	stored = (Long)version_map.get( "next" );

			int	now = (int)(SystemTime.getCurrentTime()/1000);

				// a stored value ahead of "now" (the system clock must have changed) is kept so
				// that the numbers handed out keep increasing

			int	first = stored == null ? now : Math.max( stored.intValue(), now );

			version_map.put( "next", new Long( first+num ));

			writeVersionData();

			return( first );

		}finally{

			version_mon.exit();
		}
	}
	
		// key storage
	
	/**
	 * Returns the storage key for the given key, as provided by getStorageKey, holding
	 * storage_mon while doing so.
	 *
	 * @param key	the DHT key
	 * @param local	not used by this method
	 * @return the result of getStorageKey( key )
	 */
	public DHTStorageKey
	keyCreated(
		HashWrapper		key,
		boolean			local )
	{
		//System.out.println( "DHT key created");
		
		try{
			storage_mon.enter();
		
			return(	getStorageKey( key ));
			
		}finally{
			
			storage_mon.exit();
		}
	}
	
	/**
	 * Hands the given storage key to deleteStorageKey while holding storage_mon.
	 *
	 * @param key	key being deleted; must be a storageKey instance
	 */
	public void
	keyDeleted(
		DHTStorageKey		key )
	{
		try{
			storage_mon.enter();

			storageKey	doomed = (storageKey)key;

			deleteStorageKey( doomed );

		}finally{

			storage_mon.exit();
		}
	}
	
	/**
	 * Reports a read of the given storage key by a contact, holding storage_mon while the
	 * key's read method runs.
	 *
	 * @param key		key that was read; must be a storageKey instance
	 * @param contact	contact that performed the read
	 */
	public void
	keyRead(
		DHTStorageKey			key,
		DHTTransportContact		contact )
	{
		try{
			storage_mon.enter();

			storageKey	target = (storageKey)key;

			target.read( contact );

		}finally{

			storage_mon.exit();
		}
	}
	
	/**
	 * Writes a key's statistics to the stream in the layout read back by deserialiseStats:
	 * one version byte (0), then entry count, size and reads-per-minute as ints, then the
	 * diversification type as a byte.
	 *
	 * @param key	storage key whose statistics are written
	 * @param dos	destination stream
	 * @throws IOException	if writing to the stream fails
	 */
	public void
	serialiseStats(
		storageKey			key,
		DataOutputStream	dos )
	
		throws IOException
	{
		dos.writeByte( (byte)0 );	// format version, currently always 0
		dos.writeInt( key.getEntryCount());
		dos.writeInt( key.getSize());
		dos.writeInt( key.getReadsPerMinute());
		dos.writeByte( key.getDiversificationType());
	}
	
	/**
	 * Reads key statistics in the layout produced by serialiseStats: one version byte,
	 * three ints (entry count, size, reads per minute) and one diversification byte.
	 *
	 * @param is	source stream
	 * @return an immutable snapshot of the values read
	 * @throws IOException	if reading from the stream fails
	 */
	public DHTStorageKeyStats
	deserialiseStats(
		DataInputStream			is )

		throws IOException
	{
			// the leading version byte has to be consumed to stay aligned with the writer;
			// its value is not inspected

		is.readByte();

		final int	stat_entries	= is.readInt();
		final int	stat_size		= is.readInt();
		final int	stat_reads		= is.readInt();
		final byte	stat_div		= is.readByte();

		DHTStorageKeyStats	stats =
			new DHTStorageKeyStats()
			{
				public int getEntryCount(){ return( stat_entries ); }

				public int getSize(){ return( stat_size ); }

				public int getReadsPerMinute(){ return( stat_reads ); }

				public byte getDiversification(){ return( stat_div ); }
			};

		return( stats );
	}
	
	/**
	 * Accounts for a value added under a key: one more entry and the value's byte length
	 * more data, reported to the key via valueChanged under storage_mon.
	 *
	 * @param key	key the value was added to; must be a storageKey instance
	 * @param value	the added value
	 */
	public void
	valueAdded(
		DHTStorageKey		key,
		DHTTransportValue	value )
	{
		try{
			storage_mon.enter();

			storageKey	target = (storageKey)key;

			int	added_bytes = value.getValue().length;

			target.valueChanged( 1, added_bytes );

		}finally{

			storage_mon.exit();
		}
	}
	
	/**
	 * Accounts for a value replaced under a key: the entry count is unchanged and the size
	 * changes by the difference in byte length, reported via valueChanged under storage_mon.
	 *
	 * @param key		key whose value was updated; must be a storageKey instance
	 * @param old_value	the value being replaced
	 * @param new_value	the replacement value
	 */
	public void
	valueUpdated(
		DHTStorageKey		key,
		DHTTransportValue	old_value,
		DHTTransportValue	new_value )
	{
		try{
			storage_mon.enter();

			storageKey	target = (storageKey)key;

			int	new_bytes = new_value.getValue().length;
			int	old_bytes = old_value.getValue().length;

			target.valueChanged( 0, new_bytes - old_bytes );

		}finally{

			storage_mon.exit();
		}
	}
	
	/**
	 * Accounts for a value removed from a key: one entry fewer and the value's byte length
	 * less data, reported to the key via valueChanged under storage_mon.
	 *
	 * @param key	key the value was removed from; must be a storageKey instance
	 * @param value	the removed value
	 */
	public void
	valueDeleted(
		DHTStorageKey		key,
		DHTTransportValue	value )
	{
		try{
			storage_mon.enter();

			storageKey	target = (storageKey)key;

			int	removed_bytes = value.getValue().length;

			target.valueChanged( -1, -removed_bytes );

		}finally{

			storage_mon.exit();
		}
	}
	
	/**
	 * Tests whether a diversification is known for the given key.
	 *
	 * @param key	raw key bytes
	 * @return true if lookupDiversification returns non-null for the wrapped key
	 */
	public boolean
	isDiversified(
		byte[]		key )
	{
		HashWrapper	wrapped_key = new HashWrapper( key );

		try{
			storage_mon.enter();

			boolean	diversified = lookupDiversification( wrapped_key ) != null;

			return( diversified );

		}finally{

			storage_mon.exit();
		}
	}
	
		// diversifications returned for put operations must deterministically be the same end points,
		// but those returned for get operations should be randomised to load balance
	
	public byte[][]
	getExistingDiversification(
		byte[]			key,
		boolean			put_operation,

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -