⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dhtdbimpl.java

📁 一个基于JAVA的多torrent下载程序
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
			
			this_mon.exit();
		}
	}
	
	public boolean
	isEmpty()
	{
		return( total_keys == 0 );
	}
	
	public int
	getKeyCount()
	{
		return( (int)total_keys );
	}
	
	public int[]
	getValueDetails()
	{
		try{
			this_mon.enter();
			
			int[]	res = new int[6];
			
			Iterator	it = stored_values.values().iterator();
			
			while( it.hasNext()){
				
				DHTDBMapping	mapping = (DHTDBMapping)it.next();
				
				res[DHTDBStats.VD_VALUE_COUNT] += mapping.getValueCount();
				res[DHTDBStats.VD_LOCAL_SIZE] += mapping.getLocalSize();
				res[DHTDBStats.VD_DIRECT_SIZE] += mapping.getDirectSize();
				res[DHTDBStats.VD_INDIRECT_SIZE] += mapping.getIndirectSize();
				
				int	dt = mapping.getDiversificationType();
				
				if ( dt == DHT.DT_FREQUENCY ){
					
					res[DHTDBStats.VD_DIV_FREQ]++;
					
				}else if ( dt == DHT.DT_SIZE ){
					
					res[DHTDBStats.VD_DIV_SIZE]++;
				}
			}
			
			return( res );
			
		}finally{
			
			this_mon.exit();
		}
	}
	
	public Iterator
	getKeys()
	{
		try{
			this_mon.enter();
			
			return( new ArrayList( stored_values.keySet()).iterator());
			
		}finally{
			
			this_mon.exit();
		}
	}
	
	protected int
	republishOriginalMappings()
	{
		int	values_published	= 0;

		Map	republish = new HashMap();
		
		try{
			this_mon.enter();
			
			Iterator	it = stored_values.entrySet().iterator();
			
			while( it.hasNext()){
				
				Map.Entry	entry = (Map.Entry)it.next();
				
				HashWrapper		key		= (HashWrapper)entry.getKey();
				
				DHTDBMapping	mapping	= (DHTDBMapping)entry.getValue();
				
				Iterator	it2 = mapping.getValues();
				
				List	values = new ArrayList();
				
				while( it2.hasNext()){
					
					DHTDBValueImpl	value = (DHTDBValueImpl)it2.next();
				
					if ( value != null && value.isLocal()){
						
						// we're republising the data, reset the creation time
						
						value.setCreationTime();

						values.add( value );
					}
				}
				
				if ( values.size() > 0 ){
					
					republish.put( key, values );
					
				}
			}
		}finally{
			
			this_mon.exit();
		}
		
		Iterator	it = republish.entrySet().iterator();
		
		while( it.hasNext()){
			
			Map.Entry	entry = (Map.Entry)it.next();
			
			HashWrapper			key		= (HashWrapper)entry.getKey();
			
			List		values	= (List)entry.getValue();
			
				// no point in worry about multi-value puts here as it is extremely unlikely that
				// > 1 value will locally stored, or > 1 value will go to the same contact
			
			for (int i=0;i<values.size();i++){
				
				values_published++;
				
				control.putEncodedKey( key.getHash(), "Republish", (DHTDBValueImpl)values.get(i), 0, true );
			}
		}
		
		return( values_published );
	}
	
	protected int[]
	republishCachedMappings()
	{		
			// first refresh any leaves that have not performed at least one lookup in the
			// last period
		
		router.refreshIdleLeaves( cache_republish_interval );
		
		final Map	republish = new HashMap();
		
		long	now = System.currentTimeMillis();
		
		try{
			this_mon.enter();
			
			checkCacheExpiration( true );

			Iterator	it = stored_values.entrySet().iterator();
			
			while( it.hasNext()){
				
				Map.Entry	entry = (Map.Entry)it.next();
				
				HashWrapper			key		= (HashWrapper)entry.getKey();
				
				DHTDBMapping		mapping	= (DHTDBMapping)entry.getValue();
				
					// assume that if we've diversified then the other k-1 locations are under similar
					// stress and will have done likewise - no point in republishing cache values to them
					// New nodes joining will have had stuff forwarded to them regardless of diversification
					// status
				
				if ( mapping.getDiversificationType() != DHT.DT_NONE ){
					
					continue;
				}
				
				Iterator	it2 = mapping.getValues();
				
				List	values = new ArrayList();
				
				while( it2.hasNext()){
					
					DHTDBValueImpl	value = (DHTDBValueImpl)it2.next();
				
					if ( !value.isLocal()){
						
							// if this value was stored < period ago then we assume that it was
							// also stored to the other k-1 locations at the same time and therefore
							// we don't need to re-store it
						
						if ( now < value.getStoreTime()){
							
								// deal with clock changes
							
							value.setStoreTime( now );
							
						}else if ( now - value.getStoreTime() <= cache_republish_interval ){
							
							// System.out.println( "skipping store" );
							
						}else{
								
							values.add( value );
						}
					}
				}

				if ( values.size() > 0 ){
					
					republish.put( key, values );
					
				}
			}
		}finally{
			
			this_mon.exit();
		}
		
		final int[]	values_published	= {0};
		final int[]	keys_published		= {0};
		final int[]	republish_ops		= {0};
		
		if ( republish.size() > 0 ){
			
			// System.out.println( "cache replublish" );
			
				// The approach is to refresh all leaves in the smallest subtree, thus populating the tree with
				// sufficient information to directly know which nodes to republish the values
				// to.
			
				// However, I'm going to rely on the "refresh idle leaves" logic above
				// (that's required to keep the DHT alive in general) to ensure that all
				// k-buckets are reasonably up-to-date
					
			Iterator	it = republish.entrySet().iterator();
			
			List	stop_caching = new ArrayList();
			
				// build a map of contact -> list of keys to republish
			
			Map	contact_map	= new HashMap();
			
			while( it.hasNext()){
				
				Map.Entry	entry = (Map.Entry)it.next();
				
				HashWrapper			key		= (HashWrapper)entry.getKey();
				
				byte[]	lookup_id	= key.getHash();
				
					// just use the closest contacts - if some have failed then they'll
					// get flushed out by this operation. Grabbing just the live ones
					// is a bad idea as failures may rack up against the live ones due
					// to network problems and kill them, leaving the dead ones!
				
				List	contacts = control.getClosestKContactsList( lookup_id, false );
							
					// if we are no longer one of the K closest contacts then we shouldn't
					// cache the value
				
				boolean	keep_caching	= false;
				
				for (int j=0;j<contacts.size();j++){
				
					if ( router.isID(((DHTTransportContact)contacts.get(j)).getID())){
						
						keep_caching	= true;
						
						break;
					}
				}
				
				if ( !keep_caching ){
					
					DHTLog.log( "Dropping cache entry for " + DHTLog.getString( lookup_id ) + " as now too far away" );
					
					stop_caching.add( key );
					
						// we carry on and do one last publish
					
				}
				
				for (int j=0;j<contacts.size();j++){
					
					DHTTransportContact	contact = (DHTTransportContact)contacts.get(j);
					
					if ( router.isID( contact.getID())){
						
						continue;	// ignore ourselves
					}
					
					Object[]	data = (Object[])contact_map.get( new HashWrapper(contact.getID()));
					
					if ( data == null ){
						
						data	= new Object[]{ contact, new ArrayList()};
						
						contact_map.put( new HashWrapper(contact.getID()), data );
					}
					
					((List)data[1]).add( key );
				}
			}
		
			it = contact_map.values().iterator();
			
			while( it.hasNext()){
				
				final Object[]	data = (Object[])it.next();
				
				final DHTTransportContact	contact = (DHTTransportContact)data[0];
				
					// move to anti-spoof on cache forwards - gotta do a find-node first
					// to get the random id
				
				final AESemaphore	sem = new AESemaphore( "DHTDB:cacheForward" );
				
				contact.sendFindNode(
						new DHTTransportReplyHandlerAdapter()
						{
							public void
							findNodeReply(
								DHTTransportContact 	_contact,
								DHTTransportContact[]	_contacts )
							{	
								try{
									// System.out.println( "cacheForward: pre-store findNode OK" );
								
									List				keys	= (List)data[1];
										
									byte[][]				store_keys 		= new byte[keys.size()][];
									DHTTransportValue[][]	store_values 	= new DHTTransportValue[store_keys.length][];
									
									keys_published[0] += store_keys.length;
									
									for (int i=0;i<store_keys.length;i++){
										
										HashWrapper	wrapper = (HashWrapper)keys.get(i);
										
										store_keys[i] = wrapper.getHash();
										
										List		values	= (List)republish.get( wrapper );
										
										store_values[i] = new DHTTransportValue[values.size()];
							
										values_published[0] += store_values[i].length;
										
										for (int j=0;j<values.size();j++){
										
											DHTDBValueImpl	value	= (DHTDBValueImpl)values.get(j);
												
												// we reduce the cache distance by 1 here as it is incremented by the
												// recipients
											
											store_values[i][j] = value.getValueForRelay(local_contact);
										}
									}
										
									List	contacts = new ArrayList();
									
									contacts.add( contact );
									
									republish_ops[0]++;
									
									control.putDirectEncodedKeys( 
											store_keys, 
											"Republish cache",
											store_values,
											contacts );
								}finally{
									
									sem.release();
								}
							} 
							
							public void
							failed(
								DHTTransportContact 	_contact,
								Throwable				_error )
							{
								try{
									// System.out.println( "cacheForward: pre-store findNode Failed" );
	
									DHTLog.log( "cacheForward: pre-store findNode failed " + DHTLog.getString( _contact ) + " -> failed: " + _error.getMessage());
																			
									router.contactDead( _contact.getID(), false);
									
								}finally{
									
									sem.release();
								}
							}
						},
						contact.getProtocolVersion() >= DHTTransportUDP.PROTOCOL_VERSION_ANTI_SPOOF2?new byte[0]:new byte[20] );
				
				sem.reserve();
			}
			
			try{
				this_mon.enter();
				
				for (int i=0;i<stop_caching.size();i++){
					
					DHTDBMapping	mapping = (DHTDBMapping)stored_values.remove( stop_caching.get(i));
					
					if ( mapping != null ){
						
						mapping.destroy();
					}
				}
			}finally{
				
				this_mon.exit();
			}
		}
		
		return( new int[]{ values_published[0], keys_published[0], republish_ops[0] });
	}
		
	protected void
	checkCacheExpiration(
		boolean		force )
	{
		long	 now = SystemTime.getCurrentTime();
		
		if ( !force ){
			
			long elapsed = now - last_cache_expiry_check;
			
			if ( elapsed > 0 && elapsed < MIN_CACHE_EXPIRY_CHECK_INTERVAL ){
				
				return;
			}
		}
			
		try{
			this_mon.enter();
			
			last_cache_expiry_check	= now;
			
			Iterator	it = stored_values.values().iterator();
			
			while( it.hasNext()){
				
				DHTDBMapping	mapping = (DHTDBMapping)it.next();
	
				if ( mapping.getValueCount() == 0 ){
					
					mapping.destroy();
					
					it.remove();
										
				}else{
					
					Iterator	it2 = mapping.getValues();
					
					while( it2.hasNext()){
						
						DHTDBValueImpl	value = (DHTDBValueImpl)it2.next();				
						
						if ( !value.isLocal()){
							
								// distance 1 = initial store location. We use the initial creation date
								// when deciding whether or not to remove this, plus a bit, as the 
								// original publisher is supposed to republish these
							
							if ( now - value.getCreationTime() > original_republish_interval + ORIGINAL_REPUBLISH_INTERVAL_GRACE ){
								
								DHTLog.log( "removing cache entry (" + value.getString() + ")" );
								
								it2.remove();
							}	
						}
					}
				}
			}
		}finally{
			
			this_mon.exit();
		}
	}
	
	protected DHTStorageAdapter
	getAdapter()
	{
		return( adapter );
	}
	
	protected void
	log(
		String	str )
	{
		logger.log( str );
	}
	
	public DHTDBStats
	getStats()
	{
		return( this );
	}
	
	public void
	print()
	{
		Map	count = new TreeMap();
		
		try{
			this_mon.enter();
			
			logger.log( "Stored keys = " + stored_values.size() + ", values = " + getValueDetails()[DHTDBStats.VD_VALUE_COUNT]); 

			Iterator	it = stored_values.entrySet().iterator();
			
			while( it.hasNext()){
						
				Map.Entry		entry = (Map.Entry)it.next();
				
				HashWrapper		value_key	= (HashWrapper)entry.getKey();
				
				DHTDBMapping	mapping = (DHTDBMapping)entry.getValue();
				
				DHTDBValue[]	values = mapping.get(null,0);
					
				for (int i=0;i<values.length;i++){
					
					DHTDBValue	value = values[i];
					
					Integer key = new Integer( value.isLocal()?0:1);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -