📄 btsession.java
字号:
case BTPeerMessage.CHOKE:
handlePMChoke(e);
return;
case BTPeerMessage.UNCHOKE:
handlePMUnChoke(e);
return;
case BTPeerMessage.INTERESTED:
handlePMInterested(e);
return;
case BTPeerMessage.NOT_INTERESTED:
handlePMNotInterested(e);
return;
//update knowledge
case BTPeerMessage.HAVE:
handlePMHave(e);
return;
case BTPeerMessage.REQUEST:
//if receive request, send piece
handlePMRequest(e);
return;
case BTPeerMessage.PIECE:
//bandwidth information is included in the piece message
//this is the real download, it should let the other side know the finish of the piece,
//however, in case the bandwidth changed, the scheduled event should be changed
handlePMPiece(e);
return;
case BTPeerMessage.CANCEL:
handlePMCancel(e);
return;
}
}
/**
* handles connection time event, cuts connection
* @param e the event object
*/
public void handleConnectionTimeout(BTEvent e){
BTPeer btp=(BTPeer)e.getParam();
mDebugLog.info(mScheduler.getCurrent()+": "+this+" closes connection to "+btp);
closeConnection(btp);
}
/**
* handles the bitfield type peer message
* In this simulation, bitfiled message works as the reply of handshaking
* this is received by the initiator, and the connection is really setup.
*
* @param e the event object
*/
public void handlePMBitField(BTEvent e){
//handshake initiator
BTSession bts=(BTSession)e.getParam();
BTPeerMessage btpm=(BTPeerMessage)e.getAddParam();
//check whether already has a connection to that agent for this document
BTSocket connection=null;
BTPeer p=bts.getAgent();
if(!mConnections.containsKey(p)){
//no current connection for this file, new a connection
connection=new BTSocket(this);
mConnections.put(p, connection);
//set up the timeout event, to be processed by this session, when triggered, it'll close the connection
BTEvent bte=new BTEvent(mScheduler.getCurrent()+BTSocket.CONNECTION_TIMEOUT, BTEvent.CONNECTION_TIMEOUT, this, p);
mScheduler.enqueue(bte);
connection.mTimeout=bte;
//mDebugLog.info(mScheduler.getCurrent()+": "+mAgent+" connection timeout set up to " +(mScheduler.getCurrent()+BTSocket.CONNECTION_TIMEOUT)+" for connection to "+bts.getAgent());
}
else {
connection=(BTSocket)mConnections.get(p);
mScheduler.cancel(connection.mTimeout);
connection.mTimeout.setTimeStamp(mScheduler.getCurrent()+BTSocket.CONNECTION_TIMEOUT);
mScheduler.enqueue(connection.mTimeout);
//mDebugLog.info(mScheduler.getCurrent()+": "+mAgent+" connection timeout updated2 to " +(mScheduler.getCurrent()+BTSocket.CONNECTION_TIMEOUT)+" for connection to "+bts.getAgent());
if(connection.mKeepAlive!=null)
mScheduler.cancel(connection.mKeepAlive);
}
//set up the keep alive event, to be processed by the connection, when triggered, send a keep alive message
BTEvent bte2=new BTEvent(mScheduler.getCurrent()+BTSocket.CONNECTION_KEEP_ALIVE, BTEvent.CONNECTION_KEEP_ALIVE, connection, connection);
mScheduler.enqueue(bte2);
connection.mKeepAlive=bte2;
connection.mBitField=btpm.mBitfield;
//the other side is interested to download from me
if(btpm.mInterested)
connection.mPeerInterested=true;
//set up connection reference to each other
connection.mCounterpart=btpm.mConnection;
btpm.mConnection.mCounterpart=connection;
//the other side is the sender of data, this side is receiver
connection.mConnection=new BTConnection(connection.mCounterpart, connection);
connection.mCounterpart.mConnection=connection.mConnection;
mDebugLog.info(mScheduler.getCurrent()+": connection set up between "+mAgent+" and "+ connection.mCounterpart.mSession.getAgent());
BTDocument btd=getDocument();
//XXX
for(int i=0;i<btpm.mBitfield.length;i++){
if(btpm.mBitfield[i]){
if(!btd.havePiece(i))
connection.mAmInterested=true;
addNodeHasPiece(bts.getAgent(), i);
}
}
if(!connection.mAmInterested)
sendInterested(false, p);
else
sendInterested(true, p);
//the connection is choked at the creation
//request is not sent until receive unchoke from the other side
//XXX
if((mUnchokedList.size()<=mBTARC.getUnchokeNum()))
{
connection.mAmChoking=false;
sendChoked(false, p);
mUnchokedList.add(p);
mPeerCandidateList.remove(p);
}
else{
connection.mAmChoking=true;
sendChoked(true, p);
mUnchokedList.remove(p);
//avoid duplication, since mUnchokedList is set, but mPeerCandidateList is linked list
mPeerCandidateList.remove(p);
mPeerCandidateList.add(p);
}
}
/**
* handles the request type peer message
*
* @param e the event object
*/
public void handlePMRequest(BTEvent e){
//if receive request, send piece
BTSession bts=(BTSession)e.getParam();
BTPeerMessage btpm=(BTPeerMessage)e.getAddParam();
if(!getDocument().havePiece(btpm.mIndex)){
mDebugLog.warning(mScheduler.getCurrent()+": "+mAgent+" received a request from "+bts.getAgent()+" for piece "+btpm.mIndex+"@"+getDocument().getKey()+", but I don't have!");
return;
}
//mDebugLog.info(mScheduler.getCurrent()+": "+mAgent+" received a request from "+bts.getAgent()+" for piece "+btpm.mIndex+"@"+getDocument().getKey());
BTPeer p=bts.getAgent();
BTSocket c=(BTSocket)mConnections.get(p);
//if I'm choking that node, ignore the request
if(c.mAmChoking)
return;
//the bandwidth is negotiated when the piece is received
//send that block
BTPeerMessage btpm2=new BTPeerMessage(BTPeerMessage.PIECE);
btpm2.mDocHashKey=btpm.mDocHashKey;
//piece index
btpm2.mIndex=btpm.mIndex;
//offset within that piece
btpm2.mBegin=btpm.mBegin;
//block length
btpm2.mLength=btpm.mLength;
//mDebugLog.info(mScheduler.getCurrent()+": "+mAgent+" send out piece "+btpm.mIndex+"@"+getDocument().getKey()+" to "+bts.getAgent()+", begin from "+btpm.mBegin+", length "+btpm.mLength);
//to simulate block pipeline, if the conection is currently transfering, omit the network delay for seinding request, and piece message
double t=0;
boolean pipe=Boolean.parseBoolean(BT.getInstance().getProtocolProperties().getProperty("Pipeline"));
if(pipe)
{
t=mScheduler.getCurrent()+0.0000000001;
if(!c.mConnection.isActive())
t+=BT.getInstance().getDelayAgent(mAgent,bts.getAgent());
}
else{
t=mScheduler.getCurrent()+BT.getInstance().getDelayAgent(mAgent,bts.getAgent());
}
//all the PEER_MESSAGE event use BTSession as the first param object
BTEvent bte=new BTEvent(t, BTEvent.PEER_MESSAGE, bts, this);
bte.setAddParam(btpm2);
//mDebugLog.info(mScheduler.getCurrent()+": "+mAgent+" received a request for piece "+btpm.mIndex+" from "+bts.getAgent()+", block sent");
mScheduler.enqueue(bte);
}
/**
* handles the piece type peer message
*
* @param e the event object
*/
public void handlePMPiece(BTEvent e){
//bandwidth information is included in the piece message
//this is the real download, it should let the other side know the finish of the piece,
//however, in case the bandwidth changed, the scheduled event should be changed
BTSession bts=(BTSession)e.getParam();
BTPeerMessage btpm=(BTPeerMessage)e.getAddParam();
BTSocket btc=(BTSocket)mConnections.get(bts.getAgent());
if(btc.mRequestingPieceIndex!=btpm.mIndex){
mDebugLog.warning(mScheduler.getCurrent()+": "+this+" received piece "+ btpm.mIndex+" from "+bts.getAgent()+" but this peer requested"+ btc.mRequestingPieceIndex);
return;
}
btc.mBegin=btpm.mBegin;
btc.mLength=btpm.mLength;
btc.mStartTime=mScheduler.getCurrent();
double dRate=0;
if(!btc.mConnection.isActive()){
btc.mConnection.setActive();
}
//the downloading flag is set only after downloading begings
btc.mDownloading=true;
btc.mCounterpart.mUploading=true;
dRate=btc.mConnection.getBandwidth();
if(btc.mCounterpart.mDownloading) {
dRate/=2;
btc.mConnection.setBandwidthFrom(btc, dRate);
btc.mConnection.setBandwidthFrom(btc.mCounterpart, dRate);
btc.mCounterpart.adjustInBandwidth(-dRate);
btc.adjustOutBandwidth(-dRate);
}
else{
btc.mConnection.setBandwidthFrom(btc, 0);
btc.mConnection.setBandwidthFrom(btc.mCounterpart, dRate);
}
//schedule a finish block event
double t=mScheduler.getCurrent()+btc.mLength/dRate;
if(t<mScheduler.getCurrent()){
mDebugLog.warning(mScheduler.getCurrent()+" handle piece at "+this+" mLength is"+btc.mLength);
}
BTEvent bte=new BTEvent(t, BTEvent.CONNECTION_DOWNLOAD_BLOCK_FINISH, btc, null);
btc.mEstimateFinish=bte;
bte.setAddParam(new Double(dRate));
btc.mDownloadLeft=btc.mLength;
//mDebugLog.info(mScheduler.getCurrent()+": "+this+" received a block in piece "+ btc.mRequestingPieceIndex+ " of "+getDocument()+" from "+btc.mCounterpart+", bandwidth is "+btc.mConnection.mBandwidth+", estimated finish time "+t);
mScheduler.enqueue(bte);
}
/**
* handles the have type peer message
*
* @param e the event object
*/
public void handlePMHave(BTEvent e){
BTSession bts=(BTSession)e.getParam();
BTPeerMessage btpm=(BTPeerMessage)e.getAddParam();
BTPeer p=(BTPeer)bts.getAgent();
BTSocket btc=(BTSocket)mConnections.get(p);
btc.mBitField[btpm.mIndex]=true;
addNodeHasPiece(p, btpm.mIndex);
//if that node has more pieces than me, cancel connection close event.
ArrayList ar=btc.comparePieces();
if(!ar.isEmpty()){
mScheduler.cancel(btc.mTimeout);
}
//if I already have that piece, simply return
if(mDocument.havePiece(btpm.mIndex))
return;
//if I'm unchoked by peer and currently downloading, don't do anything
if(!btc.mPeerChoking){
if(getDocument().isWhole()){
//send uninterested message
btc.mAmInterested=false;
sendInterested(false, p);
}
else sendRequestTo(p);
}
//choked state
else {
//mDebugLog.fine(mScheduler.getCurrent()+": "+mAgent+" not sending request to "+p+", because being choked");
if(!btc.mAmInterested){
//if I doesn't express interest, I won't be unchoked
//send an interested message
btc.mAmInterested=true;
sendInterested(true, p);
}
}
}
/**
* handles the choke type peer message
*
* @param e the event object
*/
public void handlePMChoke(BTEvent e){
BTSession bts=(BTSession)e.getParam();
BTPeerMessage btpm=(BTPeerMessage)e.getAddParam();
BTPeer p=(BTPeer)bts.getAgent();
BTSocket connection=(BTSocket)mConnections.get(p);
mDebugLog.info(mScheduler.getCurrent()+": "+mAgent+" received choke from "+p);
connection.mPeerChoking=true;
/*
//if not currently downloading data, make sure to set downloading false
BTBandwidthManager btbm=(BTBandwidthManager)mAgent.getNode().getBandwidthManager();
if(!btbm.isDownloading(connection)){
//reset the connection value
connection.mDownloading=false;
connection.mBegin=0;
}
*/
//the other may already set it to inactive.
connection.mConnection.setInActive();
connection.mDownloading=false;
connection.mBegin=0;
}
/**
* handles the unchoke type peer message
*
* @param e the event object
*/
public void handlePMUnChoke(BTEvent e){
BTSession bts=(BTSession)e.getParam();
BTPeerMessage btpm=(BTPeerMessage)e.getAddParam();
BTPeer p=(BTPeer)bts.getAgent();
BTSocket connection=(BTSocket)mConnections.get(p);
//whenever a peer is unchoked and I'm interested, request a piece
connection.mPeerChoking=false;
//since I received an unchoke, I must be already interested in peers' file
if(getDocument().isWhole()){
//send uninterested
connection.mAmInterested=false;
sendInterested(false, p);
return;
}
if(connection.mRequestTimeout!=null)
return;
int index=mPieceSelection.selectPiece(this, p);
//send a request
if(index>=0){
//mDebugLog.fine(mScheduler.getCurrent()+": "+mAgent+" send request for piece "+index+" to "+p+" after receive unchoke");
sendRequest(index, p);
}
else{
//mDebugLog.fine(mScheduler.getCurrent()+": "+mAgent+" not sending request to "+p+" after receive unchoke, since no piece is suitable to choose");
connection.mConnection.setInActive();
}
}
/**
* handles the interested type peer message
*
* @param e the event object
*/
public void handlePMInterested(BTEvent e){
BTSession bts=(BTSession)e.getParam();
BTPeerMessage btpm=(BTPeerMessage)e.getAddParam();
BTSocket connection=(BTSocket)mConnections.get(bts.getAgent());
connection.mPeerInterested=true;
if(mUnchokedList.size()<=mBTARC.getUnchokeNum())
{
connection.mAmChoking=false;
sendChoked(false, bts.getAgent());
mUnchokedList.add(bts.getAgent());
mPeerCandidateList.remove(bts.getAgent());
}
else{
connection.mAmChoking=true;
sendChoked(true, bts.getAgent());
mUnchokedList.remove(bts.getAgent());
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -