⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dhtdbimpl.java

📁 这是一个基于java编写的torrent的P2P源码
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
/*
 * Created on 28-Jan-2005
 * Created by Paul Gardner
 * Copyright (C) 2004, 2005, 2006 Aelitis, All Rights Reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 * 
 * AELITIS, SAS au capital de 46,603.30 euros
 * 8 Allee Lenotre, La Grille Royale, 78600 Le Mesnil le Roi, France.
 *
 */

package com.aelitis.azureus.core.dht.db.impl;

import java.io.DataInputStream;
import java.io.IOException;
import java.util.*;

import org.gudy.azureus2.core3.ipfilter.IpFilter;
import org.gudy.azureus2.core3.ipfilter.IpFilterManagerFactory;
import org.gudy.azureus2.core3.util.AEMonitor;
import org.gudy.azureus2.core3.util.AESemaphore;
import org.gudy.azureus2.core3.util.AEThread;
import org.gudy.azureus2.core3.util.HashWrapper;
import org.gudy.azureus2.core3.util.SimpleTimer;
import org.gudy.azureus2.core3.util.SystemTime;
import org.gudy.azureus2.core3.util.Timer;
import org.gudy.azureus2.core3.util.TimerEvent;
import org.gudy.azureus2.core3.util.TimerEventPerformer;


import com.aelitis.azureus.core.dht.DHT;
import com.aelitis.azureus.core.dht.DHTLogger;
import com.aelitis.azureus.core.dht.DHTStorageAdapter;
import com.aelitis.azureus.core.dht.DHTStorageBlock;
import com.aelitis.azureus.core.dht.DHTStorageKey;
import com.aelitis.azureus.core.dht.DHTStorageKeyStats;
import com.aelitis.azureus.core.dht.db.*;
import com.aelitis.azureus.core.dht.impl.DHTLog;
import com.aelitis.azureus.core.dht.router.DHTRouter;
import com.aelitis.azureus.core.dht.control.DHTControl;
import com.aelitis.azureus.core.dht.transport.DHTTransportContact;
import com.aelitis.azureus.core.dht.transport.DHTTransportReplyHandlerAdapter;
import com.aelitis.azureus.core.dht.transport.DHTTransportValue;
import com.aelitis.azureus.core.dht.transport.udp.DHTTransportUDP;
import com.aelitis.azureus.core.util.bloom.BloomFilter;
import com.aelitis.azureus.core.util.bloom.BloomFilterFactory;

/**
 * @author parg
 *
 */

public class 
DHTDBImpl
	implements DHTDB, DHTDBStats
{	
		// period handed to SimpleTimer for the "DHTDB:op" (original mapping republish) job;
		// set from the constructor argument. Presumably milliseconds - TODO confirm against SimpleTimer

	private int			original_republish_interval;
	
		// the grace period gives the originator time to republish their data as this could involve
		// some work on their behalf to find closest nodes etc. There's no real urgency here anyway
	
	private int			ORIGINAL_REPUBLISH_INTERVAL_GRACE	= 60*60*1000;
	
		// base period for the "DHTDB:cp" (cached mapping republish) job; the constructor
		// applies a random skew of up to +/- 10000 to it before registering the timer

	private int			cache_republish_interval;
	
		// NOTE(review): these two are not referenced in the visible part of the file; presumably
		// they throttle checkCacheExpiration - confirm against its implementation

	private long		MIN_CACHE_EXPIRY_CHECK_INTERVAL		= 60000;
	private long		last_cache_expiry_check;
	
		// IP_BLOOM_FILTER_REBUILD_PERIOD is the period of the "DHTDB:bloom" timer job;
		// IP_COUNT_BLOOM_SIZE_INCREASE_CHUNK is the size the bloom filter below is created with

	private static final long	IP_BLOOM_FILTER_REBUILD_PERIOD		= 15*60*1000;
	private static final int	IP_COUNT_BLOOM_SIZE_INCREASE_CHUNK	= 1000;
	
	private BloomFilter	ip_count_bloom_filter = BloomFilterFactory.createAddRemove8Bit( IP_COUNT_BLOOM_SIZE_INCREASE_CHUNK );
	
		// NOTE(review): presumably the state behind getNextValueVersion (versions handed out
		// in chunks of VALUE_VERSION_CHUNK) - the consumer is outside the visible code, confirm there

	private static final int	VALUE_VERSION_CHUNK = 128;
	private int	next_value_version;
	private int next_value_version_left;
	
	
		// key (HashWrapper) -> DHTDBMapping; every access in the visible methods is made
		// between this_mon.enter() and this_mon.exit()

	private Map			stored_values = new HashMap();
	
		// control, router and local_contact are (re)assigned by setControl; adapter and logger
		// are assigned once by the constructor (adapter is wrapped in an adapterFacade when non-null)

	private DHTControl				control;
	private DHTStorageAdapter		adapter;
	private DHTRouter				router;
	private DHTTransportContact		local_contact;
	private DHTLogger				logger;
	
		// remote stores are refused with DHT.DT_SIZE once total_size + total_values*4 exceeds this

	private static final long	MAX_TOTAL_SIZE	= 4*1024*1024;
	
		// running totals; total_size and total_values feed the MAX_TOTAL_SIZE check in the
		// remote store method. NOTE(review): where they are updated is outside the visible code

	private long		total_size;
	private long		total_values;
	private long		total_keys;
	
		// raised by setControl when a router was already present (i.e. the router has changed);
		// tested and cleared by the "DHTDB:cp" timer job, which then republishes original mappings

	private boolean force_original_republish;
	
	private IpFilter	ip_filter	= IpFilterManagerFactory.getSingleton().getIPFilter();

		// monitor entered around accesses to stored_values and around the bloom filter rebuild

	private AEMonitor	this_mon	= new AEMonitor( "DHTDB" );

	/**
	 * Creates the database and registers its three periodic maintenance jobs with
	 * SimpleTimer, in this order:
	 * <ul>
	 * <li>"DHTDB:op" - republishes the original mappings every original republish interval</li>
	 * <li>"DHTDB:cp" - republishes the cached mappings and, if a forced original republish
	 *     has been flagged, clears the flag and republishes the original mappings too</li>
	 * <li>"DHTDB:bloom" - rebuilds the IP bloom filter while holding the DB monitor</li>
	 * </ul>
	 *
	 * @param _adapter						storage adapter, wrapped in an adapterFacade unless it is null
	 * @param _original_republish_interval	period of the "DHTDB:op" job
	 * @param _cache_republish_interval		base period of the "DHTDB:cp" job, before random skew
	 * @param _logger						receives the start/completion messages of the republish jobs
	 */
	public DHTDBImpl(
		DHTStorageAdapter	_adapter,
		int					_original_republish_interval,
		int					_cache_republish_interval,
		DHTLogger			_logger )
	{
		if ( _adapter == null ){

			adapter	= null;

		}else{

			adapter	= new adapterFacade( _adapter );
		}

		original_republish_interval		= _original_republish_interval;
		cache_republish_interval		= _cache_republish_interval;
		logger							= _logger;

			// job 1: periodic republish of the mappings we originated

		TimerEventPerformer	original_republisher =
			new TimerEventPerformer()
			{
				public void perform( TimerEvent event )
				{
					logger.log( "Republish of original mappings starts" );

					final long	began		= SystemTime.getCurrentTime();

					final int	republished	= republishOriginalMappings();

					final long	elapsed		= SystemTime.getCurrentTime() - began;

					logger.log( "Republish of original mappings completed in " + elapsed + ": " +
								"values = " + republished );
				}
			};

		SimpleTimer.addPeriodicEvent( "DHTDB:op", original_republish_interval, original_republisher );

			// job 2: periodic republish of cached mappings. The period is skewed randomly by up
			// to +/- 10000 so that cache refresh isn't very synchronised, as the optimisations
			// regarding non-republishing benefit from this

		int	skewed_cache_period = cache_republish_interval + 10000 - (int)(Math.random()*20000);

		TimerEventPerformer	cache_republisher =
			new TimerEventPerformer()
			{
				public void perform( TimerEvent event )
				{
					logger.log( "Republish of cached mappings starts" );

					final long	cache_began		= SystemTime.getCurrentTime();

					final int[]	counts			= republishCachedMappings();

					final long	cache_elapsed	= SystemTime.getCurrentTime() - cache_began;

					logger.log( "Republish of cached mappings completed in " + cache_elapsed + ": " +
								"values = " + counts[0] + ", keys = " + counts[1] + ", ops = " + counts[2]);

					if ( !force_original_republish ){

							// no router change pending, nothing more to do this round

						return;
					}

					force_original_republish	= false;

					logger.log( "Force republish of original mappings due to router change starts" );

					final long	forced_began		= SystemTime.getCurrentTime();

					final int	forced_republished	= republishOriginalMappings();

					final long	forced_elapsed		= SystemTime.getCurrentTime() - forced_began;

					logger.log( "Force republish of original mappings due to router change completed in " + forced_elapsed + ": " +
								"values = " + forced_republished );
				}
			};

		SimpleTimer.addPeriodicEvent( "DHTDB:cp", skewed_cache_period, cache_republisher );

			// job 3: periodic rebuild of the IP bloom filter, performed under the DB monitor

		TimerEventPerformer	bloom_rebuilder =
			new TimerEventPerformer()
			{
				public void perform( TimerEvent event )
				{
					try{
						this_mon.enter();

						rebuildIPBloomFilter( false );

					}finally{

						this_mon.exit();
					}
				}
			};

		SimpleTimer.addPeriodicEvent( "DHTDB:bloom", IP_BLOOM_FILTER_REBUILD_PERIOD, bloom_rebuilder );
	}
	
	
	/**
	 * Attaches the DHT control to this database. The router and local contact are
	 * re-read from the control, and every stored mapping is told about the new local
	 * contact while the DB monitor is held.
	 *
	 * If a router was already set before this call, the router is considered to have
	 * changed and a forced republish of the original values is flagged.
	 *
	 * @param _control	the control supplying the router and the transport's local contact
	 */
	public void setControl( DHTControl _control )
	{
		control	= _control;

			// a router being present already means this call is replacing it, which
			// triggers an "original value republish"; evaluated before router is overwritten

		boolean	router_replaced = ( router != null );

		force_original_republish	= router_replaced;

		router			= control.getRouter();
		local_contact	= control.getTransport().getLocalContact();

			// our ID has changed - amend the originator of all our values

		try{
			this_mon.enter();

			for ( Iterator mappings = stored_values.values().iterator(); mappings.hasNext(); ){

				DHTDBMapping	entry = (DHTDBMapping)mappings.next();

				entry.updateLocalContact( local_contact );
			}
		}finally{

			this_mon.exit();
		}
	}
	
	/**
	 * Local store: records a value that originates from this node.
	 *
	 * The mapping for the key is created on demand (flagged as local) and a new value,
	 * timestamped with the current time and carrying the next value version, is added
	 * to it with the local contact as both of its contact arguments. The maximum
	 * storage check is not applied here - it is only policed for data received from
	 * other contacts.
	 *
	 * @param key		key to store under
	 * @param value		raw value bytes
	 * @param flags		flags passed through to the created value
	 * @return			the value object that was added to the mapping
	 */
	public DHTDBValue store( HashWrapper key, byte[] value, byte flags )
	{
		try{
			this_mon.enter();

			DHTDBMapping	target = (DHTDBMapping)stored_values.get( key );

			if ( target == null ){

					// first value for this key - create its (local) mapping

				target = new DHTDBMapping( this, key, true );

				stored_values.put( key, target );
			}

			long	created_at = SystemTime.getCurrentTime();

			DHTDBValueImpl	stored =
				new DHTDBValueImpl( created_at, value, getNextValueVersion(), local_contact, local_contact, true, flags );

			target.add( stored );

			return( stored );

		}finally{

			this_mon.exit();
		}
	}
	
	/**
	 * Remote store: handles values sent to us by another contact for caching.
	 *
	 * The request passes through the following gates, in order:
	 * <ol>
	 * <li>storage budget - refused if total_size plus 4 bytes per stored value exceeds MAX_TOTAL_SIZE</li>
	 * <li>key closeness - refused unless one of the closest contacts to the key is this node's own ID</li>
	 * <li>sender closeness - for cache forwards only (at least one value whose originator is not the
	 *     sender): refused if the sender is further from us than the furthest of our closest contacts</li>
	 * <li>contact verification - performed once per call, under the DB monitor; on failure the
	 *     failure is logged and no value is added</li>
	 * </ol>
	 * Note that the mapping for the key is created (if absent) before contact verification, so a
	 * failed verification can leave an empty mapping behind; this matches the previous behaviour.
	 *
	 * @param sender	contact that sent the store request
	 * @param key		key the values are to be stored under
	 * @param values	values to store; may be empty
	 * @return			DHT.DT_SIZE if the storage budget is exceeded, DHT.DT_NONE if the request is
	 * 					rejected on distance grounds, otherwise the mapping's diversification type
	 */
	public byte
	store(
		DHTTransportContact 	sender, 
		HashWrapper				key,
		DHTTransportValue[]		values )
	{
			// allow 4 bytes per value entry to deal with overhead (probably should be more but we're really
			// trying to deal with 0-length value stores)
		
		if ( total_size + ( total_values*4 ) > MAX_TOTAL_SIZE ){
			
			DHTLog.log( "Not storing " + DHTLog.getString2(key.getHash()) + " as maximum storage limit exceeded" );

			return( DHT.DT_SIZE );
		}
		
			// remote store for cache values
		
			// Make sure that we only accept values for storing that are reasonable.
			// Assumption is that the caller has made a reasonable effort to ascertain
			// the correct place to store a value. Part of this will in general have 
			// needed them to query us for example. Therefore, limit values to those
			// that are at least as close to us
		
		List closest_contacts = control.getClosestKContactsList( key.getHash(), true );
		
		boolean	key_close_enough	= false;
		
		for (int i=0;i<closest_contacts.size();i++){
			
			if ( router.isID(((DHTTransportContact)closest_contacts.get(i)).getID())){
				
				key_close_enough	= true;
				
				break;
			}		
		}
		
		if ( !key_close_enough ){
			
			DHTLog.log( "Not storing " + DHTLog.getString2(key.getHash()) + " as key too far away" );

			return( DHT.DT_NONE );
		}
		
			// next, for cache forwards (rather than values coming directly from 
			// originators) we ensure that the contact sending the values to us is
			// close enough. If any values are coming indirect then we can safely assume
			// that they all are
		
		boolean	cache_forward = false;
		
		for (int i=0;i<values.length;i++){
			
			if (!Arrays.equals( sender.getID(), values[i].getOriginator().getID())){
				
				cache_forward	= true;
				
				break;
			}
		}
		
		if ( cache_forward ){
			
				// get the closest contacts to me
				
			byte[]	my_id	= local_contact.getID();
			
			closest_contacts = control.getClosestKContactsList( my_id, true );
			
				// NOTE(review): an empty list would make the get() below throw; the key closeness
				// check above found our own ID in a closest-contacts list, so presumably the list
				// for our own ID is never empty - confirm against getClosestKContactsList
			
			DHTTransportContact	furthest = (DHTTransportContact)closest_contacts.get( closest_contacts.size()-1);
						
			if ( control.computeAndCompareDistances( furthest.getID(), sender.getID(), my_id ) < 0 ){

				DHTLog.log( "Not storing " + DHTLog.getString2(key.getHash()) + " as cache forward and sender too far away" );
				
				return( DHT.DT_NONE );
			}
		}
		
		try{
			this_mon.enter();
						
			checkCacheExpiration( false );
				
			DHTDBMapping	mapping = (DHTDBMapping)stored_values.get( key );
			
			if ( mapping == null ){
				
				mapping = new DHTDBMapping( this, key, false );
				
				stored_values.put( key, mapping );
			}
			
				// we carry on an update as its ok to replace existing entries
				// even if diversified
			
			if ( values.length > 0 ){
				
					// last check, verify that the contact is who they say they are, only for non-forwards
					// as cache forwards are only accepted if they are "close enough" and we can't 
					// rely on their identity due to the way that cache republish works (it doesn't
					// guarantee a "lookup_node" prior to "store").
					// The sender is the same for every value so it is verified once, using the
					// directness of the first value (as before), rather than per loop iteration
				
				boolean	direct = Arrays.equals( sender.getID(), values[0].getOriginator().getID());
				
				if ( control.verifyContact( sender, direct )){
					
					for (int i=0;i<values.length;i++){
						
						mapping.add( new DHTDBValueImpl( sender, values[i], false ));
					}
				}else{
					
					logger.log( "DB: verification of contact '" + sender.getName() + "' failed for store operation" );
				}
			}
			
			return( mapping.getDiversificationType());
	
		}finally{
			
			this_mon.exit();
		}
	}
	
	public DHTDBLookupResult
	get(
		DHTTransportContact		reader,
		HashWrapper				key,
		int						max_values,	// 0 -> all
		byte					flags,
		boolean					external_request )	
	{
		try{
			this_mon.enter();
			
			checkCacheExpiration( false );
					
			final DHTDBMapping mapping = (DHTDBMapping)stored_values.get(key);
			
			if ( mapping == null ){
				
				return( null );
			}
			
			if ( external_request ){
				
				mapping.addHit();
			}
			
			final DHTDBValue[]	values = mapping.get( reader, max_values, flags );
						
			return(

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -