📄 btsession.java
字号:
//make sure no duplication
mPeerCandidateList.remove(bts.getAgent());
mPeerCandidateList.add(bts.getAgent());
}
//only interested peers will be taken into consideration for unchoking
}
/**
 * Handles the notInterested type peer message: marks the sending peer as
 * not interested on its connection and removes it from the peer candidate list.
 * <p>
 * The connection lookup may come back empty (closeConnection removes entries
 * from mConnections and peer messages are delivered with a delay), so the
 * flag is only updated when a connection is still registered.
 *
 * @param e the event object; its param is the BTSession of the sending peer
 */
public void handlePMNotInterested(BTEvent e){
    BTSession bts=(BTSession)e.getParam();
    BTSocket connection=(BTSocket)mConnections.get(bts.getAgent());
    //the connection may already be gone when a delayed message arrives
    if(connection!=null){
        connection.mPeerInterested=false;
    }
    //only interested peers stay in the candidate list
    mPeerCandidateList.remove(bts.getAgent());
}
/**
* handles the cancel type peer message.
* Currently a no-op: the body is empty, so a cancel message has no effect
* on this session.
*
* @param e the event object (not read by the current implementation)
*/
public void handlePMCancel(BTEvent e){
//not implemented yet
}
/**
 * Entry point of this session. It is called after the session has been
 * initialized and schedules the first tracker request, flagged as STARTED,
 * which starts the communication with tracker and peers.
 */
public void queryTracker(){
    final BTTracker target=mTorrent.mTracker;
    //the event fires after the delay between this agent and the tracker
    final double arrival=mScheduler.getCurrent()+BT.getInstance().getDelayAgent(mAgent, target);
    final BTEvent request=new BTEvent(arrival, BTEvent.TRACKER_GET_REQUEST, target, mAgent);
    //the request is keyed by the document hash key rather than a piece hash key
    //and travels as the additional parameter of the event
    request.setAddParam(new BTGetRequest(mDocument.getKey(), BTGetRequest.STARTED, mDocument.getLeftSize(), this));
    mScheduler.enqueue(request);
}
/**
 * Sends an announcement to the tracker, depending on the session status:
 * <ul>
 * <li>WORKING: regular announcement carrying the remaining size;</li>
 * <li>COMPLETED: completed announcement with remaining size 0, and
 *     mNextTrackerAnnouncement is cancelled;</li>
 * <li>STOPPED: stopped announcement, and mNextTrackerAnnouncement is cancelled;</li>
 * <li>FAILED: mNextTrackerAnnouncement is cancelled and nothing is sent;</li>
 * <li>any other status: a warning is logged and nothing is sent.</li>
 * </ul>
 */
public void announceTracker(){
    BTTracker target=mTorrent.mTracker;
    BTGetRequest request;
    double arrival=mScheduler.getCurrent()+BT.getInstance().getDelayAgent(mAgent, target);
    BTEvent announcement=new BTEvent(arrival, BTEvent.TRACKER_GET_REQUEST, target, mAgent);
    double remaining=mDocument.getLeftSize();
    //the request is based on the document hash key rather than the piece hash key
    switch(mStatus){
        case WORKING: {
            //0 in the event field means the event param is absent, i.e. a regular announcement
            request=new BTGetRequest(mDocument.getKey(), 0, remaining, this);
            mTraceLog.info(LogFormatter.sprintf("%.9f", mScheduler.getCurrent())+": "+mAgent+" -- Announce -- "+mDocument.getKey()+" -- "+target+" -- Left "+remaining);
            break;
        }
        case COMPLETED: {
            request=new BTGetRequest(mDocument.getKey(), BTGetRequest.COMPLETED, 0, this);
            mScheduler.cancel(mNextTrackerAnnouncement);
            mTraceLog.info(LogFormatter.sprintf("%.9f", mScheduler.getCurrent())+": "+mAgent+" -- Announce -- "+mDocument.getKey()+" -- "+target+" -- Completed");
            break;
        }
        case STOPPED: {
            request=new BTGetRequest(mDocument.getKey(), BTGetRequest.STOPPED, remaining, this);
            mScheduler.cancel(mNextTrackerAnnouncement);
            mTraceLog.info(LogFormatter.sprintf("%.9f", mScheduler.getCurrent())+": "+mAgent+" -- Announce -- "+mDocument.getKey()+" -- "+target+" -- Stopped");
            break;
        }
        case FAILED: {
            //only logged: no request is built and the prepared event is never enqueued
            mScheduler.cancel(mNextTrackerAnnouncement);
            mTraceLog.info(LogFormatter.sprintf("%.9f", mScheduler.getCurrent())+": "+mAgent+" -- Announce -- "+mDocument.getKey()+" -- "+target+" -- Failed");
            return;
        }
        default: {
            mDebugLog.warning("an announcement to tracker is not supposed to be scheduled.");
            return;
        }
    }
    //the BTGetRequest travels as the additional parameter of the event
    announcement.setAddParam(request);
    mScheduler.enqueue(announcement);
}
/**
 * Lets all the connected peers know that this node finished downloading a
 * piece by scheduling a HAVE message for each of them. When the local
 * document is whole, a not-interested message is sent to each peer as well.
 *
 * @param index piece index
 */
public void announcePeer(int index){
    final String docKey=mDocument.getKey();
    //a single HAVE message instance is attached to every event enqueued below
    final BTPeerMessage have=new BTPeerMessage(BTPeerMessage.HAVE);
    have.mDocHashKey=docKey;
    have.mIndex=index;
    //work on a snapshot of the peers that currently have a connection
    final Object[] peers=mConnections.keySet().toArray();
    for(int k=0;k<peers.length;k++){
        final BTPeer peer=(BTPeer)peers[k];
        //the announcement is processed by the peer's BTSession object rather than the BTPeer object
        final BTSession remote=peer.getSession(docKey);
        final double arrival=mScheduler.getCurrent()+BT.getInstance().getDelayAgent(mAgent,peer);
        final BTEvent notice=new BTEvent(arrival, BTEvent.PEER_MESSAGE, (SimEventHandler)remote, this);
        notice.setAddParam(have);
        mScheduler.enqueue(notice);
        if(mDocument.isWhole()){
            sendInterested(false, peer);
        }
    }
}
/**
 * Sets up a connection to a peer by scheduling a hand-shaking message.
 * <p>
 * If the peer is already connected, no handshake is sent; only a change of
 * the choke state is applied and announced through sendChoked. If the peer is
 * not connected yet and should start unchoked, it is recorded in the unchoked
 * list (at most once) before the handshake is sent.
 *
 * @param p destination peer; a call with this session's own agent is ignored
 * @param chokedornot initially choked or not
 */
public void connectTo(BTPeer p, boolean chokedornot){
    //not supposed to connect to itself
    if(p==mAgent){
        return;
    }
    //if it's already connected, only propagate a change of the choke state
    if(mConnections.containsKey(p)){
        BTSocket conn=getConnection(p);
        if(conn.mAmChoking!=chokedornot){
            conn.mAmChoking=chokedornot;
            sendChoked(chokedornot, p);
        }
        return;
    }
    //if not choked, add it to the unchoked list, so that when the connection
    //is really created, it can send unchoked
    if(!chokedornot){
        //remove first: repeated attempts before the connection exists must not
        //leave duplicate entries, since closeConnection removes only one
        mUnchokedList.remove(p);
        mUnchokedList.add(p);
    }
    BTPeerMessage btpm=new BTPeerMessage(BTPeerMessage.HAND_SHAKING);
    btpm.mDocHashKey=getDocument().getKey();
    //for simplicity, the handshake also carries a copy of the bitfield information
    btpm.mBitfield=(boolean[])getDocument().getPieces().clone();
    //by default, set it as interested
    btpm.mInterested=true;
    double t=mScheduler.getCurrent()+BT.getInstance().getDelayAgent(mAgent,p);
    //PEER_MESSAGE events use BTSession as the first param object, which is different from other events
    BTEvent bte=new BTEvent(t, BTEvent.PEER_MESSAGE, p, this);
    bte.setAddParam(btpm);
    mScheduler.enqueue(bte);
}
/**
* Closes the connection to a peer, if one is registered, and cleans up
* session state afterwards.
* <p>
* For a registered connection: cancels the events stored on the socket,
* clears the socket's references, tears down the underlying connection,
* removes the entry from mConnections and then asks the counterpart's
* session to close its side as well.
* <p>
* Regardless of whether a connection was registered, the peer is removed
* from the unchoked list, and when no connections remain the session is
* marked as removed and the agent is asked to remove it.
*
* @param p destination peer
*/
public void closeConnection(BTPeer p){
if(mConnections.containsKey(p)){
BTSocket conn=getConnection(p);
//keep the counterpart reference: conn.mCounterpart is cleared below
BTSocket counterpart=conn.mCounterpart;
//make sure all the arranged events are cancelled
mScheduler.cancel(conn.mTimeout);
mScheduler.cancel(conn.mRequestTimeout);
mScheduler.cancel(conn.mEstimateFinish);
mScheduler.cancel(conn.mKeepAlive);
mScheduler.cancel(conn.mKeepAliveMsg);
//drop the references to the cancelled events and to the bitfield
conn.mTimeout=null;
conn.mRequestTimeout=null;
conn.mEstimateFinish=null;
conn.mKeepAlive=null;
conn.mKeepAliveMsg=null;
conn.mBitField=null;
if(conn.mConnection!=null){
conn.mConnection.tearDown();
//update link information on screen (only when a protocol panel exists)
ProtocolPanel pp=Simulator.getInstance().getPP();
if(pp!=null){
pp.getSimGuiControl().getGraphPanel().repaint();
}
}
conn.mConnection=null;
conn.mCounterpart=null;
//unregister before notifying the counterpart, so a call coming back
//for this peer skips the containsKey branch above
mConnections.remove(p);
if(counterpart!=null){
//the link object was already torn down above; clear the other side's reference
counterpart.mConnection=null;
//close the other side too
//NOTE(review): the counterpart's own mCounterpart is not cleared here, so its
//session presumably calls back into this method once -- confirm
counterpart.mSession.closeConnection(getAgent());
}
}
//runs even when no connection was registered for p
mUnchokedList.remove(p);
//with no connections left, the session removes itself from its agent
if(mConnections.size()==0){
mRemoved=true;
mAgent.removeSession(mDocument.getKey());
}
}
/**
 * Sends a piece request message to a peer, if a request is currently possible.
 * <p>
 * Nothing happens while the peer is choking this node, when the document is
 * already whole, or while a request timeout is still pending on the
 * connection. Otherwise the piece already being requested on this connection
 * is requested again; if there is none, a piece is selected through
 * mPieceSelection. When the selection fails (negative index), the connection
 * is set inactive instead.
 *
 * @param p destination peer
 */
public void sendRequestTo(BTPeer p){
    final BTSocket socket=(BTSocket)mConnections.get(p);
    //choked by the peer: no request may be sent
    if(socket.mPeerChoking){
        return;
    }
    //since we are unchoked, interested should already have been set;
    //nothing to do when the document is whole or a request is still outstanding
    if(getDocument().isWhole()||socket.mRequestTimeout!=null){
        return;
    }
    //continue with the piece already being requested on this connection, if any
    if(socket.mRequestingPieceIndex!=-1){
        sendRequest(socket.mRequestingPieceIndex, p);
        return;
    }
    final int chosen=mPieceSelection.selectPiece(this, p);
    if(chosen<0){
        //an index less than 0 means the selection failed
        socket.mConnection.setInActive();
        return;
    }
    sendRequest(chosen, p);
}
/**
 * Sends a request for a specific piece to a peer and arms the matching
 * request timeout on the connection.
 * <p>
 * If a request timeout is already pending on the connection, only a warning
 * is logged and nothing is sent.
 *
 * @param index piece index
 * @param p destination peer
 */
public void sendRequest(int index, BTPeer p){
    final BTDocument doc=(BTDocument)getDocument();
    final BTPeerMessage request=new BTPeerMessage(BTPeerMessage.REQUEST);
    final BTSocket socket=(BTSocket)getConnection(p);
    //a pending timeout event means the previous request has not timed out yet
    if(socket.mRequestTimeout!=null){
        mDebugLog.warning(""+mScheduler.getCurrent()+" "+mAgent+" not supposed to send request to "+p+" before previous request timeout");
        return;
    }
    socket.mRequestingPieceIndex=index;
    request.mDocHashKey=doc.getKey();
    request.mIndex=index;
    doc.addPieceDownloading(index);
    //a piece in the partial list starts from its recorded progress;
    //any other piece starts at offset 0 and gets its progress set to 0
    if(!doc.inPartialList(index)){
        request.mBegin=0;
        doc.pieceProgress(index, 0);
    }
    else{
        request.mBegin=doc.getPieceProgress(index);
    }
    //request one block, or only the rest of the piece when less than a block remains
    request.mLength=(doc.getPieceLength(index)-request.mBegin<doc.getBlockLength())
        ?(int)(doc.getPieceLength(index)-request.mBegin)
        :(int)doc.getBlockLength();
    //to simulate the block pipeline: while the connection is currently transferring,
    //the network delay for sending the request and the piece message is omitted
    final boolean pipelined=Boolean.parseBoolean(BT.getInstance().getProtocolProperties().getProperty("Pipeline"));
    double when;
    if(!pipelined){
        when=mScheduler.getCurrent()+BT.getInstance().getDelayAgent(mAgent,p);
    }
    else{
        when=mScheduler.getCurrent()+0.0000000001;
        if(!socket.mConnection.isActive()){
            when+=BT.getInstance().getDelayAgent(mAgent,p);
        }
    }
    //PEER_MESSAGE events use BTSession as the first param object, which is different from other events
    final BTEvent delivery=new BTEvent(when, BTEvent.PEER_MESSAGE, p.getSession(doc.getKey()), this);
    delivery.setAddParam(request);
    mScheduler.enqueue(delivery);
    //the timeout fires REQUEST_TIMEOUT after the request is delivered and is
    //remembered on the socket as the marker of an outstanding request
    final BTEvent timeout=new BTEvent(when+BTSocket.REQUEST_TIMEOUT, BTEvent.REQUEST_TIMEOUT, socket, this);
    timeout.setAddParam(p);
    socket.mRequestTimeout=timeout;
    mScheduler.enqueue(timeout);
}
/**
 * Sends an interested or not-interested peer message to a peer.
 *
 * @param interested true to send INTERESTED, false to send NOT_INTERESTED
 * @param p destination peer
 */
public void sendInterested(boolean interested, BTPeer p){
    final BTDocument doc=(BTDocument)getDocument();
    final BTPeerMessage notice=new BTPeerMessage(
        interested?BTPeerMessage.INTERESTED:BTPeerMessage.NOT_INTERESTED);
    notice.mDocHashKey=doc.getKey();
    //the message arrives after the delay between this agent and the peer
    final double arrival=mScheduler.getCurrent()+BT.getInstance().getDelayAgent(mAgent,p);
    //PEER_MESSAGE events use BTSession as the first param object, which is different from other events
    final BTEvent delivery=new BTEvent(arrival, BTEvent.PEER_MESSAGE, p.getSession(doc.getKey()), this);
    delivery.setAddParam(notice);
    mScheduler.enqueue(delivery);
}
/**
 * Sends a choke or unchoke peer message to a peer and keeps the unchoked
 * list in step: a choked peer is removed from the list, an unchoked peer is
 * recorded in it at most once.
 *
 * @param choked true to send CHOKE, false to send UNCHOKE
 * @param p destination peer; a call with this session's own agent is ignored
 */
public void sendChoked(boolean choked, BTPeer p){
    if(p==mAgent){
        return;
    }
    BTDocument btd=(BTDocument)getDocument();
    BTPeerMessage btpm2;
    if(choked){
        btpm2=new BTPeerMessage(BTPeerMessage.CHOKE);
        mUnchokedList.remove(p);
    }
    else{
        btpm2=new BTPeerMessage(BTPeerMessage.UNCHOKE);
        //remove first: the peer may already be listed (connectTo records it
        //before the connection exists), and a duplicate would survive the
        //single remove done when the peer is choked or the connection closes
        mUnchokedList.remove(p);
        mUnchokedList.add(p);
    }
    btpm2.mDocHashKey=btd.getKey();
    double t=mScheduler.getCurrent()+BT.getInstance().getDelayAgent(mAgent,p);
    //PEER_MESSAGE events use BTSession as the first param object, which is different from other events
    BTEvent bte=new BTEvent(t, BTEvent.PEER_MESSAGE, p.getSession(btd.getKey()), this);
    bte.setAddParam(btpm2);
    mScheduler.enqueue(bte);
    mDebugLog.info(""+mScheduler.getCurrent()+": "+mAgent+" sends "+(choked?"choked":"unchoked")+" to "+p);
}
/**
 * Updates the piece information in the local database: records that a peer
 * has a given piece.
 * <p>
 * An index outside the range [0, piece count) is rejected with a warning
 * instead of being used to index the knowledge table.
 *
 * @param node peer that has the piece
 * @param piece piece index
 * @return true if the information was recorded, false if the index was out of bound
 */
public boolean addNodeHasPiece(BTPeer node, int piece){
    //negative indices are rejected too; they would otherwise fail on the array access
    if(piece<0||piece>=mDocument.getPieceNum()){
        mDebugLog.warning(node+" announced a piece out of bound!");
        return false;
    }
    mKnowledge[piece].add(node);
    return true;
}
/**
 * Counts the peers recorded as having a specific piece.
 * The index is not validated here; it is used directly to index the
 * knowledge table.
 *
 * @param i piece index
 * @return number of entries recorded for piece i
 */
public int countPeersWithPiece(int i){
    final int holders=mKnowledge[i].size();
    return holders;
}
/**
 * Gets the string description of this session, in the form
 * {@code S(<document key>)@<agent>}.
 *
 * @return a string describing this session
 */
@Override
public String toString() {
    final StringBuilder text=new StringBuilder("S(");
    text.append(mDocument.getKey());
    text.append(")@");
    text.append(mAgent);
    return text.toString();
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -