⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 vivaldiv2positionprovider.java

📁 这是一个基于java编写的torrent的P2P源码
💻 JAVA
字号:
/*
 * Created on 24-Apr-2006 Created by Paul Gardner Copyright (C) 2006 Aelitis,
 * All Rights Reserved.
 * 
 * This program is free software; you can redistribute it and/or modify it under
 * the terms of the GNU General Public License as published by the Free Software
 * Foundation; either version 2 of the License, or (at your option) any later
 * version. This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
 * details. You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software Foundation, Inc.,
 * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 * 
 * AELITIS, SAS au capital de 46,603.30 euros 8 Allee Lenotre, La Grille Royale,
 * 78600 Le Mesnil le Roi, France.
 * 
 */

package com.aelitis.azureus.vivaldi.ver2;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.SortedSet;
import java.util.TreeSet;

import org.gudy.azureus2.core3.util.Timer;
import org.gudy.azureus2.core3.util.TimerEvent;
import org.gudy.azureus2.core3.util.TimerEventPerformer;

import com.aelitis.azureus.core.dht.netcoords.DHTNetworkPosition;
import com.aelitis.azureus.core.dht.netcoords.DHTNetworkPositionManager;
import com.aelitis.azureus.core.dht.netcoords.DHTNetworkPositionProvider;
import com.aelitis.azureus.core.dht.netcoords.DHTNetworkPositionProviderInstance;
import com.aelitis.azureus.core.dht.router.DHTRouter;
import com.aelitis.azureus.core.dht.router.DHTRouterContact;
import com.aelitis.azureus.core.dht.router.DHTRouterFactory;
import com.aelitis.azureus.core.dht.router.DHTRouterFactoryObserver;
import com.aelitis.azureus.core.dht.router.DHTRouterObserver;
import com.aelitis.azureus.vivaldi.ver2.stats.SerializationController;
import com.aelitis.azureus.vivaldi.ver2.stats.V1Serializer;

import edu.harvard.syrah.nc.Coordinate;
import edu.harvard.syrah.nc.VivaldiClient;

/**
 * Position provider for the Vivaldi V2 network-coordinate scheme.
 *
 * <p>The provider owns a single {@link VivaldiClient} and feeds it round-trip
 * samples reported through {@link #update}. Peers are tracked in three places:
 * <ul>
 *   <li><b>hosts</b> - peers the {@link VivaldiClient} keeps state for;</li>
 *   <li><b>router entries</b> - IDs of contacts reported by the observed
 *       {@link DHTRouter} via {@link #added} / {@link #removed};</li>
 *   <li><b>transients</b> - peers that delivered a sample but are not in the
 *       router yet; they are dropped {@link #TRANSIENT_TIME} ms after their
 *       last sample unless the router adds them first.</li>
 * </ul>
 *
 * <p>Threading: the transient and router-entry collections are only touched
 * while holding this object's monitor. {@link #ping()} runs on the timer
 * created in the constructor and reads {@link #router} and
 * {@link #last_nc_update} without the monitor, which is why both are volatile.
 */
public class VivaldiV2PositionProvider implements DHTNetworkPositionProvider,
		DHTRouterFactoryObserver, DHTRouterObserver {
	/** Value handed to the {@link VivaldiClient} constructor (presumably the coordinate dimension count). */
	public static final int NUM_DIMS = 5;
	/** Time in milliseconds a transient entry survives after its most recent sample. */
	public static final int TRANSIENT_TIME = 45 * 1000;
	/**
	 * Period of the ping timer in milliseconds, and also the minimum time that
	 * must have passed since the ping clock was last reset before a ping is sent.
	 */
	public static final long MIN_NC_UPDATE_INTERVAL = 10 * 1000;

	private static final boolean LOGGING_ENABLED = false;

	private static boolean initialised = false;
	private static volatile DHTNetworkPositionProviderInstance provider = null;

	private final VivaldiClient<IDWrapper> vc;
	/** Shared position object returned by {@link #create} for every remote peer. */
	private final InitialPosition ip;

	/** IDs of the tuples currently held in {@link #transient_list}; guarded by {@code this}. */
	protected final SortedSet<IDWrapper> transient_ids;
	/** Transient tuples, oldest expiry first (entries are always appended); guarded by {@code this}. */
	protected final LinkedList<TransientTuple> transient_list;

	protected final SerializationController serializer;
	/** IDs of the contacts currently reported by the router; guarded by {@code this}. */
	protected final SortedSet<IDWrapper> router_entries;
	/** Most recently created router; volatile because the timer thread reads it in {@link #ping()}. */
	protected volatile DHTRouter router = null;
	/** Time of the last ping-clock reset; volatile so the timer thread reads a whole, current value. */
	protected volatile long last_nc_update = 0;

	private boolean started_up = false;

	/**
	 * Creates the provider and registers it with the
	 * {@link DHTNetworkPositionManager}. Only the first call has any effect.
	 */
	public static synchronized void initialise() {
		if (initialised) {
			return;
		}

		initialised = true;

		provider = DHTNetworkPositionManager
				.registerProvider(new VivaldiV2PositionProvider());

		doLog("Vivaldi V2 position provider created");
	}

	/**
	 * Builds the provider state, registers as a router-factory observer and
	 * starts the periodic ping timer.
	 */
	protected VivaldiV2PositionProvider() {
		vc = new VivaldiClient<IDWrapper>(NUM_DIMS);
		ip = new InitialPosition();

		transient_ids = new TreeSet<IDWrapper>();
		transient_list = new LinkedList<TransientTuple>();

		serializer = new SerializationController();
		router_entries = new TreeSet<IDWrapper>();

		// Hand out 'this' only once every field above is assigned, so that a
		// callback arriving during registration never sees a half-built object.
		DHTRouterFactory.addObserver(this);

		Timer timer = new Timer("VivaldiV2PositionProvider:ping");

		timer.addPeriodicEvent(MIN_NC_UPDATE_INTERVAL, new TimerEventPerformer() {
			public void perform(TimerEvent event) {
				ping();
			}
		});
	}

	/**
	 * Records {@code curr_time} as the moment of the latest coordinate update,
	 * which postpones the next ping.
	 *
	 * @param curr_time current time in milliseconds
	 */
	protected void resetPingClock(long curr_time) {
		last_nc_update = curr_time;
	}

	/**
	 * Timer callback: if more than {@link #MIN_NC_UPDATE_INTERVAL} ms have
	 * passed since the ping clock was reset and a router is known, asks the
	 * {@link VivaldiClient} for a neighbour and requests a ping of it.
	 */
	protected void ping() {
		long curr_time = System.currentTimeMillis();
		if (curr_time <= last_nc_update + MIN_NC_UPDATE_INTERVAL) {
			// a sample was accepted recently enough; no ping needed
			return;
		}

		// read the field once: destroyed() may null it concurrently
		DHTRouter current_router = router;
		if (current_router == null) {
			return;
		}

		IDWrapper id = vc.getNeighborToPing(curr_time);
		if (id != null) {
			current_router.requestPing(id.getRawId());
		}
	}

	/** @return {@link DHTNetworkPosition#POSITION_TYPE_VIVALDI_V2} */
	public byte getPositionType() {
		return (DHTNetworkPosition.POSITION_TYPE_VIVALDI_V2);
	}

	/**
	 * Creates the position object for a peer.
	 *
	 * @param ID       peer identifier (not used by this implementation)
	 * @param is_local whether the position is for the local peer
	 * @return a new {@link LocalPosition} for the local peer, otherwise the
	 *         shared {@link InitialPosition}
	 */
	public DHTNetworkPosition create(byte[] ID, boolean is_local) {
		if (is_local) {
			doLog("Returning position for local peer");

			return new LocalPosition(this);
		}

		purgeTransient(System.currentTimeMillis());

		// return the shared initial position of a remote peer
		return ip;
	}

	/**
	 * Reads a remote peer's position from the stream.
	 *
	 * @param is stream passed to the {@link RemotePosition} constructor
	 * @return the new {@link RemotePosition}
	 * @throws IOException declared for failures while reading {@code is}
	 */
	public DHTNetworkPosition deserialisePosition(DataInputStream is)
			throws IOException {
		return new RemotePosition(is);
	}

	/**
	 * Writes the Vivaldi client's statistics to {@code os} using the
	 * {@link V1Serializer}, registering that serializer on first use.
	 *
	 * @param os destination stream
	 * @throws IOException declared for failures while writing to {@code os}
	 */
	public void serialiseStats(DataOutputStream os) throws IOException {
		if (!serializer.contains(V1Serializer.VER_01)) {
			serializer.addSerializer(V1Serializer.getInstance());
		}
		serializer.toSerialized(V1Serializer.VER_01, os, vc);
	}

	/**
	 * Starts observing a newly created router and remembers it as the target
	 * for pings.
	 *
	 * @param _router the router that was created
	 */
	public void routerCreated(DHTRouter _router) {
		doLog("Vivaldi notified of created router");

		_router.addObserver(this);
		router = _router;
	}

	/**
	 * Router callback: a contact entered the routing table. The contact is
	 * recorded as a router entry and, if it was a transient, its stored sample
	 * is handed to the {@link VivaldiClient}.
	 *
	 * @param contact the contact that was added
	 */
	public synchronized void added(DHTRouterContact contact) {
		purgeTransient(System.currentTimeMillis());

		// add to set of router entries
		IDWrapper id = new IDWrapper(contact.getID());
		if (router_entries.add(id)) {
			if (LOGGING_ENABLED) {
				doLog("added router entry " + id + " " + getStats());
			}
		}

		if (transient_ids.remove(id)) {
			// was a transient entry, get its latest position and sample
			TransientTuple tt = removeTransientTuple(id);
			if (tt == null) {
				// the ID set and the tuple list disagreed; there is no stored
				// sample to promote, so do not touch any unrelated tuple
				if (LOGGING_ENABLED) {
					doLog(id + " was marked transient but no tuple was found");
				}
				return;
			}

			// add state to the vivaldi client
			long curr_time = System.currentTimeMillis();
			// add the time this guy has been sitting as a TransientTuple to its age
			long tt_age = curr_time - tt.create_time + tt.last_pos.getAge();
			// NOTE(review): the trailing 'true' presumably allows a new host to be created - confirm against VivaldiClient
			if (vc.processSample(id, tt.last_pos.getCoords(), tt.last_pos.getError(), tt.last_rtt, tt_age,
					curr_time, true)) {
				resetPingClock(curr_time);
			}

			if (LOGGING_ENABLED) {
				doLog(id + "added to router, promoting transient to host " + getStats());
			}
		} else if (vc.getHosts().contains(id)) {
			if (LOGGING_ENABLED) {
				doLog(id + " added to router, but was already found as a host");
			}
		} else {
			if (LOGGING_ENABLED) {
				doLog(id + " added to router, but was not found as a host or transient");
			}
		}
	}

	/**
	 * Router callback: a contact left the routing table. The contact is
	 * dropped from the router entries and from the {@link VivaldiClient}'s
	 * hosts or, failing that, from the transients.
	 *
	 * @param contact the contact that was removed
	 */
	public synchronized void removed(DHTRouterContact contact) {
		purgeTransient(System.currentTimeMillis());

		// remove from router entries
		IDWrapper id = new IDWrapper(contact.getID());
		if (router_entries.remove(id)) {
			if (LOGGING_ENABLED) {
				doLog("removed router entry " + id + " " + getStats());
			}
		}

		if (vc.removeHost(id)) {
			if (LOGGING_ENABLED) {
				doLog(id + " removed from router, removed as host " + getStats());
			}
		} else if (transient_ids.remove(id)) {
			// contact identifier is present in list, so find and remove it
			removeTransientTuple(id);

			if (LOGGING_ENABLED) {
				doLog(id + " removed from router, removed as transient " + getStats());
			}
		} else {
			if (LOGGING_ENABLED) {
				doLog(id + " removed from router, but was not found as a host or transient");
			}
		}
	}

	/**
	 * Router callback; only used as an opportunity to purge expired transients.
	 *
	 * @param contact the contact whose location changed
	 */
	public void locationChanged(DHTRouterContact contact) {
		purgeTransient(System.currentTimeMillis());

		// only remove from VivaldiClient when removed from routing table
	}

	/**
	 * Router callback; only used as an opportunity to purge expired transients.
	 *
	 * @param contact the contact that is now alive
	 */
	public void nowAlive(DHTRouterContact contact) {
		purgeTransient(System.currentTimeMillis());

		// only add to VivaldiClient when coordinates are updated
	}

	/**
	 * Router callback; only used as an opportunity to purge expired transients.
	 *
	 * @param contact the contact that is now failing
	 */
	public void nowFailing(DHTRouterContact contact) {
		purgeTransient(System.currentTimeMillis());

		// only remove from VivaldiClient when removed from routing table
	}

	/**
	 * Router callback: the router is gone. Stops observing it, forgets it as
	 * ping target if it was the current one, and discards all peer state.
	 *
	 * @param router the router that was destroyed
	 */
	public void destroyed(DHTRouter router) {
		doLog("Vivaldi notified of destroyed router");

		// called outside our monitor so no lock is held while calling into the router
		router.removeObserver(this);

		// stop ping() from sending requests through a destroyed router
		if (this.router == router) {
			this.router = null;
		}

		// the collections are guarded by this monitor everywhere else
		synchronized (this) {
			vc.reset();
			router_entries.clear();

			transient_ids.clear();
			transient_list.clear();
		}
	}

	/**
	 * @return a new {@link StableLocalPosition} once {@link #startUp} has been
	 *         called, otherwise {@code null}
	 */
	public DHTNetworkPosition getLocalPosition() {
		if (started_up) {

			return (new StableLocalPosition(this));
		}

		// we only have a stable local position if we've been started up - i.e things are running

		return (null);
	}

	/*
	 * Pass-thru methods.
	 */

	/** @return the Vivaldi client's system coordinates */
	protected Coordinate getCoords() {
		return vc.getSystemCoords();
	}

	/** @return the Vivaldi client's application coordinates */
	protected Coordinate getStableCoords() {
		return vc.getApplicationCoords();
	}

	/** @return the Vivaldi client's system error, narrowed to {@code float} */
	protected float getError() {
		return (float) vc.getSystemError();
	}

	/** @return the age reported by the Vivaldi client for the current time */
	protected long getAge() {
		return vc.getAge(System.currentTimeMillis());
	}

	/** @return the shared position object used for remote peers */
	protected InitialPosition getInitialPosition() {
		return ip;
	}

	/** @return the underlying Vivaldi client */
	protected VivaldiClient<IDWrapper> getVivaldiClient() {
		return vc;
	}

	/*
	 * Methods for maintaining the potential adds to the VivaldiClient.
	 */

	/**
	 * Feeds one observation of a remote peer into the provider.
	 *
	 * <p>Samples for known hosts and for peers already in the router go
	 * straight to the {@link VivaldiClient}. Any other peer is kept (or
	 * refreshed) as a transient that expires {@link #TRANSIENT_TIME} ms from
	 * now. Calls where {@code sp} is the local position itself or the shared
	 * initial position are ignored.
	 *
	 * @param local_pos  the local position the update was invoked on
	 * @param id         identifier of the remote peer
	 * @param sp         position reported for the remote peer
	 * @param sample_rtt measured round-trip time (units not visible here; presumably milliseconds)
	 */
	protected synchronized void update(LocalPosition local_pos, IDWrapper id, SyrahPosition sp, float sample_rtt) {
		if (sp == local_pos) {
			if (LOGGING_ENABLED) {
				doLog("update invoked on LocalPosition with itself, ID = " + id);
			}
			return;
		}
		if (sp == ip) {
			if (LOGGING_ENABLED) {
				doLog("update invoked with the InitialPosition singleton, ID = " + id);
			}
			return;
		}

		long curr_time = System.currentTimeMillis();
		if (vc.getHosts().contains(id)) {
			// already maintain state for this peer, so process sample now
			if (vc.processSample(id, sp.getCoords(), sp.getError(), sample_rtt, sp.getAge(), curr_time, false)) {
				resetPingClock(curr_time);
			}

			if (LOGGING_ENABLED) {
				doLog("update called on host, ID = " + id);
			}
			return;
		}

		if (router_entries.contains(id)) {
			if (transient_ids.remove(id)) {
				// contact identifier is present in list, so find and remove it
				removeTransientTuple(id);
			}

			// peer is already in the router, so add and process sample now
			if (vc.processSample(id, sp.getCoords(), sp.getError(), sample_rtt, sp.getAge(), curr_time, true)) {
				resetPingClock(curr_time);
			}

			if (LOGGING_ENABLED) {
				doLog("update called on host already in router, ID = " + id);
			}
			return;
		}

		final long new_remove_time = curr_time + TRANSIENT_TIME;
		if (!transient_ids.contains(id)) {
			// identifier is not in list yet, so add it and return
			transient_ids.add(id);
			transient_list.addLast(new TransientTuple(new_remove_time, id, sp, sample_rtt, curr_time));
			if (LOGGING_ENABLED) {
				doLog("added transient " + id + " " + getStats());
			}
			return;
		}

		if (LOGGING_ENABLED) {
			doLog("updating transient " + id + " " + getStats());
		}

		// take the peer's existing tuple out of the list
		TransientTuple tt = removeTransientTuple(id);
		if (tt == null) {
			// the ID set and the tuple list disagreed; start a fresh tuple
			// rather than overwriting some other peer's entry
			tt = new TransientTuple(new_remove_time, id, sp, sample_rtt, curr_time);
		} else {
			// refresh the entry; its create_time is kept on purpose
			tt.remove_time = new_remove_time;
			tt.last_pos = sp;
			tt.last_rtt = sample_rtt;
		}

		// re-append so the list stays ordered by expiry time
		transient_list.addLast(tt);
	}

	/**
	 * Drops every transient whose expiry time is not after {@code curr_time}.
	 * Relies on {@link #transient_list} being ordered by expiry, so the scan
	 * stops at the first entry that is still valid.
	 *
	 * @param curr_time current time in milliseconds
	 */
	protected synchronized void purgeTransient(long curr_time) {
		for (Iterator<TransientTuple> i = transient_list.iterator(); i.hasNext();) {
			TransientTuple tt = i.next();
			if (tt.remove_time > curr_time) {
				// entry expires in the future, so end now
				return;
			}

			i.remove();
			transient_ids.remove(tt.id);
			if (LOGGING_ENABLED) {
				doLog("removed transient " + tt.id + " " + getStats());
			}
		}
	}

	/**
	 * Removes the first tuple in {@link #transient_list} that belongs to
	 * {@code id}. Callers must hold this object's monitor.
	 *
	 * @param id identifier to look for
	 * @return the removed tuple, or {@code null} if the list holds none for {@code id}
	 */
	private TransientTuple removeTransientTuple(IDWrapper id) {
		for (Iterator<TransientTuple> i = transient_list.iterator(); i.hasNext();) {
			TransientTuple candidate = i.next();
			if (candidate.id.equals(id)) {
				i.remove();
				return candidate;
			}
		}
		return null;
	}

	/** @return a short summary of host, transient and router-entry counts for log messages */
	protected String getStats() {
		return "[h:" + vc.getHosts().size() + ", t:" + transient_list.size() + " re:" + router_entries.size() + "]";
	}

	/**
	 * Sends {@code str} to the registered provider instance's log, but only
	 * when logging is compiled in and the provider has been registered.
	 *
	 * @param str message to log
	 */
	protected static void doLog(String str) {
		if (LOGGING_ENABLED && (provider != null)) {
			provider.log(str);
		}
	}

	/**
	 * Marks the provider as started and lets the Vivaldi client start up from
	 * {@code is}. An {@link IOException} from the client is logged, not
	 * propagated; the provider still counts as started in that case.
	 *
	 * @param is stream handed to the Vivaldi client's start-up
	 */
	public void startUp(DataInputStream is) {
		started_up = true;

		try {
			vc.startUp(is);

		} catch (IOException e) {

			doLog("startUp failed:" + e.toString());
		}
	}

	/**
	 * Lets the Vivaldi client shut down, writing to {@code os}. An
	 * {@link IOException} from the client is logged, not propagated.
	 *
	 * @param os stream handed to the Vivaldi client's shut-down
	 */
	public void shutDown(DataOutputStream os) {
		try {
			vc.shutDown(os);

		} catch (IOException e) {

			doLog("shutDown failed:" + e.toString());
		}
	}
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -