⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 btpeer.java

📁 p2p仿真
💻 JAVA
字号:
/*
 * @(#)BTPeer.java	ver 1.2  6/20/2005
 *
 * Copyright 2005 Weishuai Yang (wyang@cs.binghamton.edu). 
 * All rights reserved.
 * 
 */

package gps.protocol.BT;
import java.awt.Color;
import java.awt.Graphics;
import java.util.ArrayList;
import java.util.LinkedHashMap;

import gps.event.SimEvent;
import gps.event.SimEventHandler;
import gps.protocol.Document;
import gps.protocol.Peer;
import gps.protocol.BT.algorithm.BTAlgorithmPeerSelectionAtPeer;
import gps.protocol.BT.param.BTPeerMessage;
import gps.protocol.BT.param.BTTorrent;
import gps.util.LogFormatter;

/**
 * A BitTorrent peer agent.
 *
 * The peer keeps one {@link BTSession} per document hash key. It reacts to
 * the user level events (publish, download, random download) and to
 * handshake peer messages; every other peer message is ignored here and is
 * expected to be processed by the session and connection objects.
 *
 * @author  Weishuai Yang
 * @version 1.2,  6/20/2005
 */
public class BTPeer extends Peer {

	/**
	 * maximum allowed session number on this peer; enforced only when a
	 * handshake would require a new session to be created
	 */
	public static final int MAX_ALLOWED_SESSION_NUM=10;

	/**
	 * maximum allowed connection number; checked against the connection
	 * count of a single session when a handshake would require a new
	 * connection
	 */
	public static final int MAX_ALLOWED_CONNECTION_NUM=80;

	/**
	 * maximum out bandwidth, not used now
	 */
	public static final double MAX_OUT_BANDWIDTH_PER_NODE=300000;

	/**
	 * current sessions indexed by document hash key
	 */
	private LinkedHashMap mSessions=null;

	/**
	 * next connections to be unchoked; this class never populates it and
	 * only clears it to null in {@link #reset()}
	 */
	private LinkedHashMap mUnchokeNext=null;

	/**
	 * peer selection algorithm
	 */
	public BTAlgorithmPeerSelectionAtPeer mPeerSelection=new BTAlgorithmPeerSelectionAtPeer(getID());

	/**
	 * constructs a new BTPeer with id provided
	 * @param i BTPeer id
	 */
	public BTPeer(int i) {
		super(i);
		mSessions=new LinkedHashMap();
	}

	/**
	 * resets internal values: drops all sessions, forgets the unchoke
	 * list and installs a fresh peer selection algorithm
	 */
	public void reset(){
		super.reset();
		// NOTE(review): the null guard suggests reset() can run before the
		// constructor body has created the map (presumably when invoked
		// from the superclass constructor) -- confirm against Peer.
		if(mSessions!=null)
			mSessions.clear();
		mUnchokeNext=null;
		mPeerSelection=new BTAlgorithmPeerSelectionAtPeer(getID());
	}

	/**
	 * does some BTPeer specific drawing on the graph: a green dot with a
	 * "Peer(id)" label and, underneath in black, the keys of all documents
	 * held by this peer. A document that is not whole is followed by its
	 * finished percentage in brackets.
	 *
	 * @param g Graphics object
	 * @param x x position
	 * @param y y position
	 */
	public void agentDraw(Graphics g, int x, int y){
		g.setColor( Color.green );
		g.fillOval( x-5, y-5, 10, 10 );

		g.drawString("Peer("+mAgentID+")",x-16,y-6 );

		if(mDocDBbyKey.isEmpty())
			return;

		StringBuffer label=new StringBuffer();
		// iterate over a snapshot of the keys
		ArrayList keys=new ArrayList(mDocDBbyKey.keySet());
		for(int i=0;i<keys.size();i++){
			Object key=keys.get(i);
			BTDocument doc=(BTDocument)mDocDBbyKey.get(key);
			label.append((String)key);
			if(!doc.isWhole()){
				label.append("[");
				label.append(LogFormatter.sprintf("%.1f%",doc.getFinishedPercent()*100));
				label.append("]");
			}
			label.append(" ");
		}
		g.setColor( Color.black );
		g.drawString(label.toString(), x-12, y+12);
	}

	/**
	 * gets all sessions on this peer
	 * @return the live hash map of sessions (not a copy), indexed by
	 *         document hash key
	 */
	public LinkedHashMap getSessions(){
		return mSessions;
	}

	/**
	 * gets session for a specific document
	 * @param key document hash key
	 * @return session object, or null if there is no session for the key
	 */
	public BTSession getSession(String key){
		return (BTSession)mSessions.get(key);
	}

	/**
	 * removes session for a specific document
	 * @param key document hash key
	 * @return the removed session object, or null if there was none
	 */
	public BTSession removeSession(String key){
		return (BTSession)mSessions.remove(key);
	}

	/**
	 * retrieves a document by key
	 *
	 * @param key document hash key
	 * @return BTDocument object, or null if this peer does not hold it
	 */
	public BTDocument getBTDocument(String key){
		return (BTDocument)mDocDBbyKey.get(key);
	}

	/**
	 * handles peer level events
	 *
	 * @param e the event to be handled
	 * @return true if already handled
	 *
	 */
	public boolean handle(SimEvent e){
		if(super.handle(e))
			return true;

		switch(e.getType()){
			/*
			 * In the three USER_* handlers (handlePublish, handleDownload
			 * and handleRandomDownload) nodes are identified by their ID,
			 * since those events usually come from the configuration file.
			 * All other events identify nodes by a reference to the
			 * instance.
			 */
			case BTEvent.USER_PUBLISH:
				handlePublish((BTEvent)e);
				return true;
			case BTEvent.USER_DOWNLOAD:
				handleDownload((BTEvent)e);
				return true;
			case BTEvent.USER_RANDOM_DOWNLOAD:
				handleRandomDownload((BTEvent)e);
				return true;
			// BTPeer handles peer messages, but only the handshake
			case BTEvent.PEER_MESSAGE:
				handlePeerMessage((BTEvent)e);
				return true;
			default:
				mDebugLog.warning("received event "+e+" unprocessed");
		}
		return false;
	}

	/**
	 * handles events for publishing a specific
	 * document to a specified tracker at a specific time.
	 *
	 * @param e the event object; its param is the tracker id (Integer) and
	 *          its additional param is the document hash key (String)
	 */
	public void handlePublish(BTEvent e){
		int trackerID=((Integer)e.getParam()).intValue();
		String docHashKey=(String)e.getAddParam();
		publish(docHashKey, (BTTracker)BT.getInstance().getAgent(trackerID));
	}

	/**
	 * handles events for downloading a specific
	 * document from a specified tracker at a specific time.
	 * If the tracker has no torrent for the requested key the request is
	 * logged and dropped.
	 *
	 * @param e the event object; its param is the tracker id (Integer) and
	 *          its additional param is the document hash key (String)
	 */
	public void handleDownload(BTEvent e){
		int trackerID=((Integer)e.getParam()).intValue();
		String docHashKey=(String)e.getAddParam();
		// NOTE(review): the delay is computed but never used; the call is
		// kept in case getDelayAgent has side effects -- confirm and remove.
		double t=BT.getInstance().getDelayAgent(getID(),trackerID);

		BTTracker tracker=(BTTracker)BT.getInstance().getAgent(trackerID);
		//download a torrent metainfo file from web pages
		BTTorrent btt=tracker.getTorrent(docHashKey);

		// presumably getTorrent yields null for a key that was never
		// published; without this guard the handler would fail with a
		// NullPointerException instead of skipping the request
		if(btt==null) {
			mDebugLog.info(LogFormatter.sprintf("%.9f", mScheduler.getCurrent())+": "+tracker+" can't find document "+docHashKey+" for "+this+" to download");
			return;
		}
		beginDownload(btt, tracker, " --      Download   -- ");
	}

	/**
	 * handles events for downloading a random
	 * document from a specified tracker at a specific time.
	 * If the tracker cannot offer a document the request is logged and
	 * dropped.
	 *
	 * @param e the event object; its param is the tracker id (Integer)
	 */
	public void handleRandomDownload(BTEvent e){
		int trackerID=((Integer)e.getParam()).intValue();
		// NOTE(review): the delay is computed but never used; the call is
		// kept in case getDelayAgent has side effects -- confirm and remove.
		double t=BT.getInstance().getDelayAgent(getID(),trackerID);

		BTTracker tracker=(BTTracker)BT.getInstance().getAgent(trackerID);
		//download a random torrent metainfo file from web pages
		BTTorrent btt=tracker.getRandomTorrent(mDocDBbyKey);

		if(btt==null) {
			mDebugLog.info(LogFormatter.sprintf("%.9f", mScheduler.getCurrent())+": "+tracker+" can't find a random document for "+this+" to download");
			return;
		}
		beginDownload(btt, tracker, " -- Random-Download -- ");
	}

	/**
	 * finds or creates the session for a torrent, lets it query the
	 * tracker and writes the trace line. An existing session gets the
	 * torrent assigned again.
	 *
	 * @param btt the torrent to download, must not be null
	 * @param tracker the tracker the torrent came from (used for tracing)
	 * @param label the action label placed in the trace line
	 */
	private void beginDownload(BTTorrent btt, BTTracker tracker, String label){
		BTSession session=(BTSession)mSessions.get(btt.mDocHashKey);
		if(session!=null){
			session.setTorrent(btt);
		}
		else {
			session=new BTSession(btt, this);
			mSessions.put(btt.mDocHashKey,session);
		}
		session.queryTracker();
		mTraceLog.info(LogFormatter.sprintf("%.9f", mScheduler.getCurrent())+": "+this+label+btt.mDocHashKey+" -- "+tracker+" -- Starts");
	}

	/**
	 * handles HAND_SHAKING, all other peer messages are not supposed to be sent to BTPeer.
	 * They are handled by BTSession and BTConnection
	 *
	 * @param e is the event object; its additional param is the
	 *          BTPeerMessage
	 */
	public void handlePeerMessage(BTEvent e){
		BTPeerMessage btpm=(BTPeerMessage)e.getAddParam();

		//peer object only processes hand shaking, other type peer messages are processed by session object
		if(btpm.mType==BTPeerMessage.HAND_SHAKING)
			handleHandShaking(e);
	}

	/**
	 * handleHandShaking makes sure each side has a session. The initiator
	 * should already have one, but the receiver may not; if it doesn't, one
	 * is set up for it. For simplicity, the handshake already includes the
	 * bitfield, and the receiver of the handshake replies with a bitfield
	 * message and sets up the connection. The initiator also sets up its
	 * connection after receiving the bitfield.
	 *
	 * The handshake is silently dropped when this peer does not hold the
	 * document, when a new session would exceed
	 * {@link #MAX_ALLOWED_SESSION_NUM}, or when a new connection would
	 * exceed {@link #MAX_ALLOWED_CONNECTION_NUM} for the session.
	 *
	 * @param e is the event object; its param is the initiator's session
	 *          and its additional param is the handshake message
	 */
	public void handleHandShaking(BTEvent e){
		BTSession bts=(BTSession)e.getParam();
		BTPeerMessage btpm=(BTPeerMessage)e.getAddParam();

		mDebugLog.info(mScheduler.getCurrent()+": "+toString()+"received handshaking from "+bts.getAgent());

		BTDocument btd=getBTDocument(btpm.mDocHashKey);
		//if this node doesn't have this document, do nothing
		if(btd==null)
			return;

		BTSession session=sessionForHandshake(btpm.mDocHashKey);
		if(session==null)
			return;

		//retrieve bitfield information into session knowledge
		boolean interested=recordRemotePieces(session, btd, bts, btpm.mBitfield);

		/*
		 * note: for simplicity, each connection only serves one document,
		 * so it's possible that there is more than one connection between 2 nodes.
		 */
		BTSocket connection=connectionForHandshake(session, bts);
		if(connection==null)
			return;

		//set up the keep alive event, to be processed by the connection, when triggered, send a keep alive message
		BTEvent keepAlive=new BTEvent(mScheduler.getCurrent()+BTSocket.CONNECTION_KEEP_ALIVE, BTEvent.CONNECTION_KEEP_ALIVE, connection, connection);
		mScheduler.enqueue(keepAlive);
		connection.mKeepAlive=keepAlive;

		connection.mBitField=btpm.mBitfield;

		//respond with a bitfield message carrying a copy of our own pieces
		BTPeerMessage reply=new BTPeerMessage(BTPeerMessage.BITFIELD);
		reply.mDocHashKey=btpm.mDocHashKey;
		reply.mBitfield=(boolean[])session.getDocument().getPieces().clone();
		reply.mInterested=interested;

		//this is only used in the bitfield message to set up connections to each other
		reply.mConnection=connection;
		connection.mAmChoking=true;

		//deliver the reply to the initiator's session after the network delay
		double arrival=mScheduler.getCurrent()+BT.getInstance().getDelayAgent(this,bts.getAgent());
		BTEvent replyEvent=new BTEvent(arrival, BTEvent.PEER_MESSAGE, (SimEventHandler)bts, session);
		replyEvent.setAddParam(reply);
		mScheduler.enqueue(replyEvent);
	}

	/**
	 * returns the session serving a handshake for the given document,
	 * creating one (to serve as a seed, whether the file is complete or
	 * partial) when none exists yet.
	 *
	 * @param docHashKey document hash key from the handshake
	 * @return the session, or null if a new one would exceed the session limit
	 */
	private BTSession sessionForHandshake(String docHashKey){
		if(mSessions.containsKey(docHashKey))
			return (BTSession)mSessions.get(docHashKey);

		//this peer already has too many sessions and doesn't want others to download from it
		if(mSessions.size()>=MAX_ALLOWED_SESSION_NUM)
			return null;

		BTSession created=new BTSession(docHashKey, this);
		mSessions.put(docHashKey, created);
		return created;
	}

	/**
	 * records in the session every piece the remote node announced and
	 * reports whether the remote node has a piece this peer lacks.
	 *
	 * @param session this peer's session for the document
	 * @param btd this peer's copy of the document
	 * @param bts the initiator's session, used to identify the remote node
	 * @param remoteBitfield the bitfield announced in the handshake
	 * @return true if at least one announced piece is missing locally
	 */
	private boolean recordRemotePieces(BTSession session, BTDocument btd, BTSession bts, boolean[] remoteBitfield){
		boolean interested=false;
		for(int i=0;i<remoteBitfield.length;i++){
			if(!remoteBitfield[i])
				continue;
			//that node has this piece, but this node doesn't have it
			if(!btd.mPieces[i])
				interested=true;
			session.addNodeHasPiece(bts.getAgent(), i);
		}
		return interested;
	}

	/**
	 * returns the connection of the session to the handshake initiator.
	 * An existing connection gets its timeout rescheduled and its pending
	 * keep alive event cancelled; otherwise a new connection with a fresh
	 * timeout event is registered in the session.
	 *
	 * @param session this peer's session for the document
	 * @param bts the initiator's session, used to identify the remote node
	 * @return the connection, or null if a new one would exceed the
	 *         connection limit of the session
	 */
	private BTSocket connectionForHandshake(BTSession session, BTSession bts){
		if(session.getConnections().containsKey(bts.getAgent())){
			BTSocket existing=(BTSocket)session.getConnections().get(bts.getAgent());
			//push the timeout back by re-enqueueing the same event with a new time stamp
			mScheduler.cancel(existing.mTimeout);
			existing.mTimeout.setTimeStamp(mScheduler.getCurrent()+BTSocket.CONNECTION_TIMEOUT);
			mScheduler.enqueue(existing.mTimeout);

			//the caller schedules a new keep alive event, so drop the pending one
			if(existing.mKeepAlive!=null)
				mScheduler.cancel(existing.mKeepAlive);
			return existing;
		}

		//this session already has too many connections; refuse the new one
		if(session.getConnections().size()>=MAX_ALLOWED_CONNECTION_NUM)
			return null;

		mDebugLog.info(mScheduler.getCurrent()+": "+toString()+" creating a new connection to "+bts.getAgent());

		//no connection to that node for this document yet, create and register one
		BTSocket created=new BTSocket(session);
		session.getConnections().put(bts.getAgent(), created);

		//set up the timeout event
		BTEvent timeout=new BTEvent(mScheduler.getCurrent()+BTSocket.CONNECTION_TIMEOUT, BTEvent.CONNECTION_TIMEOUT, session, bts.getAgent());
		mScheduler.enqueue(timeout);
		created.mTimeout=timeout;
		return created;
	}

	/**
	 * publishing the document to tracker
	 *
	 * @param docHashkey the hashkey of the document
	 * @param tracker the tracker instance
	 */
	public void publish(String docHashkey, BTTracker tracker){
		//publish actually is web posting to web server, which is out of the scope of BT
		tracker.addTorrent(new BTTorrent((BTDocument)Document.mDocumentListbyKey.get(docHashkey), tracker));
		mTraceLog.info(LogFormatter.sprintf("%.9f", mScheduler.getCurrent())+": "+this+" --      Publish    -- "+docHashkey+" -- "+tracker);
	}

	/**
	 * gets string description
	 * @return string description in the form "BTPeer(id)"
	 */
	public String toString(){
		return "BTPeer("+mAgentID+")";
	}

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -