⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dhtcontrolimpl.java

📁 这是一个基于java编写的torrent的P2P源码
💻 JAVA
📖 第 1 页 / 共 5 页
字号:
			}
		}
		
		//System.out.println( "    after adding live ones = " + to_save.size());
		
			// now add any reserve ones
		
		for (int i=0;i<reserves.size();i++){
			
			DHTRouterContact	contact = (DHTRouterContact)reserves.get(i);
		
			if ( !to_save.contains( contact )){
				
				to_save.add( contact );
			}
		}
		
		//System.out.println( "    after adding reserves = " + to_save.size());

			// now add in the rest!
		
		for (int i=0;i<contacts.size();i++){
			
			DHTRouterContact	contact = (DHTRouterContact)contacts.get(i);
		
			if (!to_save.contains( contact )){
				
				to_save.add( contact );
			}
		}	
		
			// and finally remove the invalid ones
		
		Iterator	it = to_save.iterator();
		
		while( it.hasNext()){
			
			DHTRouterContact	contact	= (DHTRouterContact)it.next();
			
			DHTTransportContact	t_contact = ((DHTControlContactImpl)contact.getAttachment()).getTransportContact();
			
			if ( !t_contact.isValid()){
				
				it.remove();
			}
		}
	
		//System.out.println( "    finally = " + to_save.size());

		int	num_to_write = Math.min( max, to_save.size());
		
		daos.writeInt( num_to_write );
				
		for (int i=0;i<num_to_write;i++){
			
			DHTRouterContact	contact = (DHTRouterContact)to_save.get(i);
			
			//System.out.println( "export:" + contact.getString());
			
			daos.writeLong( contact.getTimeAlive());
			
			DHTTransportContact	t_contact = ((DHTControlContactImpl)contact.getAttachment()).getTransportContact();
			
			try{
									
				t_contact.exportContact( daos );
				
			}catch( DHTTransportException e ){
				
					// shouldn't fail as for a contact to make it to the router 
					// it should be valid...
				
				Debug.printStackTrace( e );
				
				throw( new IOException( e.getMessage()));
			}
		}
		
		daos.flush();
	}
		
	/**
	 * Reads contact state from the supplied stream and records it in
	 * <code>imported_state</code>, keyed by the wrapped contact ID.
	 *
	 * The stream is read as an int entry count followed by that many entries,
	 * each being a long "time alive" value and then a contact in whatever form
	 * <code>transport.importContact</code> consumes.
	 *
	 * @param dais	stream to read the state from
	 * @throws IOException	if reading from the stream fails
	 */
	public void
	importState(
		DataInputStream		dais )
		
		throws IOException
	{
		int	entries_left = dais.readInt();
		
		while ( entries_left > 0 ){
			
			entries_left--;
			
			try{
				long	alive_for = dais.readLong();
				
				DHTTransportContact	imported = transport.importContact( dais );
				
				HashWrapper	key = new HashWrapper( imported.getID());
				
					// each entry holds the time-alive value and the contact itself
				
				Object[]	entry = new Object[]{ new Long( alive_for ), imported };
				
				imported_state.put( key, entry );
				
			}catch( DHTTransportException e ){
				
					// a contact that fails to import is reported and the loop carries on
				
				Debug.printStackTrace( e );
			}
		}
	}
	
	/**
	 * Seeds the DHT: runs a lookup on the router's own ID and calls
	 * <code>router.seed()</code> once the lookup reports its closest contacts.
	 *
	 * The calling thread reserves a semaphore with
	 * <code>INTEGRATION_TIME_MAX</code> (presumably a timeout - confirm against
	 * AESemaphore) and, when not doing a full wait, then sleeps for whatever is
	 * left of that period if more than 500 ms remain.
	 *
	 * @param full_wait	if true the semaphore is released only after
	 *					<code>router.seed()</code> has finished; if false it is
	 *					released as soon as the closest contacts are reported
	 */
	public void
	seed(
		final boolean		full_wait )
	{
		final AESemaphore	sem = new AESemaphore( "DHTControl:seed" );
		
		lookup( internal_lookup_pool, false,
				router.getID(), 
				"Seeding DHT",
				(byte)0,
				false, 
				0,
				search_concurrency*4,
				1,
				router.getK(),
				new lookupResultHandler(new DHTOperationAdapter())
				{
					public void
					closest(
						List		res )
					{
						if ( full_wait ){
							
								// caller wants to block until seeding has finished,
								// whether or not it succeeds
							
							try{
								
								router.seed();
								
							}finally{
								
								sem.release();
							}
						}else{
							
								// let the caller go as soon as the lookup is done
							
							sem.release();
							
							router.seed();
						}
					}
					
					public void
					diversify(
						DHTTransportContact	cause,
						byte				diversification_type )
					{
							// nothing to do for a seeding lookup
					}
				});
		
			// we always wait at least a minimum amount of time before returning
		
		long	wait_began = SystemTime.getCurrentTime();
		
		sem.reserve( INTEGRATION_TIME_MAX );
		
		long	wait_ended = SystemTime.getCurrentTime();
		
			// if the clock has gone backwards treat the elapsed time as zero
		
		long	elapsed = wait_ended < wait_began ? 0 : wait_ended - wait_began;
		
		long	remaining = INTEGRATION_TIME_MAX - elapsed;

		if ( full_wait || remaining <= 500 ){
			
			return;
		}
		
		logger.log( "Initial integration completed, waiting " + remaining + " ms for second phase to start" );
		
		try{
			Thread.sleep( remaining );
			
		}catch( Throwable e ){
			
			Debug.out(e);
		}
	}
	
	/**
	 * Queues a random refresh on the external lookup pool, unless one was
	 * already started within the last <code>RANDOM_QUERY_PERIOD</code>.
	 * Updates <code>last_lookup</code> whenever a refresh is queued.
	 */
	protected void
	poke()
	{
		long	current_time = SystemTime.getCurrentTime();
		
			// a time earlier than last_lookup means the clock went backwards,
			// in which case the refresh is treated as due
		
		boolean	clock_went_back = current_time < last_lookup;
		
		if ( !clock_went_back && current_time - last_lookup <= RANDOM_QUERY_PERIOD ){
			
			return;
		}
		
		last_lookup	= current_time;
		
			// we don't want this to be blocking as it'll stuff the stats
		
		external_lookup_pool.run(
			new task(external_lookup_pool)
			{
					// empty until runSupport has picked the refresh target
				
				private byte[]	target = {};
				
				public String
				getDescription()
				{
					return( "Random Query" ); 
				}
				
				public byte[]
				getTarget()
				{
					return( target );
				}
				
				public void
				runSupport()
				{
					target = router.refreshRandom();
				}
			});
	}
	
	/**
	 * Public entry point for explicit publishes: encodes the key, stores the
	 * value in the local database and then starts a put via the external put
	 * pool.
	 *
	 * @param _unencoded_key	key as supplied by the caller; passed through <code>encodeKey</code>
	 * @param _description		description handed on to the put operation
	 * @param _value			value to publish; must not be zero length
	 * @param _flags			flags stored alongside the value
	 * @param _listener			operation listener; wrapped in a
	 *							<code>DHTOperationListenerDemuxer</code> unless it already is one
	 * @throws RuntimeException	if <code>_value</code> is zero length
	 */
	public void
	put(
		byte[]					_unencoded_key,
		String					_description,
		byte[]					_value,
		byte					_flags,
		DHTOperationListener	_listener )
	{
			// zero length denotes value removal, which isn't available through this call
		
		if ( _value.length == 0 ){
			
			throw( new RuntimeException( "zero length values not supported"));
		}
		
		byte[]	key = encodeKey( _unencoded_key );
		
		DHTLog.log( "put for " + DHTLog.getString( key ));
		
		DHTDBValue	stored = database.store( new HashWrapper( key ), _value, _flags );
		
			// reuse the caller's demuxer if that's what we were given
		
		DHTOperationListenerDemuxer	demuxer;
		
		if ( _listener instanceof DHTOperationListenerDemuxer ){
			
			demuxer = (DHTOperationListenerDemuxer)_listener;
			
		}else{
			
			demuxer = new DHTOperationListenerDemuxer( _listener );
		}
		
		put( external_put_pool, key, _description, stored, 0, true, new HashSet(), demuxer );
	}
	
	/**
	 * Starts a put for a key that is already encoded, using the internal put
	 * pool, a fresh set of written keys and a listener built from a plain
	 * <code>DHTOperationAdapter</code>.
	 *
	 * @param encoded_key		key in encoded form
	 * @param description		description handed on to the put operation
	 * @param value				value to store
	 * @param timeout			timeout handed on to the put operation
	 * @param original_mappings	handed on unchanged to the put operation
	 */
	public void
	putEncodedKey(
		byte[]				encoded_key,
		String				description,
		DHTTransportValue	value,
		long				timeout,
		boolean				original_mappings )
	{
		DHTOperationListenerDemuxer	adapter_listener = 
			new DHTOperationListenerDemuxer( new DHTOperationAdapter());
		
		put( internal_put_pool, encoded_key, description, value, timeout, original_mappings, new HashSet(), adapter_listener );
	}
	
	
	/**
	 * Single-value convenience form: wraps the value in a one-element array and
	 * delegates to the multi-value put.
	 *
	 * @param thread_pool			pool the operation runs on
	 * @param initial_encoded_key	encoded key to start from
	 * @param description			description handed on to the put operation
	 * @param value					the one value to store
	 * @param timeout				timeout handed on to the put operation
	 * @param original_mappings		handed on unchanged
	 * @param keys_written			set of keys already written, handed on unchanged
	 * @param listener				listener informed of progress
	 */
	protected void
	put(
		ThreadPool					thread_pool,
		byte[]						initial_encoded_key,
		String						description,
		DHTTransportValue			value,
		long						timeout,
		boolean						original_mappings,
		Set							keys_written,
		DHTOperationListenerDemuxer	listener )
	{
		DHTTransportValue[]	single_value = { value };
		
		put( thread_pool, initial_encoded_key, description, single_value, timeout, original_mappings, keys_written, listener );
	}
	
	/**
	 * Stores a set of values under a key. The key is first passed through
	 * <code>adapter.diversify</code>, which may return several keys; for each
	 * key not already present in <code>keys_written</code> a lookup is started
	 * and, when it reports the closest contacts, the values are put to them.
	 *
	 * @param thread_pool			pool the lookups and stores run on
	 * @param initial_encoded_key	encoded key to start from
	 * @param description			description of the operation
	 * @param values				values to store
	 * @param timeout				timeout handed on to the lookup and the store
	 * @param original_mappings		handed on unchanged to <code>adapter.diversify</code>
	 * @param keys_written			keys already handled; each key processed here is added to it
	 * @param listener				listener informed of progress
	 */
	protected void
	put(
		final ThreadPool					thread_pool,
		final byte[]						initial_encoded_key,
		final String						description,
		final DHTTransportValue[]			values,
		final long							timeout,
		final boolean						original_mappings,
		final Set							keys_written,
		final DHTOperationListenerDemuxer	listener )
	{
			// work out where the put starts from - the key may have previously been
			// diversified, and replicating diversification can give more than one key
		
		byte[][]	start_keys = 
			adapter.diversify( null, true, true, initial_encoded_key, DHT.DT_NONE, original_mappings );
		
		for (int idx=0;idx<start_keys.length;idx++){
			
			final byte[]	encoded_key	= start_keys[idx];
			
				// add returns false when the key was already in the set, in which
				// case it has been written before and is skipped
			
			if ( !keys_written.add( new HashWrapper( encoded_key ))){
				
				continue;
			}
			
			final String	this_description;
			
			if ( Arrays.equals( encoded_key, initial_encoded_key )){
				
				this_description = description;
				
			}else{
				
				this_description = "Diversification of [" + description + "]";
			}
			
			lookup( thread_pool, false,
					encoded_key,
					this_description,
					(byte)0,
					false, 
					timeout,
					search_concurrency,
					1,
					router.getK(),
					new lookupResultHandler(listener)
					{
						public void
						closest(
							List				_closest )
						{
								// lookup finished: store the values at the contacts it found
							
							byte[][]				store_keys		= { encoded_key };
							String					store_text		= "Store of [" + this_description + "]";
							DHTTransportValue[][]	store_values	= { values };
							
							put( thread_pool, store_keys, store_text, store_values, _closest, timeout, listener, true, keys_written );
						}
						
						public void
						diversify(
							DHTTransportContact	cause,
							byte				diversification_type )
						{
							Debug.out( "Shouldn't get a diversify on a lookup-node" );
						}
					});
		}
	}
	
	/**
	 * Puts sets of values for already-encoded keys directly to the given
	 * contacts, using the internal put pool, a zero timeout and with
	 * diversification not considered.
	 *
	 * @param encoded_keys	keys in encoded form
	 * @param description	description handed on to the put operation
	 * @param value_sets	values to store, one array per key
	 * @param contacts		contacts to store the values at
	 */
	public void
	putDirectEncodedKeys(
		byte[][]				encoded_keys,
		String					description,
		DHTTransportValue[][]	value_sets,
		List					contacts )
	{
			// diversification isn't considered for direct puts: these republish cached
			// mappings, which we maintain as normal, and it is up to the original
			// publisher to diversify as required
		
		DHTOperationListenerDemuxer	adapter_listener = 
			new DHTOperationListenerDemuxer( new DHTOperationAdapter());
		
		put( internal_put_pool, encoded_keys, description, value_sets, contacts, 0, adapter_listener, false, new HashSet());
	}
		
	protected void
	put(
		final ThreadPool						thread_pool,
		byte[][]								initial_encoded_keys,
		final String							description,
		final DHTTransportValue[][]				initial_value_sets,
		final List								contacts,
		final long								timeout,
		final DHTOperationListenerDemuxer		listener,
		final boolean							consider_diversification,
		final Set								keys_written )
	{		
		boolean[]	ok = new boolean[initial_encoded_keys.length];
		int	failed = 0;
		
		for (int i=0;i<initial_encoded_keys.length;i++){
			
			if ( ! (ok[i] = !database.isKeyBlocked( initial_encoded_keys[i]))){
				
				failed++;
			}
		}
		
			// if all failed then nothing to do
		
		if ( failed == ok.length ){
			
			listener.incrementCompletes();
			
			listener.complete( false );
			
			return;
		}
		
		final byte[][] 				encoded_keys 	= failed==0?initial_encoded_keys:new byte[ok.length-failed][];
		final DHTTransportValue[][] value_sets 		= failed==0?initial_value_sets:new DHTTransportValue[ok.length-failed][];
		
		if ( failed > 0 ){
			
			int	pos = 0;
			
			for (int i=0;i<ok.length;i++){
				
				if ( ok[i] ){
					
					encoded_keys[ pos ] = initial_encoded_keys[i];
					value_sets[ pos ] 	= initial_value_sets[i];
					
					pos++;
				}
			}
		}
		
			// only diversify on one hit as we're storing at closest 'n' so we only need to
			// do it once for each key
		
		final boolean[]	diversified = new boolean[encoded_keys.length];
		
		for (int i=0;i<contacts.size();i++){
		
			DHTTransportContact	contact = (DHTTransportContact)contacts.get(i);
			
			if ( router.isID( contact.getID())){
					
					// don't send to ourselves!
				
			}else{
				
				try{

					for (int j=0;j<value_sets.length;j++){
							
						for (int k=0;k<value_sets[j].length;k++){
							
							listener.wrote( contact, value_sets[j][k] );
						}
					}
							
						// each store is going to report its complete event
					
					listener.incrementCompletes();
										
					contact.sendStore( 
						new DHTTransportReplyHandlerAdapter()
						{
							public void
							storeReply(
								DHTTransportContact _contact,
								byte[]				_diversifications )
							{
								try{
									DHTLog.log( "Store OK " + DHTLog.getString( _contact ));
																
									router.contactAlive( _contact.getID(), new DHTControlContactImpl(_contact));
								
										// can be null for old protocol versions
									
									if ( consider_diversification && _diversifications != null ){
																		
										for (int j=0;j<_diversifications.length;j++){
											
											if ( _diversifications[j] != DHT.DT_NONE && !diversified[j] ){
												

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -