⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dhtcontrolimpl.java

📁 一个基于JAVA的多torrent下载程序
💻 JAVA
📖 第 1 页 / 共 5 页
字号:

		DHTLog.log( "get for " + DHTLog.getString( encoded_key ));
		
		getSupport( encoded_key, description, flags, max_values, timeout, exhaustive, new DHTOperationListenerDemuxer( get_listener ));
	}
	
	/**
	 * Starts one value lookup per starting key for a get operation.
	 *
	 * The adapter is asked (with DHT.DT_NONE) which keys to start from, and a
	 * lookup is submitted on external_lookup_pool for each of them. The result
	 * handler attached to each lookup follows at most one diversification by
	 * calling this method again for the diversified key(s).
	 *
	 * @param initial_encoded_key	key the get starts from
	 * @param description			label passed to the lookup; starting keys that differ
	 * 								from initial_encoded_key get it wrapped as
	 * 								"Diversification of [...]"
	 * @param flags					passed through unchanged to lookup
	 * @param max_values			passed to lookup; when non-zero it also bounds whether a
	 * 								diversification is followed (see diversify below)
	 * @param timeout				passed through unchanged to lookup
	 * @param exhaustive			passed through unchanged to adapter.diversify
	 * @param get_listener			demuxer handed to every result handler created here
	 */
	public void
	getSupport(
		final byte[]						initial_encoded_key,
		final String						description,
		final byte							flags,
		final int							max_values,
		final long							timeout,
		final boolean						exhaustive,
		final DHTOperationListenerDemuxer	get_listener )
	{
			// ask the adapter where to start from - the key may already have been diversified

		final byte[][]	start_keys = adapter.diversify( null, false, true, initial_encoded_key, DHT.DT_NONE, exhaustive );

		for (int key_index=0;key_index<start_keys.length;key_index++){

			final byte[]	encoded_key	= start_keys[key_index];

				// one-shot latch for this lookup; an array so the handler below can flip it

			final boolean[]	diversification_followed = { false };

			final String	lookup_description;

			if ( Arrays.equals( encoded_key, initial_encoded_key )){

				lookup_description = description;

			}else{

				lookup_description = "Diversification of [" + description + "]";
			}

			lookup( external_lookup_pool,
					encoded_key,
					lookup_description,
					flags,
					true,
					timeout,
					search_concurrency,
					max_values,
					router.getK(),
					new lookupResultHandler( get_listener )
					{
							// every value passed to read() for this lookup

						private final List	values_read	= new ArrayList();

						public void
						read(
							DHTTransportContact	contact,
							DHTTransportValue	value )
						{
							values_read.add( value );

							super.read( contact, value );
						}

						public void
						diversify(
							DHTTransportContact	cause,
							byte				diversification_type )
						{
								// only the first diversification reported for this lookup is followed

							if ( diversification_followed[0] ){

								return;
							}

							diversification_followed[0] = true;

								// max_values of 0 is passed on as 0; otherwise we only continue
								// while fewer than max_values values have been read

							int	remaining = 0;

							if ( max_values != 0 ){

								remaining = max_values - values_read.size();

								if ( remaining <= 0 ){

									return;
								}
							}

								// per the original author's note this should yield at most one key
								// (none if diversification is refused), but more are handled

							byte[][]	next_keys = adapter.diversify( cause, false, false, encoded_key, diversification_type, exhaustive );

							String	next_description = "Diversification of [" + lookup_description + "]";

							for (int n=0;n<next_keys.length;n++){

								getSupport( next_keys[n], next_description, flags, remaining, timeout, exhaustive, get_listener );
							}
						}

						public void
						closest(
							List	closest )
						{
								// Intentionally empty: the Kademlia "cache at closest" feature is not used.
								// The code that used to live here (disabled by the original author) would,
								// when any values had been read, have recorded them via wrote() and issued
								// a sendStore of them to up to cache_at_closest_n of the supplied contacts.
						}
					});
		}
	}
		
	/**
	 * Removes the value stored locally under a key and republishes the outcome.
	 *
	 * The key is encoded, the matching entry is removed from the local database
	 * on behalf of local_contact and, if something was removed, the value handed
	 * back by the database is pushed out again through put() on external_put_pool.
	 *
	 * @param unencoded_key	key as supplied by the caller, before encodeKey
	 * @param description	label passed through to put
	 * @param listener		wrapped in a DHTOperationListenerDemuxer and passed to put
	 * @return				null when nothing was stored locally for the key, otherwise
	 * 						getValue() of the entry returned by the database
	 */
	public byte[]
	remove(
		byte[]					unencoded_key,
		String					description,
		DHTOperationListener	listener )
	{
		final byte[]	key = encodeKey( unencoded_key );

		DHTLog.log( "remove for " + DHTLog.getString( key ));

		final DHTDBValue	removed = database.remove( local_contact, new HashWrapper( key ));

		if ( removed == null ){

				// nothing held locally, so there is nothing to push out

			return( null );
		}

			// original author's note: a key is removed by pushing it back out again
			// with a zero length value (presumably what database.remove hands back - confirm)

		put( 	external_put_pool,
				key,
				description,
				removed,
				0,
				true,
				new HashSet(),
				new DHTOperationListenerDemuxer( listener ));

		return( removed.getValue());
	}
	
		/**
		 * Wraps a call to lookupSupportSync in a task and hands that task to the
		 * supplied thread pool. Nothing is returned; the handler is passed on to
		 * lookupSupportSync. Any Throwable raised by the lookup is caught and
		 * reported via Debug.printStackTrace rather than propagated.
		 *
		 * @param thread_pool		pool the task is created for and run on
		 * @param lookup_id			target of the lookup; also reported as the task's target
		 * @param description		reported as the task's description
		 * @param flags				passed through to lookupSupportSync
		 * @param value_search		passed through to lookupSupportSync
		 * @param timeout			passed through to lookupSupportSync
		 * @param concurrency		passed through to lookupSupportSync
		 * @param max_values		passed through to lookupSupportSync
		 * @param search_accuracy	passed through to lookupSupportSync
		 * @param handler			passed through to lookupSupportSync
		 */
	
	protected void
	lookup(
		ThreadPool					thread_pool,
		final byte[]				lookup_id,
		final String				description,
		final byte					flags,
		final boolean				value_search,
		final long					timeout,
		final int					concurrency,
		final int					max_values,
		final int					search_accuracy,
		final lookupResultHandler	handler )
	{
		final task	lookup_task =
			new task(thread_pool)
			{
				public byte[]
				getTarget()
				{
					return( lookup_id );
				}

				public String
				getDescription()
				{
					return( description );
				}

				public void
				runSupport()
				{
						// failures are reported here and not rethrown

					try{
						lookupSupportSync( lookup_id, flags, value_search, timeout, concurrency, max_values, search_accuracy, handler );

					}catch( Throwable e ){

						Debug.printStackTrace(e);
					}
				}
			};

		thread_pool.run( lookup_task );
	}
	
	protected void
	lookupSupportSync(
		final byte[]				lookup_id,
		byte						flags,
		boolean						value_search,
		long						timeout,
		int							concurrency,
		int							max_values,
		final int					search_accuracy,
		final lookupResultHandler	result_handler )
	{
		boolean		timeout_occurred	= false;
	
		last_lookup	= SystemTime.getCurrentTime();
	
		result_handler.incrementCompletes();
		
		try{
			DHTLog.log( "lookup for " + DHTLog.getString( lookup_id ));
			
				// keep querying successively closer nodes until we have got responses from the K
				// closest nodes that we've seen. We might get a bunch of closer nodes that then
				// fail to respond, which means we have reconsider further away nodes
			
				// we keep a list of nodes that we have queried to avoid re-querying them
			
				// we keep a list of nodes discovered that we have yet to query
			
				// we have a parallel search limit of A. For each A we effectively loop grabbing
				// the currently closest unqueried node, querying it and adding the results to the
				// yet-to-query-set (unless already queried)
			
				// we terminate when we have received responses from the K closest nodes we know
				// about (excluding failed ones)
			
				// Note that we never widen the root of our search beyond the initial K closest
				// that we know about - this could be relaxed
			
						
				// contacts remaining to query
				// closest at front
	
			final Set		contacts_to_query	= getClosestContactsSet( lookup_id, false );
			
			final AEMonitor	contacts_to_query_mon	= new AEMonitor( "DHTControl:ctq" );

			final Map	level_map			= new HashMap();
			
			Iterator	it = contacts_to_query.iterator();
			
			while( it.hasNext()){
				
				DHTTransportContact	contact	= (DHTTransportContact)it.next();
				
				result_handler.found( contact );
				
				level_map.put( contact , new Integer(0));
			}
			
				// record the set of contacts we've queried to avoid re-queries
			
			final Map			contacts_queried = new HashMap();
			
				// record the set of contacts that we've had a reply from
				// furthest away at front
			
			final Set			ok_contacts = new sortedTransportContactSet( lookup_id, false ).getSet(); 
			
	
				// this handles the search concurrency
			
			final AESemaphore	search_sem = new AESemaphore( "DHTControl:search", concurrency );
				
			final int[]	idle_searches	= { 0 };
			final int[]	active_searches	= { 0 };
				
			final int[]	values_found	= { 0 };
			final int[]	value_replies	= { 0 };
			final Set	values_found_set	= new HashSet();
			
			long	start = SystemTime.getCurrentTime();
	
			while( true ){
				
				if ( timeout > 0 ){
					
					long	now = SystemTime.getCurrentTime();
					
						// check for clock being set back
					
					if ( now < start ){
						
						start	= now;
					}
					
					long remaining = timeout - ( now - start );
						
					if ( remaining <= 0 ){
						
						DHTLog.log( "lookup: terminates - timeout" );
	
						timeout_occurred	= true;
						
						break;
						
					}
						// get permission to kick off another search
					
					if ( !search_sem.reserve( remaining )){
						
						DHTLog.log( "lookup: terminates - timeout" );
	
						timeout_occurred	= true;
						
						break;
					}
				}else{
					
					search_sem.reserve();
				}
					
				try{
					contacts_to_query_mon.enter();
			
					if ( 	values_found[0] >= max_values ||
							value_replies[0]>= 2 ){	// all hits should have the same values anyway...	
							
						break;
					}						

						// if nothing pending then we need to wait for the results of a previous
						// search to arrive. Of course, if there are no searches active then
						// we've run out of things to do
					
					if ( contacts_to_query.size() == 0 ){
						
						if ( active_searches[0] == 0 ){
							
							DHTLog.log( "lookup: terminates - no contacts left to query" );
							
							break;
						}
						
						idle_searches[0]++;
						
						continue;
					}
				
						// select the next contact to search
					
					DHTTransportContact	closest	= (DHTTransportContact)contacts_to_query.iterator().next();			
				
						// if the next closest is further away than the furthest successful hit so 
						// far and we have K hits, we're done
					
					if ( ok_contacts.size() == search_accuracy ){
						
						DHTTransportContact	furthest_ok = (DHTTransportContact)ok_contacts.iterator().next();
						
						int	distance = computeAndCompareDistances( furthest_ok.getID(), closest.getID(), lookup_id );
						
						if ( distance <= 0 ){
							
							DHTLog.log( "lookup: terminates - we've searched the closest " + search_accuracy + " contacts" );
	
							break;
						}
					}
					
					// we optimise the first few entries based on their Vivaldi distance. Only a few
					// however as we don't want to start too far away from the target.
						
					if ( contacts_queried.size() < concurrency ){
						
						VivaldiPosition	loc_vp = local_contact.getVivaldiPosition();
						
						if ( !loc_vp.getCoordinates().atOrigin()){
							
							DHTTransportContact	vp_closest = null;
							
							Iterator vp_it = contacts_to_query.iterator();
							
							int	vp_count_limit = (concurrency*2) - contacts_queried.size();
							
							int	vp_count = 0;
							
							float	best_dist = Float.MAX_VALUE;
							
							while( vp_it.hasNext() && vp_count < vp_count_limit ){
								
								vp_count++;
								
								DHTTransportContact	entry	= (DHTTransportContact)vp_it.next();
								
								VivaldiPosition	vp = entry.getVivaldiPosition();
								
								Coordinates	coords = vp.getCoordinates();
								
								if ( !coords.atOrigin()){
									
									float	dist = loc_vp.estimateRTT( coords );
									
									if ( dist < best_dist ){
										
										best_dist	= dist;
										
										vp_closest	= entry;
										
										// System.out.println( start + ": lookup for " + DHTLog.getString2( lookup_id ) + ": vp override (dist = " + dist + ")");
									}
								}
							}
						
							if ( vp_closest != null ){
								
									// override ID closest with VP closes
								
								closest = vp_closest;
							}
						}
					}
					
					contacts_to_query.remove( closest );
	
					contacts_queried.put( new HashWrapper( closest.getID()), closest );
								
						// never search ourselves!
					
					if ( router.isID( closest.getID())){
						
						search_sem.release();
						
						continue;
					}
	
					final int	search_level = ((Integer)level_map.get(closest)).intValue();

					active_searches[0]++;				
					
					result_handler.searching( closest, search_level, active_searches[0] );
					
					DHTTransportReplyHandlerAdapter	handler = 
						new DHTTransportReplyHandlerAdapter()
						{
							private boolean	value_reply_received	= false;
							
							public void
							findNodeReply(
								DHTTransportContact 	target_contact,
								DHTTransportContact[]	reply_contacts )
							{
								try{
									DHTLog.log( "findNodeReply: " + DHTLog.getString( reply_contacts ));
							
									router.contactAlive( target_contact.getID(), new DHTControlContactImpl(target_contact));
									
									for (int i=0;i<reply_contacts.length;i++){
										
										DHTTransportContact	contact = reply_contacts[i];
										
											// ignore responses that are ourselves
										
										if ( compareDistances( router.getID(), contact.getID()) == 0 ){
											
											continue;
										}
										
											// dunno if its alive or not, however record its existance
										
										router.contactKnown( contact.getID(), new DHTControlContactImpl(contact));
									}
									
									try{
										contacts_to_query_mon.enter();
												
										ok_contacts.add( target_contact );
										
										if ( ok_contacts.size() > search_accuracy ){
											
												// delete the furthest away
											
											Iterator ok_it = ok_contacts.iterator();
											
											ok_it.next();
											
											ok_it.remove();
										}
										
										for (int i=0;i<reply_contacts.length;i++){
											
											DHTTransportContact	contact = reply_contacts[i];
											
												// ignore responses that are ourselves
											
											if ( compareDistances( router.getID(), contact.getID()) == 0 ){

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -