⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dhtpluginstoragemanager.java

📁 基于JXTA开发平台的下载软件开发源代码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/*
 * Created on 12-Mar-2005
 * Created by Paul Gardner
 * Copyright (C) 2004, 2005, 2006 Aelitis, All Rights Reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 * 
 * AELITIS, SAS au capital de 46,603.30 euros
 * 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
 *
 */

package com.aelitis.azureus.plugins.dht.impl;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.gudy.azureus2.core3.util.AEMonitor;
import org.gudy.azureus2.core3.util.BDecoder;
import org.gudy.azureus2.core3.util.BEncoder;
import org.gudy.azureus2.core3.util.Debug;
import org.gudy.azureus2.core3.util.DisplayFormatters;
import org.gudy.azureus2.core3.util.HashWrapper;
import org.gudy.azureus2.core3.util.SHA1Simple;
import org.gudy.azureus2.core3.util.SystemTime;
import org.gudy.azureus2.plugins.logging.LoggerChannel;

import com.aelitis.azureus.core.dht.DHT;
import com.aelitis.azureus.core.dht.DHTLogger;
import com.aelitis.azureus.core.dht.DHTStorageAdapter;
import com.aelitis.azureus.core.dht.DHTStorageKey;
import com.aelitis.azureus.core.dht.impl.DHTLog;
import com.aelitis.azureus.core.dht.transport.DHTTransportContact;
import com.aelitis.azureus.core.dht.transport.DHTTransportValue;
import com.aelitis.azureus.core.util.bloom.BloomFilter;
import com.aelitis.azureus.core.util.bloom.BloomFilterFactory;

/**
 * @author parg
 *
 */

public class 
DHTPluginStorageManager 
	implements DHTStorageAdapter
{
		// Seven days expressed in milliseconds. writeRecentAddresses() drops any address
		// entry whose recorded time is older than this.
	private static final long		ADDRESS_EXPIRY			= 7*24*60*60*1000; 
		// Diversification tuning values. Their consumers are outside the part of the file
		// reviewed here - NOTE(review): confirm exact meaning against the diversification code.
	private static final int		DIV_WIDTH				= 10;
	private static final int		DIV_FRAG_GET_SIZE		= 2;
		// Two days and one day expressed in milliseconds; presumably a minimum lifetime plus
		// a random extra component for a diversification - TODO confirm.
	private static final long		DIV_EXPIRY_MIN			= 2*24*60*60*1000;
	private static final long		DIV_EXPIRY_RAND			= 1*24*60*60*1000;
		
	
		// Public thresholds; by name these bound local diversification (size, entry count,
		// read rate) - NOTE(review): not referenced in the visible part of the file, verify.
	public static final int			LOCAL_DIVERSIFICATION_SIZE_LIMIT			= 4096;
	public static final int			LOCAL_DIVERSIFICATION_ENTRIES_LIMIT			= 512;
	public static final int			LOCAL_DIVERSIFICATION_READS_PER_MIN_SAMPLES	= 3;
	public static final int			LOCAL_DIVERSIFICATION_READS_PER_MIN			= 30;
	
		// presumably an upper bound on the number of tracked storage keys - TODO confirm
	public static final int			MAX_STORAGE_KEYS	= 65536;
	
		// logger used for the "SM: ..." diversification trace messages
	private DHTLogger		log;
		// directory holding the "<prefix>.dat" / "<prefix>.saving" state files
	private File			data_dir;
	
		// address_mon is held around accesses to recent_addresses, contact_mon around contact
		// import/export, storage_mon around the key/value callbacks and diversification
		// lookups, version_mon around version_map
	private AEMonitor	address_mon	= new AEMonitor( "DHTPluginStorageManager:address" );
	private AEMonitor	contact_mon	= new AEMonitor( "DHTPluginStorageManager:contact" );
	private AEMonitor	storage_mon	= new AEMonitor( "DHTPluginStorageManager:storage" );
	private AEMonitor	version_mon	= new AEMonitor( "DHTPluginStorageManager:version" );
	
		// version_map: persisted under the "version" file prefix; holds "next" -> Long
		// recent_addresses: persisted under the "addresses" file prefix; holds
		// address -> Long (time recorded) plus "most_recent" -> byte[] (latest address)
	private Map					version_map			= new HashMap();
	private Map					recent_addresses	= new HashMap();
	
		// NOTE(review): both maps are populated outside the visible part of the file
	private Map					remote_diversifications	= new HashMap();
	private Map					local_storage_keys		= new HashMap();
	

	/**
	 * Creates the storage manager, makes sure the data directory exists and loads the
	 * persisted recent addresses, diversifications and version data.
	 *
	 * @param _log		logger used for diversification trace output
	 * @param _data_dir	directory in which the state files are kept; created if missing
	 */
	public
	DHTPluginStorageManager(
		DHTLogger			_log,
		File				_data_dir )
	{
		this.data_dir	= _data_dir;
		this.log		= _log;

			// the directory has to exist before any of the readers below look into it

		this.data_dir.mkdirs();

			// restore whatever an earlier run left behind

		readRecentAddresses();
		readDiversifications();
		readVersionData();
	}
	
	/**
	 * Feeds previously exported contact state into the supplied DHT.
	 * "contacts.dat" is used when present, otherwise "contacts.saving"; when neither
	 * file exists nothing is imported. Any failure is reported via Debug and swallowed.
	 *
	 * @param dht	the DHT whose importState is invoked with the file contents
	 */
	protected void
	importContacts(
		DHT		dht )
	{
		try{
			contact_mon.enter();

			File	primary = new File( data_dir, "contacts.dat" );

				// fall back to the intermediate file if the final one is missing

			File	source = primary.exists() ? primary : new File( data_dir, "contacts.saving" );

			if ( !source.exists()){

				return;
			}

			FileInputStream	raw	= new FileInputStream( source );

			DataInputStream	in	= new DataInputStream( raw );

			try{
				dht.importState( in );

			}finally{

				in.close();
			}
		}catch( Throwable failure ){

			Debug.printStackTrace( failure );

		}finally{

			contact_mon.exit();
		}
	}
	
	/**
	 * Exports the DHT's contact state to "contacts.dat" in the data directory and then
	 * writes the diversifications.
	 * <p>
	 * The state is first written to "contacts.saving"; only when both the export and the
	 * close of the stream succeed is the old "contacts.dat" deleted and the new file
	 * renamed into place. A failed rename is reported (it used to be ignored silently).
	 * All failures are reported via Debug and never propagate to the caller.
	 *
	 * @param dht	the DHT whose exportState is invoked; the second argument (32) is passed
	 * 				through unchanged and its meaning is defined by DHT
	 */
	protected void
	exportContacts(
		DHT		dht )
	{
		try{
			contact_mon.enter();
						
			File	saving = new File( data_dir, "contacts.saving" );
			File	target = new File( data_dir, "contacts.dat" );

				// get rid of any left-over intermediate file from an earlier attempt
			
			saving.delete();
			
			DataOutputStream	dos = new DataOutputStream( new FileOutputStream( saving ));
			
			try{
				dht.exportState( dos, 32 );
				
			}finally{
				
				dos.close();
			}
			
				// only reached when the export and the close both completed normally
			
			target.delete();
			
			if ( !saving.renameTo( target )){
				
					// routed through the catch below so it is reported like any other failure
				
				throw( new java.io.IOException( "Failed to rename '" + saving + "' to '" + target + "'" ));
			}
		}catch( Throwable e ){
			
			Debug.printStackTrace( e );
			
		}finally{
			
			contact_mon.exit();
		}
		
			// this is a good point to save diversifications - useful when they've expired
			// as writing isn't triggered at expiry time 
		
		writeDiversifications();
	}
	
	/**
	 * Replaces the in-memory recent address map with the one stored under the
	 * "addresses" file prefix (an empty map if nothing could be read).
	 */
	protected void
	readRecentAddresses()
	{
		try{
			address_mon.enter();

			Map	loaded = readMapFromFile( "addresses" );

			recent_addresses = loaded;

		}finally{

			address_mon.exit();
		}
	}
	
	/**
	 * Drops address entries older than ADDRESS_EXPIRY and persists the remaining map
	 * under the "addresses" file prefix.
	 * <p>
	 * The "most_recent" entry is never expired. Entries whose value is not a Long (or
	 * whose key is null) are left untouched instead of aborting the whole write with a
	 * ClassCastException/NullPointerException as before. Failures are reported via
	 * Debug and swallowed.
	 */
	protected void
	writeRecentAddresses()
	{
		try{
			address_mon.enter();
			
				// one reading of the clock is enough for the whole sweep
			
			long	now = SystemTime.getCurrentTime();
			
				// remove any old crud
			
			Iterator	it = recent_addresses.entrySet().iterator();
			
			while( it.hasNext()){
				
				Map.Entry	entry = (Map.Entry)it.next();
				
				if ( "most_recent".equals( entry.getKey())){
					
					continue;
				}
				
				Object	value = entry.getValue();
				
				if ( !( value instanceof Long )){
					
						// not a timestamp - nothing we can age, so leave it alone
					
					continue;
				}
				
				if ( now - ((Long)value).longValue() > ADDRESS_EXPIRY ){
					
					it.remove();
				}
			}
			
			writeMapToFile( recent_addresses, "addresses" );
			
		}catch( Throwable e ){
			
			Debug.printStackTrace(e);
	
		}finally{
			
			address_mon.exit();
		}
	}
	
	/**
	 * Records the given address with the current time, marks it as the most recent
	 * one and writes the address map out.
	 *
	 * @param address	the address to remember
	 */
	protected void
	recordCurrentAddress(
		String		address )
	{
		try{
			address_mon.enter();

			long	stamp = SystemTime.getCurrentTime();

			recent_addresses.put( address, new Long( stamp ));

				// the latest address is kept as bytes under its own well-known key

			byte[]	encoded = address.getBytes();

			recent_addresses.put( "most_recent", encoded );

			writeRecentAddresses();

		}finally{

			address_mon.exit();
		}
	}
	
	/**
	 * Returns the address stored under the "most_recent" key, or null when there is none.
	 *
	 * @return the most recently recorded address, or null
	 */
	protected String
	getMostRecentAddress()
	{
			// NOTE(review): unlike isRecentAddress this read is not wrapped in
			// address_mon enter/exit - confirm whether that is intentional

		Object	stored = recent_addresses.get( "most_recent" );

		return( stored == null ? null : new String((byte[])stored ));
	}
	
	/**
	 * Tells whether the given address is a key of the recent address map.
	 *
	 * @param address	the address to look up
	 * @return true if the map contains the address as a key
	 */
	protected boolean
	isRecentAddress(
		String		address )
	{
		try{
			address_mon.enter();

			boolean	known = recent_addresses.containsKey( address );

			return( known );

		}finally{

			address_mon.exit();
		}
	}
	
	
	/**
	 * Decodes a map from "&lt;file_prefix&gt;.dat" in the data directory, falling back to
	 * "&lt;file_prefix&gt;.saving" when the former does not exist.
	 *
	 * @param file_prefix	file name without extension
	 * @return the result of BDecoder.decode for the file, or a new empty HashMap when
	 * 			neither file exists or anything goes wrong (the failure is reported via Debug)
	 */
	protected Map
	readMapFromFile(
		String		file_prefix )
	{
		try{
			File	primary = new File( data_dir, file_prefix + ".dat" );

			File	source = primary.exists() ? primary : new File( data_dir, file_prefix + ".saving" );

			if ( source.exists()){

				FileInputStream		raw	= new FileInputStream( source );

				BufferedInputStream	in	= new BufferedInputStream( raw );

				try{
					return( BDecoder.decode( in ));

				}finally{

					in.close();
				}
			}
		}catch( Throwable failure ){

			Debug.printStackTrace( failure );
		}

			// nothing usable on disk

		return( new HashMap());
	}
	
	/**
	 * Encodes the map with BEncoder and stores it as "&lt;file_prefix&gt;.dat" in the data
	 * directory.
	 * <p>
	 * The data is first written to "&lt;file_prefix&gt;.saving"; only when the write and the
	 * close of the stream succeed is the old ".dat" file deleted and the new file renamed
	 * into place. The stream is closed exactly once, and a failed rename is reported
	 * rather than ignored. All failures are reported via Debug and never propagate.
	 *
	 * @param map			the map to persist
	 * @param file_prefix	file name without extension
	 */
	protected void
	writeMapToFile(
		Map			map,
		String		file_prefix )
	{
		try{
			File	saving = new File( data_dir, file_prefix + ".saving" );
			File	target = new File( data_dir, file_prefix + ".dat" );

				// get rid of any left-over intermediate file from an earlier attempt
			
			saving.delete();
			
				// encode before opening the file so an encoding failure creates nothing
			
			byte[]	data = BEncoder.encode( map );
			
			FileOutputStream os = new FileOutputStream( saving );
			
			try{
				os.write( data );
				
			}finally{
				
				os.close();
			}
			
				// only reached when the write and the close both completed normally
			
			target.delete();
			
			if ( !saving.renameTo( target )){
				
					// routed through the catch below so it is reported like any other failure
				
				throw( new java.io.IOException( "Failed to rename '" + saving + "' to '" + target + "'" ));
			}
			
		}catch( Throwable e ){
			
			Debug.printStackTrace(e);
		}
	}
	
	/**
	 * Replaces the in-memory version map with the one stored under the "version"
	 * file prefix (an empty map if nothing could be read).
	 */
	protected void
	readVersionData()
	{
		try{
			version_mon.enter();

			Map	loaded = readMapFromFile( "version" );

			version_map = loaded;

		}finally{

			version_mon.exit();
		}
	}
	
	/**
	 * Persists the version map under the "version" file prefix while holding version_mon.
	 */
	protected void
	writeVersionData()
	{
		try{
			version_mon.enter();

			Map	snapshot = version_map;

			writeMapToFile( snapshot, "version" );

		}finally{

			version_mon.exit();
		}
	}
	/**
	 * Reserves a run of value versions and returns the first one.
	 * <p>
	 * The first version is the larger of the persisted "next" value and the current
	 * time divided by 1000 (just the latter when nothing is persisted). "next" is then
	 * advanced by num and the version map is written out.
	 *
	 * @param num	how many versions to reserve
	 * @return the first version of the reserved run
	 */
	public int
	getNextValueVersions(
		int		num )
	{
		try{
			version_mon.enter();

			Long	stored = (Long)version_map.get( "next" );

			int	now = (int)(SystemTime.getCurrentTime()/1000);

				// a stored value ahead of the clock is kept so that versions keep
				// increasing even if the system clock was moved

			int	first = stored == null ? now : Math.max( now, stored.intValue());

			version_map.put( "next", new Long( first+num ));

			writeVersionData();

			return( first );

		}finally{

			version_mon.exit();
		}
	}
	
		// key storage
	
	/**
	 * Returns the storage key for the given DHT key, obtained via getStorageKey
	 * while holding storage_mon.
	 *
	 * @param key	the DHT key
	 * @param local	not consulted by this implementation
	 * @return the storage key handed back by getStorageKey
	 */
	public DHTStorageKey
	keyCreated(
		HashWrapper		key,
		boolean			local )
	{
		try{
			storage_mon.enter();

			DHTStorageKey	created = getStorageKey( key );

			return( created );

		}finally{

			storage_mon.exit();
		}
	}
	
	/**
	 * Passes the given key, cast to storageKey, to deleteStorageKey while holding storage_mon.
	 *
	 * @param key	the storage key being deleted
	 */
	public void
	keyDeleted(
		DHTStorageKey		key )
	{
		try{
			storage_mon.enter();

			storageKey	doomed = (storageKey)key;

			deleteStorageKey( doomed );

		}finally{

			storage_mon.exit();
		}
	}
	
	/**
	 * Forwards a read of the given key by the given contact to the storage key,
	 * while holding storage_mon.
	 *
	 * @param key		the storage key that was read
	 * @param contact	the contact that performed the read
	 */
	public void
	keyRead(
		DHTStorageKey			key,
		DHTTransportContact		contact )
	{
		try{
			storage_mon.enter();

			storageKey	target = (storageKey)key;

			target.read( contact );

		}finally{

			storage_mon.exit();
		}
	}
	
	/**
	 * Reports a newly added value to the storage key as valueChanged( 1, length of
	 * the value ), while holding storage_mon.
	 *
	 * @param key	the storage key the value belongs to
	 * @param value	the value that was added
	 */
	public void
	valueAdded(
		DHTStorageKey		key,
		DHTTransportValue	value )
	{
		try{
			storage_mon.enter();

			storageKey	target = (storageKey)key;

			int	added_size = value.getValue().length;

			target.valueChanged( 1, added_size );

		}finally{

			storage_mon.exit();
		}
	}
	
	/**
	 * Reports a replaced value to the storage key as valueChanged( 0, new length
	 * minus old length ), while holding storage_mon.
	 *
	 * @param key		the storage key the value belongs to
	 * @param old_value	the value being replaced
	 * @param new_value	the replacement value
	 */
	public void
	valueUpdated(
		DHTStorageKey		key,
		DHTTransportValue	old_value,
		DHTTransportValue	new_value )
	{
		try{
			storage_mon.enter();

			storageKey	target = (storageKey)key;

			int	new_size = new_value.getValue().length;
			int	old_size = old_value.getValue().length;

			target.valueChanged( 0, new_size - old_size );

		}finally{

			storage_mon.exit();
		}
	}
	
	/**
	 * Reports a removed value to the storage key as valueChanged( -1, negated length
	 * of the value ), while holding storage_mon.
	 *
	 * @param key	the storage key the value belonged to
	 * @param value	the value that was deleted
	 */
	public void
	valueDeleted(
		DHTStorageKey		key,
		DHTTransportValue	value )
	{
		try{
			storage_mon.enter();

			storageKey	target = (storageKey)key;

			int	removed_size = value.getValue().length;

			target.valueChanged( -1, -removed_size );

		}finally{

			storage_mon.exit();
		}
	}
	
		// get diversifications for put operations must deterministically return the same end points
		// but gets for gets should be randomised to load balance
	
	/**
	 * Resolves the keys to use for the given key by following the diversification
	 * chain, while holding storage_mon. When the first resulting key differs from the
	 * supplied key the mapping is written to the log.
	 *
	 * @param key			the original key
	 * @param put_operation	passed through to followDivChain and included in the log line
	 * @param exhaustive	passed through to followDivChain and included in the log line
	 * @return the keys produced by followDivChain
	 */
	public byte[][]
	getExistingDiversification(
		byte[]			key,
		boolean			put_operation,
		boolean			exhaustive )
	{
		HashWrapper	wrapped = new HashWrapper( key );

			// must always return a value - original if no diversification exists

		try{
			storage_mon.enter();

			byte[][]	targets = followDivChain( wrapped, put_operation, exhaustive );

			boolean	diversified = !Arrays.equals( targets[0], key );

			if ( diversified ){

					// comma separated list of the resulting keys

				StringBuffer	trace = new StringBuffer();

				for (int i=0;i<targets.length;i++){

					if ( i > 0 ){

						trace.append( "," );
					}

					trace.append( DHTLog.getString2( targets[i] ));
				}

				log.log( "SM: get div: " + DHTLog.getString2(key) + ", put = " + put_operation + ", exh = " + exhaustive + " -> " + trace );
			}

			return( targets );

		}finally{

			storage_mon.exit();
		}
	}
	
	/**
	 * Makes sure a diversification exists for the given key (creating one of the given
	 * type when lookupDiversification finds none), then resolves and returns the keys
	 * by following the diversification chain. The outcome is always logged. Runs while
	 * holding storage_mon.
	 *
	 * @param cause					contact named in the log line; may be null
	 * @param key					the key to diversify
	 * @param put_operation			passed through to followDivChain
	 * @param diversification_type	type used when a diversification is created; also an
	 * 								index into DHT.DT_STRINGS for the log line
	 * @param exhaustive			passed through to followDivChain
	 * @return the keys produced by followDivChain
	 */
	public byte[][]
	createNewDiversification(
		DHTTransportContact	cause,
		byte[]				key,
		boolean				put_operation,
		byte				diversification_type,
		boolean				exhaustive )
	{
		HashWrapper	wrapped = new HashWrapper( key );

		try{
			storage_mon.enter();

			boolean	created = lookupDiversification( wrapped ) == null;

			if ( created ){

				createDiversification( wrapped, diversification_type );
			}

			byte[][] targets = followDivChain( wrapped, put_operation, exhaustive );

				// comma separated list of the resulting keys

			StringBuffer	trace = new StringBuffer();

			for (int i=0;i<targets.length;i++){

				if ( i > 0 ){

					trace.append( "," );
				}

				trace.append( DHTLog.getString2( targets[i] ));
			}

				// the pieces are appended in the same order the log line presents them

			StringBuffer	message = new StringBuffer( "SM: create div: " );

			message.append( DHTLog.getString2(key));
			message.append( ", new = " ).append( created );
			message.append( ", put = " ).append( put_operation );
			message.append( ", exh = " ).append( exhaustive );
			message.append( ", type = " ).append( DHT.DT_STRINGS[diversification_type] );
			message.append( " -> " ).append( trace.toString());
			message.append( ", cause = " ).append( cause==null?"<unknown>":cause.getString());

			log.log( message.toString());

			return( targets );

		}finally{

			storage_mon.exit();
		}
	} 
	
	/**
	 * Convenience form of the list based followDivChain: starts from the single given
	 * key at depth 0 with an empty "keys done" list and converts the resulting list of
	 * HashWrappers into raw byte arrays.
	 *
	 * @param wrapper		the key to start from
	 * @param put_operation	passed through to the list based followDivChain
	 * @param exhaustive	passed through to the list based followDivChain
	 * @return the bytes of every key in the resulting list, in list order
	 */
	protected byte[][]
	followDivChain(
		HashWrapper	wrapper,
		boolean		put_operation,
		boolean		exhaustive )
	{
		List	seed = new ArrayList();

		seed.add( wrapper );

		List	resolved = followDivChain( seed, put_operation, 0, exhaustive, new ArrayList());

		byte[][]	keys = new byte[resolved.size()][];

		int	pos = 0;

		for ( Iterator it = resolved.iterator(); it.hasNext(); ){

			keys[pos++] = ((HashWrapper)it.next()).getBytes();
		}

		return( keys );
	}
	
	protected List
	followDivChain(
		List		list_in,
		boolean		put_operation,
		int			depth,
		boolean		exhaustive,
		List		keys_done )
	{
		List	list_out = new ArrayList();
	
		/*
		String	indent = "";
		for(int i=0;i<depth;i++){
			indent+= "  ";
		}
		System.out.println( indent + "->" );
		*/

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -