⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dhtdbimpl.java

📁 一个基于JAVA的多torrent下载程序
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
					
					Object[]	data = (Object[])count.get( key );
									
					if ( data == null ){
						
						data = new Object[2];
						
						data[0] = new Integer(1);
						
						data[1] = "";
									
						count.put( key, data );
	
					}else{
						
						data[0] = new Integer(((Integer)data[0]).intValue() + 1 );
					}
				
					String	s = (String)data[1];
					
					s += (s.length()==0?"":", ") + "key=" + DHTLog.getString2(value_key.getHash()) + ",val=" + value.getString();
					
					data[1]	= s;
				}
			}
			
			it = count.keySet().iterator();
			
			while( it.hasNext()){
				
				Integer	k = (Integer)it.next();
				
				Object[]	data = (Object[])count.get(k);
				
				logger.log( "    " + k + " -> " + data[0] + " entries" ); // ": " + data[1]);
			}
			
			it = stored_values.entrySet().iterator();
			
			String	str 		= "    ";
			int		str_entries	= 0;
			
			while( it.hasNext()){
						
				Map.Entry		entry = (Map.Entry)it.next();
				
				HashWrapper		value_key	= (HashWrapper)entry.getKey();
				
				DHTDBMapping	mapping = (DHTDBMapping)entry.getValue();
				
				if ( str_entries == 16 ){
					
					logger.log( str );
					
					str = "    ";
					
					str_entries	= 0;
				}
				
				str_entries++;
				
				str += (str_entries==1?"":", ") + DHTLog.getString2(value_key.getHash()) + " -> " + mapping.getValueCount() + "/" + mapping.getHits()+"["+mapping.getLocalSize()+","+mapping.getDirectSize()+","+mapping.getIndirectSize() + "]";
			}
			
			if ( str_entries > 0 ){
				
				logger.log( str );
			}
		}finally{
			
			this_mon.exit();
		}
	}
	
	/**
	 * Bans a contact that has been flooding this node with stores.
	 * <p>
	 * Two things happen:
	 * <ol>
	 * <li>A separate {@code AEThread} is started which, while holding
	 *     {@code this_mon}, walks {@code stored_values} and removes every
	 *     mapping that holds at least one non-local direct value whose
	 *     originator ID equals the contact's ID.</li>
	 * <li>On the calling thread the ban is logged and the contact's host
	 *     address is handed to {@code ip_filter.ban}.</li>
	 * </ol>
	 *
	 * @param contact	the contact to ban
	 * @param reason	short text included in the log line and in the ban description
	 */
	protected void
	banContact(
		final DHTTransportContact	contact,
		final String				reason )
	{
		new AEThread( "DHTDBImpl:delayed flood delete", true )
		{
			public void
			runSupport()
			{
					// delete their data on a separate thread so as not to 
					// interfere with the current action
				
				try{
					this_mon.enter();
					
					Iterator	it = stored_values.values().iterator();
												
					while( it.hasNext()){
						
						DHTDBMapping	mapping = (DHTDBMapping)it.next();

						Iterator	it2 = mapping.getDirectValues();
						
						while( it2.hasNext()){
							
							DHTDBValueImpl	val = (DHTDBValueImpl)it2.next();
							
							if ( !val.isLocal() && Arrays.equals( val.getOriginator().getID(), contact.getID())){
								
									// NOTE(review): this drops the whole mapping from stored_values,
									// not just the offending value - confirm that is the intent
								
								it.remove();
								
									// the mapping is gone, so stop scanning it: a second match would
									// call it.remove() again for the same next(), which the Iterator
									// contract answers with IllegalStateException, aborting the purge
								
								break;
							}
						}
					}

				}finally{
					
					this_mon.exit();
					
				}
			}
		}.start();
	
		logger.log( "Banning " + contact.getString() + " due to store flooding (" + reason + ")" );
		
		ip_filter.ban( 
				contact.getAddress().getAddress().getHostAddress(),
				"DHT: Sender stored excessive entries at this node (" + reason + ")" );		
	}
	
	/**
	 * Registers one more direct value from the given contact's IP address in
	 * the global IP bloom filter, then reacts to the result: the filter is
	 * rebuilt with a larger size when it is getting too full, and the contact
	 * is banned when its address has accumulated more than 64 hits.
	 *
	 * @param contact	originator of the value that has just been added
	 */
	protected void
	incrementValueAdds(
		DHTTransportContact	contact )
	{
			// Rationale for the per-address threshold used below:
			//   - assume a node stores 1000 values at 20 (K) locations -> 20,000 values
			//   - assume a DHT size of 100,000 nodes
			//   - that is, on average, 1 value per 5 nodes
			//   - assume NAT of up to 30 ports per address
			//   - this gives 6 values per address
			//   - with a factor of 10 error this is still only 60 per address
		
		final int	hit_count = 
			ip_count_bloom_filter.add( contact.getAddress().getAddress().getAddress());
		
		if ( DHTLog.GLOBAL_BLOOM_TRACE ){
		
			System.out.println( "direct add from " + contact.getAddress() + ", hit count = " + hit_count );
		}

			// allow up to 10% bloom filter utilisation; beyond that grow the filter
		
		final boolean	filter_too_full = 
			ip_count_bloom_filter.getSize() / ip_count_bloom_filter.getEntryCount() < 10;
		
		if ( filter_too_full ){
			
			rebuildIPBloomFilter( true );
		}
		
			// more than 64 hits from one address: obviously being spammed, so drop
			// all data originated by this IP and ban it
		
		final boolean	flooding = hit_count > 64;
		
		if ( flooding ){
			
			banContact( contact, "global flood" );
		}
	}
	
	/**
	 * Removes one occurrence of the given contact's IP address from the
	 * global IP bloom filter, optionally tracing the resulting hit count.
	 *
	 * @param contact	originator of the direct value that has just been deleted
	 */
	protected void
	decrementValueAdds(
		DHTTransportContact	contact )
	{
		final int	remaining_hits = 
			ip_count_bloom_filter.remove( contact.getAddress().getAddress().getAddress());

		if ( !DHTLog.GLOBAL_BLOOM_TRACE ){
			
			return;
		}
		
		System.out.println( "direct remove from " + contact.getAddress() + ", hit count = " + remaining_hits );
	}

	/**
	 * Builds a fresh global IP bloom filter from the values currently held in
	 * {@code stored_values} and installs it as {@code ip_count_bloom_filter}.
	 * <p>
	 * Every mapping is also asked to rebuild its own filter (without growing
	 * it). The new global filter receives the originator address of each
	 * non-local direct value. The replacement filter is installed in a
	 * {@code finally} clause, so it takes effect even if repopulation fails
	 * part-way through.
	 *
	 * @param increase_size	when true the new filter is
	 * 						{@code IP_COUNT_BLOOM_SIZE_INCREASE_CHUNK} larger than the
	 * 						current one, otherwise it has the same size
	 */
	protected void
	rebuildIPBloomFilter(
		boolean	increase_size )
	{
		final BloomFilter	new_filter =
			BloomFilterFactory.createAddRemove8Bit(
				increase_size
					? ip_count_bloom_filter.getSize() + IP_COUNT_BLOOM_SIZE_INCREASE_CHUNK
					: ip_count_bloom_filter.getSize());
		
		try{
			
			int	max_hits = 0;
			
			for ( Iterator mappings = stored_values.values().iterator(); mappings.hasNext(); ){
				
				DHTDBMapping	mapping = (DHTDBMapping)mappings.next();

				mapping.rebuildIPBloomFilter( false );
				
				for ( Iterator direct_values = mapping.getDirectValues(); direct_values.hasNext(); ){
					
					DHTDBValueImpl	val = (DHTDBValueImpl)direct_values.next();
					
					if ( val.isLocal()){
						
							// local values do not count against any remote address
						
						continue;
					}
					
					max_hits = 
						Math.max( 
							max_hits, 
							new_filter.add( val.getOriginator().getAddress().getAddress().getAddress()));
				}
				
					// Surveying our neighbourhood (not implemented):
					//
					// It is non-trivial to do anything about nodes that get "close" to us and
					// then spam us with crap. Ultimately, of course, to take a key out you
					// "just" create the 20 closest nodes to the key and then run nodes that
					// swallow all registrations and return nothing.
					//
					// Protecting against one or two such nodes that flood crap requires crap to
					// be identified. Tracing shows a large disparity between the number of
					// values registered per neighbour (factors of 100), so an approach based on
					// the number of registrations is non-trivial (assuming future scaling of the
					// DHT, what do we consider crap?).
					//
					// A further approach would be to query the claimed originators of values
					// (obviously a low bandwidth approach, e.g. query 3 values from the contact
					// with the highest number of forwarded values). This requires originators to
					// support long term knowledge of what they've published (we don't want to
					// blacklist a neighbour because an originator has deleted a value/been
					// restarted). We also then have to consider how to deal with non-responses
					// to queries (assuming an affirmative Yes -> value has been forwarded
					// correctly, No -> probably crap). We can't treat non-replies as No. Thus a
					// bad neighbour only has to forward crap with originators that aren't AZ
					// nodes (very easy to do!) to break this approach.
					//
					// An earlier experiment counted indirect values per sender here and then
					// printed the senders sorted by distance; it has been disabled.
			}
			
			logger.log( "Rebuilt global IP bloom filter, size = " + new_filter.getSize() + ", entries =" + new_filter.getEntryCount()+", max hits = " + max_hits );
			
		}finally{
			
				// always switch to the new filter, even after a failure above
			
			ip_count_bloom_filter	= new_filter;
		}
	}
	
	/**
	 * Diagnostic hook called by the storage-adapter facade around key and
	 * value operations. It is currently a no-op: the entire body is commented
	 * out.
	 * <p>
	 * The disabled code recomputes key, value and byte totals by walking
	 * {@code stored_values}, compares them against the running
	 * {@code total_keys} / {@code total_values} / {@code total_size} counters
	 * (and each mapping's reported size against the sum of its value lengths),
	 * reports mismatches via {@code Debug.out} and prints a summary line.
	 * It also expects {@code this_mon} to be held by the caller.
	 *
	 * @param op	label of the operation being reported, e.g. "valueAdded"
	 */
	protected void
	reportSizes(
		String	op )
	{
			// consistency check, disabled - kept for debugging counter drift
		/*
		if ( !this_mon.isHeld()){
			
			Debug.out( "Monitor not held" );
		}
		
		int	actual_keys 	= stored_values.size();
		int	actual_values 	= 0;
		int actual_size		= 0;
		
		Iterator it = stored_values.values().iterator();
		
		while( it.hasNext()){
		
			DHTDBMapping	mapping = (DHTDBMapping)it.next();
			
			int	reported_size = mapping.getLocalSize() + mapping.getDirectSize() + mapping.getIndirectSize();
			
			actual_values += mapping.getValueCount();
			
			Iterator	it2 = mapping.getValues();
			
			int	sz = 0;
			
			while( it2.hasNext()){
				
				DHTDBValue	val = (DHTDBValue)it2.next();
				
				sz += val.getValue().length;
			}
			
			if ( sz != reported_size ){
				
				Debug.out( "Reported mapping size != actual: " + reported_size + "/" + sz );
			}
			
			actual_size += sz;
		}
		
		if ( actual_keys != total_keys ){
			
			Debug.out( "Actual keys != total: " + actual_keys + "/" + total_keys );
		}
		
		if ( actual_values != total_values ){
			
			Debug.out( "Actual values != total: " + actual_values + "/" + total_values );
		}
		
		if ( actual_size != total_size ){
			
			Debug.out( "Actual size != total: " + actual_size + "/" + total_size );
		}
		
		System.out.println( "DHTDB: " + op + " - keys=" + total_keys + ", values=" + total_values + ", size=" + total_size );
		*/
	}
	
	/**
	 * Hands out the next value version number.
	 * <p>
	 * Versions are consumed in chunks of {@code VALUE_VERSION_CHUNK}. When the
	 * current chunk is used up and an {@code adapter} is present, the start of
	 * the next chunk is obtained from
	 * {@code adapter.getNextValueVersions}. Without an adapter the counter
	 * simply carries on incrementing. The whole operation runs under
	 * {@code this_mon}.
	 *
	 * @return the version number allocated to the caller
	 */
	protected int
	getNextValueVersion()
	{
		try{
			this_mon.enter();
			
			final boolean	chunk_exhausted = next_value_version_left == 0;
			
			if ( chunk_exhausted ){
				
					// start a new chunk; the remaining-count is reset before consulting
					// the adapter
				
				next_value_version_left = VALUE_VERSION_CHUNK;
				
				if ( adapter != null ){
					
						// a persistent manager is available: let it pick the chunk start
					
					next_value_version = adapter.getNextValueVersions( VALUE_VERSION_CHUNK );
				}
			}
			
			next_value_version_left--;
			
			return( next_value_version++ );
			
		}finally{
			
			this_mon.exit();
		}
	}
	
	/**
	 * Wrapper around a {@link DHTStorageAdapter} that keeps the enclosing
	 * database's bookkeeping up to date before forwarding each call to the
	 * wrapped adapter.
	 * <p>
	 * Key events adjust {@code total_keys}; value events adjust
	 * {@code total_values} and {@code total_size} and, for non-local values
	 * whose sender is also their originator, update the global IP bloom filter
	 * via {@code incrementValueAdds} / {@code decrementValueAdds}. Each event
	 * is also passed to {@code reportSizes}.
	 */
	protected class
	adapterFacade
		implements DHTStorageAdapter
	{
		private final DHTStorageAdapter		delegate;
		
		/**
		 * @param _delegate	the adapter that ultimately receives every call
		 */
		protected
		adapterFacade(
			DHTStorageAdapter	_delegate )
		{
			this.delegate = _delegate;
		}
		
		/**
		 * Tells whether a value is non-local and was sent by its own
		 * originator (originator ID equals sender ID).
		 */
		private boolean
		isDirectRemoteValue(
			DHTTransportValue	value )
		{
			if ( value.isLocal()){
				
				return( false );
			}
			
			DHTDBValueImpl	db_value = (DHTDBValueImpl)value;
			
			return( Arrays.equals( value.getOriginator().getID(), db_value.getSender().getID()));
		}
		
		/**
		 * Counts a new key and forwards the creation to the delegate.
		 */
		public DHTStorageKey
		keyCreated(
			HashWrapper		key,
			boolean			local )
		{
				// sizes are reported *before* the increment because this callback
				// occurs before the key is locally added
			
			reportSizes( "keyAdded" );
			
			total_keys++;
			
			DHTStorageKey	created = delegate.keyCreated( key, local );
			
			return( created );
		}
		
		/**
		 * Uncounts a key and forwards the deletion to the delegate.
		 */
		public void
		keyDeleted(
			DHTStorageKey	adapter_key )
		{
			total_keys--;
			
			reportSizes( "keyDeleted" );
			
			delegate.keyDeleted( adapter_key );
		}
		
		/**
		 * Forwards a key read to the delegate; no counters change.
		 */
		public void
		keyRead(
			DHTStorageKey			adapter_key,
			DHTTransportContact		contact )
		{
			reportSizes( "keyRead" );
			
			delegate.keyRead( adapter_key, contact );
		}
		
		/**
		 * Accounts for a newly added value, records a bloom filter hit for
		 * direct remote values, then forwards to the delegate.
		 */
		public void
		valueAdded(
			DHTStorageKey		key,
			DHTTransportValue	value )
		{
			total_values++;
			total_size += value.getValue().length;
			
			reportSizes( "valueAdded");
			
			if ( isDirectRemoteValue( value )){
				
				incrementValueAdds( value.getOriginator());
			}
				
			delegate.valueAdded( key, value );
		}
		
		/**
		 * Adjusts the total size by the difference between the new and old
		 * value lengths, then forwards to the delegate.
		 */
		public void
		valueUpdated(
			DHTStorageKey		key,
			DHTTransportValue	old_value,
			DHTTransportValue	new_value )
		{
			int	size_delta = new_value.getValue().length - old_value.getValue().length;
			
			total_size += size_delta;
			
			reportSizes("valueUpdated");
			
			delegate.valueUpdated( key, old_value, new_value );
		}
		
		/**
		 * Accounts for a removed value, releases the bloom filter hit for
		 * direct remote values, then forwards to the delegate.
		 */
		public void
		valueDeleted(
			DHTStorageKey		key,
			DHTTransportValue	value )
		{
			total_values--;
			total_size -= value.getValue().length;
		
			reportSizes("valueDeleted");
			
			if ( isDirectRemoteValue( value )){
				
				decrementValueAdds( value.getOriginator());
			}

			delegate.valueDeleted( key, value );
		}
		
			// local lookup/put operations - pure pass-through to the delegate
		
		public byte[][]
		getExistingDiversification(
			byte[]			key,
			boolean			put_operation,
			boolean			exhaustive_get )
		{
			byte[][]	existing = delegate.getExistingDiversification( key, put_operation, exhaustive_get );
			
			return( existing );
		}
		
		public byte[][]
		createNewDiversification(
			DHTTransportContact	cause,
			byte[]				key,
			boolean				put_operation,
			byte				diversification_type,
			boolean				exhaustive_get )
		{
			byte[][]	created = 
				delegate.createNewDiversification( 
					cause, key, put_operation, diversification_type, exhaustive_get );
			
			return( created );
		}
		
		public int
		getNextValueVersions(
			int		num )
		{
			int	first_version = delegate.getNextValueVersions( num );
			
			return( first_version );
		}
	}
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -