⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dhtdbimpl.java

📁 这是一个基于java编写的torrent的P2P源码
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
					}
					
					((List)data[1]).add( key );
				}
			}
		
			it = contact_map.values().iterator();
			
			while( it.hasNext()){
				
				final Object[]	data = (Object[])it.next();
				
				final DHTTransportContact	contact = (DHTTransportContact)data[0];
				
					// move to anti-spoof on cache forwards - gotta do a find-node first
					// to get the random id
				
				final AESemaphore	sem = new AESemaphore( "DHTDB:cacheForward" );
				
				contact.sendFindNode(
						new DHTTransportReplyHandlerAdapter()
						{
							public void
							findNodeReply(
								DHTTransportContact 	_contact,
								DHTTransportContact[]	_contacts )
							{	
								anti_spoof_done.add( _contact );
							
								try{
									// System.out.println( "cacheForward: pre-store findNode OK" );
								
									List				keys	= (List)data[1];
										
									byte[][]				store_keys 		= new byte[keys.size()][];
									DHTTransportValue[][]	store_values 	= new DHTTransportValue[store_keys.length][];
									
									keys_published[0] += store_keys.length;
									
									for (int i=0;i<store_keys.length;i++){
										
										HashWrapper	wrapper = (HashWrapper)keys.get(i);
										
										store_keys[i] = wrapper.getHash();
										
										List		values	= (List)republish.get( wrapper );
										
										store_values[i] = new DHTTransportValue[values.size()];
							
										values_published[0] += store_values[i].length;
										
										for (int j=0;j<values.size();j++){
										
											DHTDBValueImpl	value	= (DHTDBValueImpl)values.get(j);
												
												// we reduce the cache distance by 1 here as it is incremented by the
												// recipients
											
											store_values[i][j] = value.getValueForRelay(local_contact);
										}
									}
										
									List	contacts = new ArrayList();
									
									contacts.add( contact );
									
									republish_ops[0]++;
									
									control.putDirectEncodedKeys( 
											store_keys, 
											"Republish cache",
											store_values,
											contacts );
								}finally{
									
									sem.release();
								}
							} 
							
							public void
							failed(
								DHTTransportContact 	_contact,
								Throwable				_error )
							{
								try{
									// System.out.println( "cacheForward: pre-store findNode Failed" );
	
									DHTLog.log( "cacheForward: pre-store findNode failed " + DHTLog.getString( _contact ) + " -> failed: " + _error.getMessage());
																			
									router.contactDead( _contact.getID(), false);
									
								}finally{
									
									sem.release();
								}
							}
						},
						contact.getProtocolVersion() >= DHTTransportUDP.PROTOCOL_VERSION_ANTI_SPOOF2?new byte[0]:new byte[20] );
				
				sem.reserve();
			}
			
			try{
				this_mon.enter();
				
				for (int i=0;i<stop_caching.size();i++){
					
					DHTDBMapping	mapping = (DHTDBMapping)stored_values.remove( stop_caching.get(i));
					
					if ( mapping != null ){
						
						mapping.destroy();
					}
				}
			}finally{
				
				this_mon.exit();
			}
		}
		
		DHTStorageBlock[]	direct_key_blocks = getDirectKeyBlocks();

		if ( direct_key_blocks.length > 0 ){
					
			for (int i=0;i<direct_key_blocks.length;i++){
			
				final DHTStorageBlock	key_block = direct_key_blocks[i];
				
				List	contacts = control.getClosestKContactsList( key_block.getKey(), false );

				boolean	forward_it = false;
				
					// ensure that the key is close enough to us 
				
				for (int j=0;j<contacts.size();j++){

					final DHTTransportContact	contact = (DHTTransportContact)contacts.get(j);

					if ( router.isID( contact.getID())){
						
						forward_it	= true;
						
						break;
					}
				}
					
				for (int j=0; forward_it && j<contacts.size();j++){
					
					final DHTTransportContact	contact = (DHTTransportContact)contacts.get(j);
					
					if ( key_block.hasBeenSentTo( contact )){
						
						continue;
					}
					
					if ( contact.getProtocolVersion() >= DHTTransportUDP.PROTOCOL_VERSION_BLOCK_KEYS ){
						
						final Runnable task = 
							new Runnable()
							{
								public void
								run()
								{
									contact.sendKeyBlock(
										new DHTTransportReplyHandlerAdapter()
										{
											public void
											keyBlockReply(
												DHTTransportContact 	_contact )
											{
												DHTLog.log( "key block forward ok " + DHTLog.getString( _contact ));
												
												key_block.sentTo( _contact );
											}
											
											public void
											failed(
												DHTTransportContact 	_contact,
												Throwable				_error )
											{
												DHTLog.log( "key block forward failed " + DHTLog.getString( _contact ) + " -> failed: " + _error.getMessage());
											}
										},
										key_block.getRequest(),
										key_block.getCertificate());
								}
							};
						
							if ( anti_spoof_done.contains( contact )){
								
								task.run();
								
							}else{
								
								contact.sendFindNode(
										new DHTTransportReplyHandlerAdapter()
										{
											public void
											findNodeReply(
												DHTTransportContact 	contact,
												DHTTransportContact[]	contacts )
											{	
												task.run();
											}
											public void
											failed(
												DHTTransportContact 	_contact,
												Throwable				_error )
											{
												// System.out.println( "nodeAdded: pre-store findNode Failed" );

												DHTLog.log( "pre-kb findNode failed " + DHTLog.getString( _contact ) + " -> failed: " + _error.getMessage());
																						
												router.contactDead( _contact.getID(), false);
											}
										},
										contact.getProtocolVersion() >= DHTTransportUDP.PROTOCOL_VERSION_ANTI_SPOOF2?new byte[0]:new byte[20] );
							}
					}
				}
			}
		}
		
		return( new int[]{ values_published[0], keys_published[0], republish_ops[0] });
	}
		
	protected void
	checkCacheExpiration(
		boolean		force )
	{
		long	 now = SystemTime.getCurrentTime();
		
		if ( !force ){
			
			long elapsed = now - last_cache_expiry_check;
			
			if ( elapsed > 0 && elapsed < MIN_CACHE_EXPIRY_CHECK_INTERVAL ){
				
				return;
			}
		}
			
		try{
			this_mon.enter();
			
			last_cache_expiry_check	= now;
			
			Iterator	it = stored_values.values().iterator();
			
			while( it.hasNext()){
				
				DHTDBMapping	mapping = (DHTDBMapping)it.next();
	
				if ( mapping.getValueCount() == 0 ){
					
					mapping.destroy();
					
					it.remove();
										
				}else{
					
					Iterator	it2 = mapping.getValues();
					
					while( it2.hasNext()){
						
						DHTDBValueImpl	value = (DHTDBValueImpl)it2.next();				
						
						if ( !value.isLocal()){
							
								// distance 1 = initial store location. We use the initial creation date
								// when deciding whether or not to remove this, plus a bit, as the 
								// original publisher is supposed to republish these
							
							if ( now - value.getCreationTime() > original_republish_interval + ORIGINAL_REPUBLISH_INTERVAL_GRACE ){
								
								DHTLog.log( "removing cache entry (" + value.getString() + ")" );
								
								it2.remove();
							}	
						}
					}
				}
			}
		}finally{
			
			this_mon.exit();
		}
	}
	
	protected DHTTransportContact
	getLocalContact()
	{
		return( local_contact );
	}
	
	protected DHTStorageAdapter
	getAdapter()
	{
		return( adapter );
	}
	
	protected void
	log(
		String	str )
	{
		logger.log( str );
	}
	
	public DHTDBStats
	getStats()
	{
		return( this );
	}
	
	public void
	print()
	{
		Map	count = new TreeMap();
		
		try{
			this_mon.enter();
			
			logger.log( "Stored keys = " + stored_values.size() + ", values = " + getValueDetails()[DHTDBStats.VD_VALUE_COUNT]); 

			Iterator	it = stored_values.entrySet().iterator();
			
			while( it.hasNext()){
						
				Map.Entry		entry = (Map.Entry)it.next();
				
				HashWrapper		value_key	= (HashWrapper)entry.getKey();
				
				DHTDBMapping	mapping = (DHTDBMapping)entry.getValue();
				
				DHTDBValue[]	values = mapping.get(null,0,(byte)0);
					
				for (int i=0;i<values.length;i++){
					
					DHTDBValue	value = values[i];
					
					Integer key = new Integer( value.isLocal()?0:1);
					
					Object[]	data = (Object[])count.get( key );
									
					if ( data == null ){
						
						data = new Object[2];
						
						data[0] = new Integer(1);
						
						data[1] = "";
									
						count.put( key, data );
	
					}else{
						
						data[0] = new Integer(((Integer)data[0]).intValue() + 1 );
					}
				
					String	s = (String)data[1];
					
					s += (s.length()==0?"":", ") + "key=" + DHTLog.getString2(value_key.getHash()) + ",val=" + value.getString();
					
					data[1]	= s;
				}
			}
			
			it = count.keySet().iterator();
			
			while( it.hasNext()){
				
				Integer	k = (Integer)it.next();
				
				Object[]	data = (Object[])count.get(k);
				
				logger.log( "    " + k + " -> " + data[0] + " entries" ); // ": " + data[1]);
			}
			
			it = stored_values.entrySet().iterator();
			
			String	str 		= "    ";
			int		str_entries	= 0;
			
			while( it.hasNext()){
						
				Map.Entry		entry = (Map.Entry)it.next();
				
				HashWrapper		value_key	= (HashWrapper)entry.getKey();
				
				DHTDBMapping	mapping = (DHTDBMapping)entry.getValue();
				
				if ( str_entries == 16 ){
					
					logger.log( str );
					
					str = "    ";
					
					str_entries	= 0;
				}
				
				str_entries++;
				
				str += (str_entries==1?"":", ") + DHTLog.getString2(value_key.getHash()) + " -> " + mapping.getValueCount() + "/" + mapping.getHits()+"["+mapping.getLocalSize()+","+mapping.getDirectSize()+","+mapping.getIndirectSize() + "]";
			}
			
			if ( str_entries > 0 ){
				
				logger.log( str );
			}
		}finally{
			
			this_mon.exit();
		}
	}
	
	protected void
	banContact(
		final DHTTransportContact	contact,
		final String				reason )
	{
		new AEThread( "DHTDBImpl:delayed flood delete", true )
		{
			public void
			runSupport()
			{
					// delete their data on a separate thread so as not to 
					// interfere with the current action
				
				try{
					this_mon.enter();
					
					Iterator	it = stored_values.values().iterator();
												
					while( it.hasNext()){
						
						DHTDBMapping	mapping = (DHTDBMapping)it.next();

						Iterator	it2 = mapping.getDirectValues();
						
						while( it2.hasNext()){
							
							DHTDBValueImpl	val = (DHTDBValueImpl)it2.next();
							
							if ( !val.isLocal()){
								
								if ( Arrays.equals( val.getOriginator().getID(), contact.getID())){
									
									it.remove();
								}
							}
						}
					}

				}finally{
					
					this_mon.exit();
					
				}
			}
		}.start();
	
		logger.log( "Banning " + contact.getString() + " due to store flooding (" + reason + ")" );
		
		ip_filter.ban( 
				contact.getAddress().getAddress().getHostAddress(),
				"DHT: Sender stored excessive entries at this node (" + reason + ")" );		
	}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -