⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 btsession.java

📁 p2p仿真
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
			//make sure no duplication
			mPeerCandidateList.remove(bts.getAgent());
			mPeerCandidateList.add(bts.getAgent());
		}
		//only interested peers will be taken into consideration for unchoking
	}

	/**
	 * Handles the notInterested type peer message: records that the sending
	 * peer is no longer interested and removes it from the unchoke candidates.
	 *
	 * @param e the event object; its param is cast to the sender's BTSession
	 */
	public void handlePMNotInterested(BTEvent e){
		BTSession bts=(BTSession)e.getParam();
		BTSocket connection=(BTSocket)mConnections.get(bts.getAgent());
		//mConnections.get yields null when no connection is registered for the
		//sender (presumably possible if the message arrives after the connection
		//was closed); there is then no socket state to update
		if(connection!=null){
			connection.mPeerInterested=false;
		}
		//either way the sender must no longer be a candidate for unchoking
		mPeerCandidateList.remove(bts.getAgent());
	}
	/**
	 * Handles the cancel type peer message.
	 * Currently a no-op: cancel handling is not implemented yet, so the
	 * event is ignored.
	 *
	 * @param e the event object (unused)
	 */
	public void handlePMCancel(BTEvent e){
	    //not implemented yet
	}
	
	


	/**
	 * The entry point of this session.
	 * After the session is initialized, this function is called to send the
	 * initial STARTED get-request to the tracker of the torrent.
	 */
	public void queryTracker(){
		BTTracker target=mTorrent.mTracker;

		//the event fires at the tracker after the delay between this agent and the tracker
		double arrival=mScheduler.getCurrent()+BT.getInstance().getDelayAgent(mAgent, target);
		BTEvent request=new BTEvent(arrival, BTEvent.TRACKER_GET_REQUEST, target, mAgent);

		//the request is keyed by the document hash key (format key_xxx), not by a
		//piece hash key, and is attached as the additional parameter of the event
		request.setAddParam(new BTGetRequest(mDocument.getKey(), BTGetRequest.STARTED, mDocument.getLeftSize(), this));

		mScheduler.enqueue(request);
	}


	/**
	 * Sends an announcement to the tracker according to the session status.
	 * <ul>
	 * <li>WORKING: regular announcement (event field 0) carrying the left size</li>
	 * <li>COMPLETED: completed announcement with left size 0; the next scheduled
	 *     announcement is cancelled</li>
	 * <li>STOPPED: stopped announcement carrying the left size; the next scheduled
	 *     announcement is cancelled</li>
	 * <li>FAILED: the next scheduled announcement is cancelled and nothing is sent</li>
	 * <li>any other status: a warning is logged and nothing is sent</li>
	 * </ul>
	 */
	public void announceTracker(){
		BTTracker target=mTorrent.mTracker;
		double arrival=mScheduler.getCurrent()+BT.getInstance().getDelayAgent(mAgent, target);
		BTEvent announcement=new BTEvent(arrival, BTEvent.TRACKER_GET_REQUEST, target, mAgent);
		double remaining=mDocument.getLeftSize();
		BTGetRequest request;

		//the request is based on the document hash key rather than the piece hash key
		switch(mStatus){
			case WORKING:
				//0 in the event field means the event param is not present, i.e. regular announcement
				request=new BTGetRequest(mDocument.getKey(), 0, remaining, this);
				traceTrackerAnnouncement(target, " -- Left "+remaining);
				break;
			case COMPLETED:
				request=new BTGetRequest(mDocument.getKey(), BTGetRequest.COMPLETED, 0, this);
				mScheduler.cancel(mNextTrackerAnnouncement);
				traceTrackerAnnouncement(target, " -- Completed");
				break;
			case STOPPED:
				request=new BTGetRequest(mDocument.getKey(), BTGetRequest.STOPPED, remaining, this);
				mScheduler.cancel(mNextTrackerAnnouncement);
				traceTrackerAnnouncement(target, " -- Stopped");
				break;
			case FAILED:
				mScheduler.cancel(mNextTrackerAnnouncement);
				traceTrackerAnnouncement(target, " -- Failed");
				return;
			default:
				mDebugLog.warning("an announcement to tracker is not supposed to be scheduled."); 
				return;
		}

		//the BTGetRequest travels as the additional parameter of the event
		announcement.setAddParam(request);
		mScheduler.enqueue(announcement);
	}

	/**
	 * Writes one announcement line to the trace log.
	 * @param tracker the tracker being announced to
	 * @param suffix trailing part of the line describing the announcement kind
	 */
	private void traceTrackerAnnouncement(BTTracker tracker, String suffix){
		mTraceLog.info(LogFormatter.sprintf("%.9f", mScheduler.getCurrent())+": "+mAgent+" --     Announce    -- "+mDocument.getKey()+" -- "+tracker+suffix);
	}

	/**
	 * Lets all the connected peers know that this node
	 * finished downloading a piece, by sending each of them a HAVE message.
	 * When the document is whole, a not-interested message is sent to each peer as well.
	 * @param index piece index
	 */
	public void announcePeer(int index){
		String docKey=mDocument.getKey();

		//a single HAVE message object is attached to every event enqueued below
		BTPeerMessage have=new BTPeerMessage(BTPeerMessage.HAVE);
		have.mDocHashKey=docKey;
		have.mIndex=index;

		//work on a snapshot of the peers currently in the connection table
		Object[] peers=mConnections.keySet().toArray();

		//the announcement is to be processed by the peer's BTSession object rather than the BTPeer object
		for(int k=0;k<peers.length;k++){
			BTPeer peer=(BTPeer)peers[k];
			BTSession remote=peer.getSession(docKey);
			double arrival=mScheduler.getCurrent()+BT.getInstance().getDelayAgent(mAgent,peer);
			BTEvent notice=new BTEvent(arrival, BTEvent.PEER_MESSAGE, (SimEventHandler)remote, this);
			notice.setAddParam(have);
			mScheduler.enqueue(notice);

			//once the document is whole there is nothing left to ask this peer for
			if(mDocument.isWhole()){
				sendInterested(false, peer);
			}
		}
	}
	

	/**
	 * Sets up a connection to a peer by sending it a handshaking message.
	 * If the peer is already connected, only the choking state is updated
	 * (and announced to the peer when it changed).
	 * @param p destination peer
	 * @param chokedornot whether the peer is initially choked by this side
	 */
	public void connectTo(BTPeer p, boolean chokedornot){
		//not supposed to connect to itself
		if(p==mAgent){
			return;
		}

		//already connected: no new handshake, just bring the choking state up to date
		if(mConnections.containsKey(p)){
			BTSocket existing=getConnection(p);
			boolean stateChanged=(existing.mAmChoking!=chokedornot);
			if(stateChanged){
				existing.mAmChoking=chokedornot;
				sendChoked(chokedornot, p);
			}
			return;
		}

		//not connected yet: if not choked, remember the peer in the unchoked list,
		//so that when the connection is really created, unchoked can be sent
		if(!chokedornot){
			mUnchokedList.add(p);
		}

		BTPeerMessage handshake=new BTPeerMessage(BTPeerMessage.HAND_SHAKING);
		handshake.mDocHashKey=getDocument().getKey();
		//for simplicity, the handshake also includes the bitfield information
		handshake.mBitfield=(boolean[])getDocument().getPieces().clone();
		//by default, set it as interested
		handshake.mInterested=true;

		double arrival=mScheduler.getCurrent()+BT.getInstance().getDelayAgent(mAgent,p);

		//the handshake event is addressed to the peer itself (not to one of its
		//sessions); this session is passed as the event param
		BTEvent delivery=new BTEvent(arrival, BTEvent.PEER_MESSAGE, p, this);
		delivery.setAddParam(handshake);
		mScheduler.enqueue(delivery);
	}
	
	/**
	 * Closes the connection to a peer, if one is registered.
	 * Cancels the events pending on the socket, tears down the underlying
	 * link, removes the peer from the connection table and asks the session
	 * of the counterpart socket to close its side as well.
	 * Regardless of whether a connection existed, the peer is removed from
	 * the unchoked list, and when no connection is left the session is marked
	 * as removed and unregistered from its agent.
	 * @param p destination peer
	 */
	public void closeConnection(BTPeer p){

		if(mConnections.containsKey(p)){
		    
		    BTSocket conn=getConnection(p);
			//keep a reference to the other end: conn.mCounterpart is cleared below
			BTSocket counterpart=conn.mCounterpart;

		    //make sure all the arranged events are cancelled
		    mScheduler.cancel(conn.mTimeout);
		    mScheduler.cancel(conn.mRequestTimeout);
		    mScheduler.cancel(conn.mEstimateFinish);
		    mScheduler.cancel(conn.mKeepAlive);
		    mScheduler.cancel(conn.mKeepAliveMsg);
		    //drop the references so the socket holds no stale events or bitfield
		    conn.mTimeout=null;
		    conn.mRequestTimeout=null;
		    conn.mEstimateFinish=null;
		    conn.mKeepAlive=null;
		    conn.mKeepAliveMsg=null;
		    conn.mBitField=null;
		    
		    //tear the link down only if it is still set on this socket
		    if(conn.mConnection!=null){
		    	conn.mConnection.tearDown();
	    		//update link information on screen
				ProtocolPanel pp=Simulator.getInstance().getPP();
				//pp is null-checked, so the repaint is skipped when no panel is available
				if(pp!=null){
				    pp.getSimGuiControl().getGraphPanel().repaint();
				}
		    
		    
		    }
			conn.mConnection=null;

			conn.mCounterpart=null;
			//remove the entry BEFORE calling into the counterpart's session: if that
			//call comes back to this session for p, containsKey(p) is already false
		    mConnections.remove(p);

		    if(counterpart!=null){
		    	//clear the link on the other end first
		    	//NOTE(review): presumably so the counterpart session, running this same
		    	//method, skips tearDown() on the already torn-down link - confirm
		    	counterpart.mConnection=null;
		    	counterpart.mSession.closeConnection(getAgent());
		    }
		    
		}
		//executed even when p had no registered connection
		mUnchokedList.remove(p);
		//a session without any connection left is removed from its agent
	    if(mConnections.size()==0){
	        mRemoved=true;
	        mAgent.removeSession(mDocument.getKey());
	    }
	}

	/**
	 * Sends a piece request message to a peer, if a request is currently possible.
	 * Nothing is sent while the peer is choking this node, when the document
	 * is already whole, or while a previous request is still pending.
	 * If no piece is being requested on this connection, a piece is selected
	 * through the piece selection strategy; when the selection fails the
	 * underlying connection is set inactive.
	 * @param p destination peer
	 */
	public void sendRequestTo(BTPeer p){
		BTSocket socket=(BTSocket)mConnections.get(p);

		//choked by the peer: requests are not allowed
		if(socket.mPeerChoking){
			return;
		}

		//since I'm unchoked, interested should already have been set;
		//nothing to do if the document is whole or a request is still outstanding
		if(getDocument().isWhole()||socket.mRequestTimeout!=null){
			return;
		}

		//a piece is already in progress on this connection: keep requesting it
		if(socket.mRequestingPieceIndex!=-1){
			sendRequest(socket.mRequestingPieceIndex, p);
			return;
		}

		int chosen=mPieceSelection.selectPiece(this, p);
		if(chosen<0){
			//an index less than 0 means the selection failed
			socket.mConnection.setInActive();
			return;
		}
		sendRequest(chosen, p);
	}
	
	/**
	 * Sends a request for a specific piece to a peer and schedules the
	 * matching request timeout.
	 * The request starts at the recorded progress of the piece when the piece
	 * is in the partial list, otherwise at offset 0, and asks for one block or
	 * for the rest of the piece if that is shorter.
	 * Nothing is sent (a warning is logged) while a previous request on the
	 * connection has not timed out yet.
	 * @param index piece index
	 * @param p destination peer
	 */
	public void sendRequest(int index, BTPeer p){
		BTDocument doc=(BTDocument)getDocument();
		BTPeerMessage request=new BTPeerMessage(BTPeerMessage.REQUEST);
		BTSocket socket=(BTSocket)getConnection(p);

		//only one pending request per connection
		if(socket.mRequestTimeout!=null){
			mDebugLog.warning(""+mScheduler.getCurrent()+" "+mAgent+" not supposed to send request to "+p+" before previous request timeout");
			return;
		}

		socket.mRequestingPieceIndex=index;
		request.mDocHashKey=doc.getKey();
		request.mIndex=index;

		doc.addPieceDownloading(index);

		//continue a partially downloaded piece, otherwise start it at offset 0
		if(!doc.inPartialList(index)){
			request.mBegin=0;
			doc.pieceProgress(index, 0);
		}
		else{
			request.mBegin=doc.getPieceProgress(index);
		}

		//request one block, or what is left of the piece when that is less than a block
		request.mLength=(doc.getPieceLength(index)-request.mBegin<doc.getBlockLength())
			? (int)(doc.getPieceLength(index)-request.mBegin)
			: (int)doc.getBlockLength();

		//to simulate the block pipeline: if the connection is currently transferring,
		//omit the network delay for sending the request and the piece message
		boolean pipelined=Boolean.parseBoolean(BT.getInstance().getProtocolProperties().getProperty("Pipeline"));
		double arrival;
		if(!pipelined){
			arrival=mScheduler.getCurrent()+BT.getInstance().getDelayAgent(mAgent,p);
		}
		else{
			arrival=mScheduler.getCurrent()+0.0000000001;
			if(!socket.mConnection.isActive()){
				arrival+=BT.getInstance().getDelayAgent(mAgent,p);
			}
		}

		//PEER_MESSAGE events use BTSession as the first param object, which is different from other events
		BTEvent delivery=new BTEvent(arrival, BTEvent.PEER_MESSAGE, p.getSession(doc.getKey()), this);
		delivery.setAddParam(request);
		mScheduler.enqueue(delivery);

		//the timeout is counted from the arrival time of the request
		BTEvent timeout=new BTEvent(arrival+BTSocket.REQUEST_TIMEOUT, BTEvent.REQUEST_TIMEOUT, socket, this);
		timeout.setAddParam(p);
		socket.mRequestTimeout=timeout;
		mScheduler.enqueue(timeout);
	}

	/**
	 * Sends an interested or not-interested peer message to the session
	 * the peer holds for this document.
	 * @param interested true to send INTERESTED, false to send NOT_INTERESTED
	 * @param p destination peer
	 */
	public void sendInterested(boolean interested, BTPeer p){
		BTDocument doc=(BTDocument)getDocument();

		BTPeerMessage notice=new BTPeerMessage(interested ? BTPeerMessage.INTERESTED : BTPeerMessage.NOT_INTERESTED);
		notice.mDocHashKey=doc.getKey();

		double arrival=mScheduler.getCurrent()+BT.getInstance().getDelayAgent(mAgent,p);

		//PEER_MESSAGE events use BTSession as the first param object, which is different from other events
		BTEvent delivery=new BTEvent(arrival, BTEvent.PEER_MESSAGE, p.getSession(doc.getKey()), this);
		delivery.setAddParam(notice);
		mScheduler.enqueue(delivery);
	}
	
	
	/**
	 * Sends a choke or unchoke peer message and keeps the unchoked list in
	 * step: the peer is removed from the list when choked and added when unchoked.
	 * Nothing happens when the destination is this session's own agent.
	 * @param choked true to send CHOKE, false to send UNCHOKE
	 * @param p destination peer
	 */
	public void sendChoked(boolean choked, BTPeer p){
		if(p==mAgent){
			return;
		}

		BTDocument doc=(BTDocument)getDocument();

		BTPeerMessage notice;
		String logVerb;
		if(!choked){
			notice=new BTPeerMessage(BTPeerMessage.UNCHOKE);
			mUnchokedList.add(p);
			logVerb=" sends unchoked to ";
		}
		else{
			notice=new BTPeerMessage(BTPeerMessage.CHOKE);
			mUnchokedList.remove(p);
			logVerb=" sends choked to ";
		}
		notice.mDocHashKey=doc.getKey();

		double arrival=mScheduler.getCurrent()+BT.getInstance().getDelayAgent(mAgent,p);

		//PEER_MESSAGE events use BTSession as the first param object, which is different from other events
		BTEvent delivery=new BTEvent(arrival, BTEvent.PEER_MESSAGE, p.getSession(doc.getKey()), this);
		delivery.setAddParam(notice);
		mScheduler.enqueue(delivery);

		mDebugLog.info(""+mScheduler.getCurrent()+": "+mAgent+logVerb+p);
	}

	/**
	 * Updates the piece information in the local database: records that a
	 * peer has a given piece.
	 * An index outside the range 0 .. getPieceNum()-1 is rejected with a
	 * warning instead of being recorded.
	 * @param node peer that has the piece
	 * @param piece piece index
	 * @return true if the information was recorded, false if the index was out of bound
	 */
	public boolean addNodeHasPiece(BTPeer node, int piece){
		//a negative index is as invalid as one past the last piece and would
		//otherwise fail on the array access below
		if(piece<0||piece>=mDocument.getPieceNum()){
			mDebugLog.warning(node+" announced a piece out of bound!");
			return false;
		}
		mKnowledge[piece].add(node);
		return true;
	}
	
	
	/**
	 * Counts the peers recorded in the local knowledge table for a specific piece.
	 * No bounds check is done here: an index outside the mKnowledge array
	 * raises ArrayIndexOutOfBoundsException.
	 * @param i piece index
	 * @return number of entries stored in mKnowledge[i]
	 */
	public int countPeersWithPiece(int i){
	    return mKnowledge[i].size();
	}

	/**
	 * Gets the string description of this session, in the form
	 * S(document key)@agent.
	 * @return a string describing this session
	 */
	public String toString() {
		StringBuilder text=new StringBuilder("S(");
		text.append(mDocument.getKey());
		text.append(")@");
		text.append(mAgent);
		return text.toString();
	}
}


⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -