dhtdbimpl.java

来自「这是一个基于java编写的torrent的P2P源码」· Java 代码 · 共 1,866 行 · 第 1/4 页

JAVA
1,866
字号
	
	/**
	 * Records one directly-stored value against the originating contact's address in the
	 * global IP count bloom filter, triggers an enlarging rebuild of the filter when it is
	 * becoming too full, and bans the contact when its hit count exceeds 64.
	 *
	 * Rationale for the threshold:
	 *   - assume a node stores 1000 values at 20 (K) locations -> 20,000 values
	 *   - assume a DHT size of 100,000 nodes
	 *   - that is, on average, 1 value per 5 nodes
	 *   - assume NAT of up to 30 ports per address
	 *   - this gives 6 values per address
	 *   - with a factor of 10 error this is still only 60 per address
	 *
	 * @param contact	originator of the value that has just been added
	 */
	protected void
	incrementValueAdds(
		DHTTransportContact	contact )
	{
		int	hit_count = ip_count_bloom_filter.add( contact.getAddress().getAddress().getAddress());
		
		if ( DHTLog.GLOBAL_BLOOM_TRACE ){
		
			System.out.println( "direct add from " + contact.getAddress() + ", hit count = " + hit_count );
		}

			// allow up to 10% bloom filter utilisation. The test is written as a multiplication
			// (in long arithmetic) instead of size / entries so that an entry count of zero can
			// never raise an ArithmeticException; for positive counts both forms are equivalent
		
		long	filter_size		= ip_count_bloom_filter.getSize();
		long	filter_entries	= ip_count_bloom_filter.getEntryCount();
		
		if ( filter_size < 10 * filter_entries ){
			
			rebuildIPBloomFilter( true );
		}
		
			// note: hit_count was obtained from the filter as it was before any rebuild above
		
		if ( hit_count > 64 ){
			
				// obviously being spammed, drop all data originated by this IP and ban it
			
			banContact( contact, "global flood" );
		}
	}
	
	/**
	 * Counterpart of {@link #incrementValueAdds}: removes one occurrence of the contact's
	 * address from the global IP count bloom filter and, when bloom tracing is enabled,
	 * prints the hit count returned by the filter.
	 *
	 * @param contact	originator whose address is to be removed from the filter
	 */
	protected void decrementValueAdds( DHTTransportContact contact )
	{
		final int	remaining_hits =
			ip_count_bloom_filter.remove( contact.getAddress().getAddress().getAddress());

			// nothing else to do unless tracing is switched on

		if ( !DHTLog.GLOBAL_BLOOM_TRACE ){

			return;
		}

		System.out.println( "direct remove from " + contact.getAddress() + ", hit count = " + remaining_hits );
	}

	/**
	 * Builds a replacement for the global IP count bloom filter and installs it.
	 *
	 * The replacement is created with the current filter's size, plus
	 * IP_COUNT_BLOOM_SIZE_INCREASE_CHUNK when growth is requested. Each stored mapping is
	 * asked to rebuild its own filter (always called with false) and the originator address
	 * of every non-local direct value is added to the replacement. The replacement is
	 * assigned in a finally clause, so it becomes the active filter even if population fails
	 * part-way through.
	 *
	 * @param increase_size	true to enlarge the filter by one chunk, false to keep its size
	 */
	protected void rebuildIPBloomFilter( boolean increase_size )
	{
		BloomFilter	replacement =
			BloomFilterFactory.createAddRemove8Bit(
				increase_size
					? ip_count_bloom_filter.getSize() + IP_COUNT_BLOOM_SIZE_INCREASE_CHUNK
					: ip_count_bloom_filter.getSize());

		try{
				// disabled neighbourhood survey state (see the note further down):
				//   Map	sender_map	= new HashMap();
				//   List	senders		= new ArrayList();

			int	highest_count = 0;

			for ( Iterator mappings = stored_values.values().iterator(); mappings.hasNext(); ){

				DHTDBMapping	mapping = (DHTDBMapping)mappings.next();

				mapping.rebuildIPBloomFilter( false );

				for ( Iterator directs = mapping.getDirectValues(); directs.hasNext(); ){

					DHTDBValueImpl	value = (DHTDBValueImpl)directs.next();

						// locally originated values are not counted

					if ( value.isLocal()){

						continue;
					}

					int	hits = replacement.add( value.getOriginator().getAddress().getAddress().getAddress());

					highest_count = Math.max( highest_count, hits );
				}

					// survey our neighbourhood (disabled)

				/*
				 * It is non-trivial to do anything about nodes that get "close" to us and then
				 * spam us with crap. Ultimately, of course, to take a key out you "just" create
				 * the 20 closest nodes to the key and then run nodes that swallow all
				 * registrations and return nothing.
				 * Protecting against one or two such nodes that flood crap requires crap to be
				 * identified. Tracing shows a large disparity between the number of values
				 * registered per neighbour (factors of 100), so an approach based on the number
				 * of registrations is non-trivial (assuming future scaling of the DHT, what do
				 * we consider crap?)
				 * A further approach would be to query the claimed originators of values
				 * (obviously a low bandwidth approach, e.g. query 3 values from the contact with
				 * the highest number of forwarded values). This requires originators to support
				 * long term knowledge of what they've published (we don't want to blacklist a
				 * neighbour because an originator has deleted a value/been restarted). We also
				 * then have to consider how to deal with non-responses to queries (assuming an
				 * affirmative Yes -> value has been forwarded correctly, No -> probably crap).
				 * We can't treat non-replies as No. Thus a bad neighbour only has to forward
				 * crap with originators that aren't AZ nodes (very easy to do!) to break this
				 * approach.
				 *
				 * Disabled per-sender counting of indirect values:
				 *
				 *   it2 = mapping.getIndirectValues();
				 *
				 *   while( it2.hasNext()){
				 *
				 *       DHTDBValueImpl	val = (DHTDBValueImpl)it2.next();
				 *
				 *       DHTTransportContact sender = val.getSender();
				 *
				 *       HashWrapper	hw = new HashWrapper( sender.getID());
				 *
				 *       Integer	sender_count = (Integer)sender_map.get( hw );
				 *
				 *       if ( sender_count == null ){
				 *
				 *           sender_count = new Integer(1);
				 *
				 *           senders.add( sender );
				 *
				 *       }else{
				 *
				 *           sender_count = new Integer( sender_count.intValue() + 1 );
				 *       }
				 *
				 *       sender_map.put( hw, sender_count );
				 *   }
				 */
			}

			logger.log( "Rebuilt global IP bloom filter, size = " + replacement.getSize() + ", entries =" + replacement.getEntryCount()+", max hits = " + highest_count );

			/*
			 * Disabled reporting of the survey results:
			 *
			 *   senders = control.sortContactsByDistance( senders );
			 *
			 *   for (int i=0;i<senders.size();i++){
			 *
			 *       DHTTransportContact	sender = (DHTTransportContact)senders.get(i);
			 *
			 *       System.out.println( i + ":" + sender.getString() + " -> " + sender_map.get(new HashWrapper(sender.getID())));
			 *   }
			 */

		}finally{

				// always switch over to the replacement, whatever happened above

			ip_count_bloom_filter = replacement;
		}
	}
	
	/**
	 * Diagnostic hook called by the storage adapter facade around key/value events.
	 *
	 * Currently a no-op: the whole body is commented out. The disabled code checks that
	 * the monitor is held, recomputes the key count, value count and total value size from
	 * stored_values, reports any mismatch against total_keys / total_values / total_size via
	 * Debug.out and finally prints a one-line summary tagged with the operation name.
	 *
	 * @param op	label of the operation being reported (only used by the disabled trace output)
	 */
	protected void
	reportSizes(
		String	op )
	{
		/*
		if ( !this_mon.isHeld()){
			
			Debug.out( "Monitor not held" );
		}
		
		int	actual_keys 	= stored_values.size();
		int	actual_values 	= 0;
		int actual_size		= 0;
		
		Iterator it = stored_values.values().iterator();
		
		while( it.hasNext()){
		
			DHTDBMapping	mapping = (DHTDBMapping)it.next();
			
			int	reported_size = mapping.getLocalSize() + mapping.getDirectSize() + mapping.getIndirectSize();
			
			actual_values += mapping.getValueCount();
			
			Iterator	it2 = mapping.getValues();
			
			int	sz = 0;
			
			while( it2.hasNext()){
				
				DHTDBValue	val = (DHTDBValue)it2.next();
				
				sz += val.getValue().length;
			}
			
			if ( sz != reported_size ){
				
				Debug.out( "Reported mapping size != actual: " + reported_size + "/" + sz );
			}
			
			actual_size += sz;
		}
		
		if ( actual_keys != total_keys ){
			
			Debug.out( "Actual keys != total: " + actual_keys + "/" + total_keys );
		}
		
		if ( actual_values != total_values ){
			
			Debug.out( "Actual values != total: " + actual_values + "/" + total_values );
		}
		
		if ( actual_size != total_size ){
			
			Debug.out( "Actual size != total: " + actual_size + "/" + total_size );
		}
		
		System.out.println( "DHTDB: " + op + " - keys=" + total_keys + ", values=" + total_values + ", size=" + total_size );
		*/
	}
	
	/**
	 * Hands out the next value version number while holding this_mon.
	 *
	 * Versions are consumed in chunks of VALUE_VERSION_CHUNK. When the current chunk is used
	 * up a new one is started: if an adapter is present the chunk's first version is obtained
	 * from adapter.getNextValueVersions, otherwise the counter just keeps incrementing from
	 * its current value.
	 *
	 * @return the version number allocated by this call
	 */
	protected int getNextValueVersion()
	{
		try{
			this_mon.enter();

			boolean	chunk_exhausted = ( next_value_version_left == 0 );

			if ( chunk_exhausted ){

				next_value_version_left = VALUE_VERSION_CHUNK;

					// with no persistent manager we just carry on incrementing

				if ( adapter != null ){

					next_value_version = adapter.getNextValueVersions( VALUE_VERSION_CHUNK );
				}
			}

			next_value_version_left--;

				// allocate the current version and advance the counter for the next caller

			int	allocated = next_value_version;

			next_value_version++;

			return( allocated );

		}finally{

			this_mon.exit();
		}
	}
	
	/**
	 * Storage adapter wrapper used by the database: every call is forwarded to the wrapped
	 * DHTStorageAdapter, and the key/value events additionally maintain the enclosing
	 * instance's total_keys / total_values / total_size counters, call reportSizes and keep
	 * the global IP count bloom filter in step with direct, non-local values.
	 */
	protected class adapterFacade implements DHTStorageAdapter
	{
		private DHTStorageAdapter		delegate;

		/**
		 * @param _delegate	the adapter that all calls are forwarded to
		 */
		protected adapterFacade( DHTStorageAdapter _delegate )
		{
			delegate = _delegate;
		}

		/**
		 * True when the value is not local and its originator ID equals its sender ID.
		 * The value is cast to DHTDBValueImpl to obtain the sender, exactly as the
		 * add/delete handlers require.
		 */
		private boolean isDirectNonLocal( DHTTransportValue value )
		{
			if ( value.isLocal()){

				return( false );
			}

			DHTDBValueImpl	db_value = (DHTDBValueImpl)value;

			return( Arrays.equals( value.getOriginator().getID(), db_value.getSender().getID()));
		}

		/** Counts the new key and forwards the event. */
		public DHTStorageKey keyCreated( HashWrapper key, boolean local )
		{
				// report *before* incrementing as this occurs before the key is locally added

			reportSizes( "keyAdded" );

			total_keys++;

			DHTStorageKey	created = delegate.keyCreated( key, local );

			return( created );
		}

		/** Uncounts the key, reports and forwards the event. */
		public void keyDeleted( DHTStorageKey adapter_key )
		{
			total_keys--;

			reportSizes( "keyDeleted" );

			delegate.keyDeleted( adapter_key );
		}

		/** Reports and forwards the read event; no counters change. */
		public void keyRead( DHTStorageKey adapter_key, DHTTransportContact contact )
		{
			reportSizes( "keyRead" );

			delegate.keyRead( adapter_key, contact );
		}

		/** Pure pass-through to the delegate. */
		public DHTStorageKeyStats deserialiseStats( DataInputStream is )
			throws IOException
		{
			return( delegate.deserialiseStats( is ));
		}

		/**
		 * Accounts for the added value (count and byte size), registers a direct non-local
		 * value against its originator in the IP filter, then forwards the event.
		 */
		public void valueAdded( DHTStorageKey key, DHTTransportValue value )
		{
			total_values++;
			total_size += value.getValue().length;

			reportSizes( "valueAdded");

			if ( isDirectNonLocal( value )){

				incrementValueAdds( value.getOriginator());
			}

			delegate.valueAdded( key, value );
		}

		/** Adjusts the total size by the change in value length and forwards the event. */
		public void valueUpdated(
			DHTStorageKey		key,
			DHTTransportValue	old_value,
			DHTTransportValue	new_value )
		{
			int	size_delta = new_value.getValue().length - old_value.getValue().length;

			total_size += size_delta;

			reportSizes("valueUpdated");

			delegate.valueUpdated( key, old_value, new_value );
		}

		/**
		 * Accounts for the removed value (count and byte size), unregisters a direct
		 * non-local value from the IP filter, then forwards the event.
		 */
		public void valueDeleted( DHTStorageKey key, DHTTransportValue value )
		{
			total_values--;
			total_size -= value.getValue().length;

			reportSizes("valueDeleted");

			if ( isDirectNonLocal( value )){

				decrementValueAdds( value.getOriginator());
			}

			delegate.valueDeleted( key, value );
		}

			// local lookup/put operations - everything below is a pure pass-through

		public boolean isDiversified( byte[] key )
		{
			return( delegate.isDiversified( key ));
		}

		public byte[][] getExistingDiversification(
			byte[]		key,
			boolean		put_operation,
			boolean		exhaustive_get )
		{
			return( delegate.getExistingDiversification( key, put_operation, exhaustive_get ));
		}

		public byte[][] createNewDiversification(
			DHTTransportContact	cause,
			byte[]				key,
			boolean				put_operation,
			byte				diversification_type,
			boolean				exhaustive_get )
		{
			return( delegate.createNewDiversification( cause, key, put_operation, diversification_type, exhaustive_get ));
		}

		public int getNextValueVersions( int num )
		{
			return( delegate.getNextValueVersions(num));
		}

		public DHTStorageBlock keyBlockRequest(
			DHTTransportContact	direct_sender,
			byte[]				request,
			byte[]				signature )
		{
			return( delegate.keyBlockRequest( direct_sender, request, signature ));
		}

		public DHTStorageBlock getKeyBlockDetails( byte[] key )
		{
			return( delegate.getKeyBlockDetails(key));
		}

		public DHTStorageBlock[] getDirectKeyBlocks()
		{
			return( delegate.getDirectKeyBlocks());
		}

		public byte[] getKeyForKeyBlock( byte[] request )
		{
			return( delegate.getKeyForKeyBlock( request ));
		}

		public void setStorageForKey( String key, byte[] data )
		{
			delegate.setStorageForKey( key, data );
		}

		public byte[] getStorageForKey( String key )
		{
			return( delegate.getStorageForKey(key));
		}
	}
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?