⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 btsession.java

📁 p2p仿真
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
			case BTPeerMessage.CHOKE:
				handlePMChoke(e);
				return;
			
			case BTPeerMessage.UNCHOKE:
				handlePMUnChoke(e);
				return;
			
			case BTPeerMessage.INTERESTED:
				handlePMInterested(e);
				return;
				
			case BTPeerMessage.NOT_INTERESTED:
				handlePMNotInterested(e);
				return;
				
			//update knowledge
			case BTPeerMessage.HAVE:
				handlePMHave(e);
				return;

			case BTPeerMessage.REQUEST:
			//if receive request, send piece
				handlePMRequest(e);
				return;
				
			case BTPeerMessage.PIECE:
				//bandwidth information is included in the piece message
				//this is the real download, it should let the other side know the finish of the piece, 
				//however, in case the bandwidth changed, the scheduled event should be changed
				handlePMPiece(e);
				return;
			
			case BTPeerMessage.CANCEL:	
				handlePMCancel(e);
				return;
		}
	
	}
	
	
	/**
	 * Handles a connection timeout event by closing the connection to the peer carried in the event.
	 * @param e the event object
	 */
	public void handleConnectionTimeout(BTEvent e){
		// The peer whose connection timed out is read from the event parameter.
		final BTPeer peer=(BTPeer)e.getParam();

		// Record the closure in the debug log before actually closing the connection.
		final String logLine=mScheduler.getCurrent()+": "+this+" closes connection to "+peer;
		mDebugLog.info(logLine);

		closeConnection(peer);
	}
	
	
	/**
	 * Handles the bitfield type peer message.
	 * In this simulation, the bitfield message works as the reply to the handshake:
	 * it is received by the initiator, at which point the connection is actually set up.
	 *
	 * @param e the event object
	 */
	public void handlePMBitField(BTEvent e){
		// The event parameter is the sending session; the additional parameter is the bitfield message.
		final BTSession remoteSession=(BTSession)e.getParam();
		final BTPeerMessage bitfieldMsg=(BTPeerMessage)e.getAddParam();
		final BTPeer remotePeer=remoteSession.getAgent();

		final BTSocket socket;
		if(mConnections.containsKey(remotePeer)){
			// A connection to this peer already exists: restart its timeout from the current time.
			socket=(BTSocket)mConnections.get(remotePeer);
			mScheduler.cancel(socket.mTimeout);
			socket.mTimeout.setTimeStamp(mScheduler.getCurrent()+BTSocket.CONNECTION_TIMEOUT);
			mScheduler.enqueue(socket.mTimeout);

			// Drop any pending keep-alive; a new one is scheduled below.
			if(socket.mKeepAlive!=null){
				mScheduler.cancel(socket.mKeepAlive);
			}
		}
		else{
			// No connection to this peer yet: create and register one.
			socket=new BTSocket(this);
			mConnections.put(remotePeer, socket);

			// Schedule the timeout event; it is handled by this session with the peer as
			// its parameter, and closes the connection when it fires.
			final BTEvent timeoutEvent=new BTEvent(mScheduler.getCurrent()+BTSocket.CONNECTION_TIMEOUT, BTEvent.CONNECTION_TIMEOUT, this, remotePeer);
			mScheduler.enqueue(timeoutEvent);
			socket.mTimeout=timeoutEvent;
		}

		// Schedule the keep-alive event and remember it on the socket so it can be cancelled later.
		final BTEvent keepAliveEvent=new BTEvent(mScheduler.getCurrent()+BTSocket.CONNECTION_KEEP_ALIVE, BTEvent.CONNECTION_KEEP_ALIVE, socket, socket);
		mScheduler.enqueue(keepAliveEvent);
		socket.mKeepAlive=keepAliveEvent;

		// Store the bitfield announced by the peer.
		socket.mBitField=bitfieldMsg.mBitfield;

		// The message also tells whether the other side is interested in downloading from us.
		if(bitfieldMsg.mInterested){
			socket.mPeerInterested=true;
		}

		// Link the two socket endpoints to each other.
		socket.mCounterpart=bitfieldMsg.mConnection;
		bitfieldMsg.mConnection.mCounterpart=socket;

		// Both endpoints share one BTConnection object (counterpart first, this side second).
		socket.mConnection=new BTConnection(socket.mCounterpart, socket);
		socket.mCounterpart.mConnection=socket.mConnection;

		mDebugLog.info(mScheduler.getCurrent()+": connection set up between "+mAgent+" and "+ socket.mCounterpart.mSession.getAgent());

		// Walk the announced bitfield: record every piece the peer has, and become
		// interested as soon as one of them is a piece our document lacks.
		final BTDocument document=getDocument();
		for(int piece=0;piece<bitfieldMsg.mBitfield.length;piece++){
			if(!bitfieldMsg.mBitfield[piece]){
				continue;
			}
			if(!document.havePiece(piece)){
				socket.mAmInterested=true;
			}
			addNodeHasPiece(remoteSession.getAgent(), piece);
		}

		// Tell the peer whether we are interested.
		sendInterested(socket.mAmInterested, remotePeer);

		// Unchoke the peer while the unchoked list has not grown past the configured
		// number; otherwise choke it and queue it as a candidate.
		final boolean unchokePeer=mUnchokedList.size()<=mBTARC.getUnchokeNum();
		socket.mAmChoking=!unchokePeer;
		sendChoked(!unchokePeer, remotePeer);
		if(unchokePeer){
			mUnchokedList.add(remotePeer);
			mPeerCandidateList.remove(remotePeer);
		}
		else{
			mUnchokedList.remove(remotePeer);
			// Remove before adding so the candidate list never holds the peer twice.
			mPeerCandidateList.remove(remotePeer);
			mPeerCandidateList.add(remotePeer);
		}
	}


	/**
	 * handles the request type peer message
	 *
	 * @param e the event object
	 */
	public void handlePMRequest(BTEvent e){
		// The event parameter is the requesting session; the additional parameter is the request message.
		BTSession bts=(BTSession)e.getParam();
		BTPeerMessage btpm=(BTPeerMessage)e.getAddParam();

		// A request for a piece this session does not hold cannot be served.
		if(!getDocument().havePiece(btpm.mIndex)){
			mDebugLog.warning(mScheduler.getCurrent()+": "+mAgent+" received a request from "+bts.getAgent()+" for piece "+btpm.mIndex+"@"+getDocument().getKey()+", but I don't have!");
			return;
		}

		BTPeer p=bts.getAgent();
		BTSocket c=(BTSocket)mConnections.get(p);

		// The lookup yields null when there is no connection entry for the requester;
		// dereferencing it would throw a NullPointerException, so warn and ignore instead.
		if(c==null){
			mDebugLog.warning(mScheduler.getCurrent()+": "+mAgent+" received a request from "+p+" for piece "+btpm.mIndex+"@"+getDocument().getKey()+", but has no connection to that peer");
			return;
		}

		// If I'm choking that node, ignore the request.
		if(c.mAmChoking)
			return;

		// Build the PIECE reply, echoing the document key, piece index, offset within
		// the piece and block length from the request.
		// (The bandwidth is negotiated when the piece is received.)
		BTPeerMessage reply=new BTPeerMessage(BTPeerMessage.PIECE);
		reply.mDocHashKey=btpm.mDocHashKey;
		reply.mIndex=btpm.mIndex;
		reply.mBegin=btpm.mBegin;
		reply.mLength=btpm.mLength;

		// Delivery time of the reply. To simulate block pipelining, the network delay
		// is omitted while the connection is already transferring; only a tiny offset
		// past the current time is applied in that case.
		double t;
		boolean pipe=Boolean.parseBoolean(BT.getInstance().getProtocolProperties().getProperty("Pipeline"));
		if(pipe){
			t=mScheduler.getCurrent()+0.0000000001;
			if(!c.mConnection.isActive())
				t+=BT.getInstance().getDelayAgent(mAgent,bts.getAgent());
		}
		else{
			t=mScheduler.getCurrent()+BT.getInstance().getDelayAgent(mAgent,bts.getAgent());
		}

		// All PEER_MESSAGE events use a BTSession as the first param object:
		// the requester's session handles the event, this session is its parameter.
		BTEvent bte=new BTEvent(t, BTEvent.PEER_MESSAGE, bts, this);
		bte.setAddParam(reply);
		mScheduler.enqueue(bte);
	}


	/**
	 * handles the piece type peer message
	 *
	 * @param e the event object
	 */
	public void handlePMPiece(BTEvent e){
		// Receiving a PIECE message starts the simulated download of a block: the
		// connection bandwidth is read here and a block-finish event is scheduled.

		BTSession bts=(BTSession)e.getParam();
		BTPeerMessage btpm=(BTPeerMessage)e.getAddParam();
		BTSocket btc=(BTSocket)mConnections.get(bts.getAgent());

		// The lookup yields null when there is no connection entry for the sender;
		// dereferencing it would throw a NullPointerException, so warn and ignore instead.
		if(btc==null){
			mDebugLog.warning(mScheduler.getCurrent()+": "+this+" received piece "+btpm.mIndex+" from "+bts.getAgent()+" but has no connection to that peer");
			return;
		}

		// Ignore pieces other than the one currently requested on this connection.
		if(btc.mRequestingPieceIndex!=btpm.mIndex){
		    mDebugLog.warning(mScheduler.getCurrent()+": "+this+" received piece "+ btpm.mIndex+" from "+bts.getAgent()+" but this peer requested"+ btc.mRequestingPieceIndex);
		    return;
		}

		// Record which block is being transferred and when the transfer started.
		btc.mBegin=btpm.mBegin;
		btc.mLength=btpm.mLength;
		btc.mStartTime=mScheduler.getCurrent();

		if(!btc.mConnection.isActive()){
			btc.mConnection.setActive();
		}

		// The downloading flag is set only after downloading begins.
		btc.mDownloading=true;
		btc.mCounterpart.mUploading=true;

		double dRate=btc.mConnection.getBandwidth();

		if(btc.mCounterpart.mDownloading) {
			// The counterpart is downloading over the same connection as well:
			// each direction gets half of the bandwidth.
			dRate/=2;
			btc.mConnection.setBandwidthFrom(btc, dRate);
			btc.mConnection.setBandwidthFrom(btc.mCounterpart, dRate);
			btc.mCounterpart.adjustInBandwidth(-dRate);
			btc.adjustOutBandwidth(-dRate);
		}
		else{
			// Only this side downloads: the full rate is assigned to the counterpart's direction.
			btc.mConnection.setBandwidthFrom(btc, 0);
			btc.mConnection.setBandwidthFrom(btc.mCounterpart, dRate);
		}

		// Schedule a finish-block event at the estimated completion time.
		// NOTE(review): if dRate can be 0 the quotient is not a finite time — confirm
		// that getBandwidth() never returns 0 here.
		double t=mScheduler.getCurrent()+btc.mLength/dRate;
		if(t<mScheduler.getCurrent()){
			mDebugLog.warning(mScheduler.getCurrent()+" handle piece at "+this+" mLength is"+btc.mLength);
		}

		BTEvent bte=new BTEvent(t, BTEvent.CONNECTION_DOWNLOAD_BLOCK_FINISH, btc, null);
		btc.mEstimateFinish=bte;
		// Double.valueOf replaces the deprecated Double(double) constructor.
		bte.setAddParam(Double.valueOf(dRate));
		btc.mDownloadLeft=btc.mLength;
		mScheduler.enqueue(bte);
	}


	/**
	 * handles the have type peer message
	 *
	 * @param e the event object
	 */
	public void handlePMHave(BTEvent e){
		// The event parameter is the announcing session; the additional parameter is the HAVE message.
		BTSession bts=(BTSession)e.getParam();
		BTPeerMessage btpm=(BTPeerMessage)e.getAddParam();
		BTPeer p=bts.getAgent();

		BTSocket btc=(BTSocket)mConnections.get(p);

		// The lookup yields null when there is no connection entry for the sender;
		// dereferencing it would throw a NullPointerException, so warn and ignore instead.
		if(btc==null){
			mDebugLog.warning(mScheduler.getCurrent()+": "+mAgent+" received have for piece "+btpm.mIndex+" from "+p+", but has no connection to that peer");
			return;
		}

		// Update the knowledge about the peer: it now has this piece.
		btc.mBitField[btpm.mIndex]=true;
		addNodeHasPiece(p, btpm.mIndex);

		// Cancel the connection close event when comparePieces() reports something.
		// NOTE(review): presumably comparePieces() lists pieces the peer has that
		// this session lacks — confirm against BTSocket.
		if(!btc.comparePieces().isEmpty()){
			mScheduler.cancel(btc.mTimeout);
		}

		// If I already have that piece, there is nothing more to do.
		if(mDocument.havePiece(btpm.mIndex))
			return;

		if(!btc.mPeerChoking){
			// Unchoked by the peer: withdraw interest if the document is complete,
			// otherwise send a request to the peer.
			if(getDocument().isWhole()){
			    btc.mAmInterested=false;
			    sendInterested(false, p);
			}
			else sendRequestTo(p);
		}
		else {
			// Choked by the peer: without expressing interest I won't be unchoked,
			// so send an interested message if that has not been done yet.
			if(!btc.mAmInterested){
			    btc.mAmInterested=true;
			    sendInterested(true, p);
			}
		}
	}

	/**
	 * handles the choke type peer message
	 *
	 * @param e the event object
	 */
	public void handlePMChoke(BTEvent e){
		// The event parameter is the session of the peer that choked us.
		BTSession bts=(BTSession)e.getParam();
		BTPeer p=bts.getAgent();
		BTSocket connection=(BTSocket)mConnections.get(p);

		// The lookup yields null when there is no connection entry for the sender;
		// dereferencing it would throw a NullPointerException, so warn and ignore instead.
		if(connection==null){
			mDebugLog.warning(mScheduler.getCurrent()+": "+mAgent+" received choke from "+p+", but has no connection to that peer");
			return;
		}

		mDebugLog.info(mScheduler.getCurrent()+": "+mAgent+" received choke from "+p);

		connection.mPeerChoking=true;

		// Mark the shared connection inactive (the other side may already have done
		// so) and reset this side's download state.
		connection.mConnection.setInActive();
		connection.mDownloading=false;
		connection.mBegin=0;
	}

	/**
	 * handles the unchoke type peer message
	 *
	 * @param e the event object
	 */
	public void handlePMUnChoke(BTEvent e){
		// The event parameter is the session of the peer that unchoked us.
		BTSession bts=(BTSession)e.getParam();
		BTPeer p=bts.getAgent();
		BTSocket connection=(BTSocket)mConnections.get(p);

		// The lookup yields null when there is no connection entry for the sender;
		// dereferencing it would throw a NullPointerException, so warn and ignore instead.
		if(connection==null){
			mDebugLog.warning(mScheduler.getCurrent()+": "+mAgent+" received unchoke from "+p+", but has no connection to that peer");
			return;
		}

		connection.mPeerChoking=false;

		// Nothing left to download: tell the peer we are no longer interested.
		if(getDocument().isWhole()){
			connection.mAmInterested=false;
			sendInterested(false, p);
			return;
		}

		// Do not issue another request while a request timeout is still set on this connection.
		if(connection.mRequestTimeout!=null)
		    return;

		// Ask the piece selection strategy for a piece to request from this peer.
		int index=mPieceSelection.selectPiece(this, p);
		if(index>=0){
			sendRequest(index, p);
		}
		else{
			// No suitable piece to request: mark the connection inactive.
			connection.mConnection.setInActive();
		}
	}

	/**
	 * handles the interested type peer message
	 *
	 * @param e the event object
	 */
	public void handlePMInterested(BTEvent e){
		BTSession bts=(BTSession)e.getParam();
		BTPeerMessage btpm=(BTPeerMessage)e.getAddParam();
		BTSocket connection=(BTSocket)mConnections.get(bts.getAgent());
		connection.mPeerInterested=true;
		
		if(mUnchokedList.size()<=mBTARC.getUnchokeNum())
		{
			connection.mAmChoking=false;
			sendChoked(false, bts.getAgent());
			mUnchokedList.add(bts.getAgent());
			mPeerCandidateList.remove(bts.getAgent());
		}
		else{
			connection.mAmChoking=true;
			sendChoked(true, bts.getAgent());
			mUnchokedList.remove(bts.getAgent());

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -