📄 btsession.cpp
字号:
#include <algorithm>
#include <stdio.h>
#include "BTSession.h"
#include "BTEvent.h"
#include "BTTracker.h"
#include "BTTorrent.h"
#include "BTDocument.h"
#include "SimEventScheduler.h"
#include "BTTrackerResponse.h"
#include "BTSocket.h"
#include "BTPeerMessage.h"
#include "BT.h"
#include "BTPeer.h"
#include "BTGetRequest.h"
#include "BTAlgorithmRechoke.h"
#include "BTConnection.h"
#include "Sysparameter.h"
#include "HandleError.h"
#include "Cache.h"
#include "Debug.h"
#include "Statistic.h"
#include "Topology.h"
/**
 * Creates a session for the document identified by `key`.
 *
 * The document is looked up on `agent`; unlike the torrent-based overload this
 * constructor does not create a missing document, so the lookup is assumed to
 * succeed (NOTE(review): confirm callers guarantee this).
 *
 * @param key    key of the document this session serves
 * @param agent  peer that owns the session
 */
BTSession::BTSession(string key, BTPeer* agent):
    mState(working),
    mAgent(agent)
{
    // No torrent is supplied to this overload. announceTracker() tests
    // mTorrent against NULL, so it must not be left indeterminate.
    mTorrent = NULL;
    // announceTracker() may hand this pointer to Scheduler.cancel() before any
    // tracker response has assigned it; start from a defined value.
    mNextTrackerAnnouncement = NULL;

    mDocument = agent->getDocument(key);
    mDocument->setSession(this);

    mConnections.clear();
    mPeerCandidates.clear();
    mUnchokeLists.clear();
    mAmountBetweenSnubDetect = 0;
    mRechoke = new BTAlgorithmRechoke(this);

    // One (initially empty) holder list and one download counter per piece.
    mKnowledges.resize( mDocument->mPieceNum);
    mDownloadInfo.resize( mDocument->mPieceNum);
    for( unsigned int i = 0 ; i < mDocument->mPieceNum; i++){
        mKnowledges[i] = new list<BTPeer*>;
        mKnowledges[i]->clear();
        mDownloadInfo[i] = 0;
    }

    mOutPutNum = 0;
    mActive = false;
    mStartTime = Scheduler.getCurrent();
    mLastFinishedNum = 0;
    mCouldRetire = false;
    mTrackerRequest = NULL;
}
/**
 * Creates a session for the document described by torrent `tt`.
 *
 * If `agent` does not yet hold a document for the torrent's key, one is added
 * using the torrent's size and popularity.
 *
 * @param tt     torrent describing the document (key, size, popularity)
 * @param agent  peer that owns the session
 */
BTSession::BTSession(BTTorrent* tt, BTPeer* agent):
    mState(working),
    mAgent(agent){
    mTorrent = tt;
    // announceTracker() may hand this pointer to Scheduler.cancel() before any
    // tracker response has assigned it; start from a defined value.
    mNextTrackerAnnouncement = NULL;

    mDocument = agent->getDocument(tt->mKey);
    if( mDocument == NULL)
        mDocument = agent->addDocument(tt->mKey, tt->mSize, tt->mPopularity);
    mDocument->setSession( this);

    mConnections.clear();
    mPeerCandidates.clear();
    mUnchokeLists.clear();
    mAmountBetweenSnubDetect = 0;
    mRechoke = new BTAlgorithmRechoke(this);

    // One (initially empty) holder list and one download counter per piece.
    mKnowledges.resize( mDocument->mPieceNum);
    mDownloadInfo.resize( mDocument->mPieceNum);
    for( unsigned int i = 0 ; i < mDocument->mPieceNum; i++){
        mKnowledges[i] = new list<BTPeer*>;
        mKnowledges[i]->clear();
        mDownloadInfo[i] = 0;
    }

    mOutPutNum = 0;
    mActive = false;
    mStartTime = Scheduler.getCurrent();
    mLastFinishedNum = 0;
    mCouldRetire = false;
    mTrackerRequest = NULL;
}
/**
 * Releases the resources the constructors allocate: the rechoke algorithm
 * object and the per-piece holder lists stored in mKnowledges.
 */
BTSession::~BTSession(void){
    delete mRechoke;
    // Each slot was filled with `new list<BTPeer*>` by the constructor; the
    // lists hold non-owned peer pointers, so only the list objects are freed.
    for( size_t i = 0; i < mKnowledges.size(); ++i){
        delete mKnowledges[i];
        mKnowledges[i] = NULL;
    }
    mKnowledges.clear();
}
void BTSession::addKnowledgePiece( unsigned int index, BTPeer* ptr){
if(find( mKnowledges[index]->begin(), mKnowledges[index]->end(), ptr) != mKnowledges[index]->end())
return;
else
mKnowledges[index]->push_back( ptr);
};
/**
 * Collects every socket in mConnections whose mConnected flag is set.
 *
 * @return a newly allocated vector; the caller is responsible for deleting it
 *         (the sockets themselves are not copied).
 */
vector< BTSocket*>* BTSession::getConnections(void){
    vector<BTSocket*>* established = new vector<BTSocket*>;
    for( map<BTPeer*, BTSocket*>::iterator entry = mConnections.begin();
         entry != mConnections.end(); ++entry){
        BTSocket* socket = entry->second;
        if( socket->mConnected)
            established->push_back(socket);
    }
    return established;
}
/**
 * Looks up the socket registered for `peer`.
 *
 * @param peer  remote peer used as the key into mConnections
 * @return the registered socket, or NULL when there is no entry for `peer`
 */
BTSocket* BTSession::getConnection(BTPeer* peer){
    map<BTPeer*, BTSocket*>::iterator match = mConnections.find(peer);
    if( match == mConnections.end())
        return NULL;
    return match->second;
}
/**
 * Removes the mConnections entry keyed by `peer`, if there is one.
 * The socket itself is not deleted here.
 *
 * @param peer  remote peer whose entry should be dropped
 */
void BTSession::removeConnection( BTPeer* peer){
    map<BTPeer*, BTSocket*>::iterator aIt = mConnections.find( peer);
    if( aIt == mConnections.end())
        return;
    // size() yields an unsigned size type; convert it explicitly so the
    // argument really matches the %d conversion in the format string.
    DEBUG(SESSIONDEBUGLEVEL)
        printf("%f peer %d remove connection to peer %d left connection num %d\n", Scheduler.getCurrent(), mAgent->getID(), aIt->first->getID(), static_cast<int>(mConnections.size()));
    mConnections.erase(aIt);
}
/**
 * Removes the first mConnections entry whose socket is `con`, if any.
 * The socket itself is not deleted here.
 *
 * @param con  socket whose entry should be dropped
 */
void BTSession::removeConnection( BTSocket* con){
    for( map<BTPeer*, BTSocket*>::iterator aIt = mConnections.begin();
         aIt != mConnections.end(); ++aIt){
        if( aIt->second != con)
            continue;
        // Read the key before erasing: erase() invalidates aIt, so it must
        // not be dereferenced afterwards (the debug line needs the peer).
        BTPeer* peer = aIt->first;
        mConnections.erase(aIt);
        // size() yields an unsigned size type; convert it to match %d.
        DEBUG(SESSIONDEBUGLEVEL)
            printf("%f peer %d remove connection to peer %d left connection num %d\n", Scheduler.getCurrent(), mAgent->getID(), peer->getID(), static_cast<int>(mConnections.size()));
        return;
    }
}
// Closes the connection represented by `con` on this session's side and, if the
// counterpart socket is not yet marked closed, tears that side down as well.
// `con` is deleted before this function returns, so callers must not use it
// afterwards.
void BTSession::closeConnection(BTSocket* con){
mAgent->mConnectionNum --;
// The remote agent is reached through the counterpart socket's session.
BTPeer* peer = con->mCounterPart->mSession->getAgent();
// Drop every per-peer record this session keeps for that agent.
removePeerFromCandidates( peer );
removePeerFromUnchokeList( peer);
removeConnection( peer);
removePeerFromKnowledge( peer);
// Each direction object is deleted here and the counterpart's pointer for the
// opposite direction is cleared.
// NOTE(review): this suggests both sockets refer to the same direction objects - confirm.
if( con->mInConnection != NULL){
delete con->mInConnection;
con->mInConnection= NULL;
con->mCounterPart->mOutConnection = NULL;
}
if( con->mOutConnection != NULL){
delete con->mOutConnection;
con->mOutConnection= NULL;
con->mCounterPart->mInConnection = NULL;
}
// Mark this side closed before touching the counterpart.
// NOTE(review): assuming the counterpart's mCounterPart points back at `con`,
// this flag is what stops the nested closeConnection() call below from recursing back here - confirm.
con->mClosed = true;
// The `else` below pairs with the inner `if( con->mCounterPart->mConnected)`:
// a connected counterpart is closed through its own session (which deletes it);
// an unconnected one is only unregistered from its session and deleted here.
if( !con->mCounterPart->mClosed )
if( con->mCounterPart->mConnected)
con->mCounterPart->mSession->closeConnection( con->mCounterPart);
else{
con->mCounterPart->mSession->removeConnection( con->mCounterPart);
delete con->mCounterPart;
}
delete con;
// A superseed agent with no connections left goes inactive and stops rechoking.
if( mAgent->mType == superseed && mConnections.size() == 0){
mActive = false;
mRechoke->cancelRechoke();
}
// A normal agent with an incomplete document and fewer than MIN_SESSION_CON_NUM
// connections announces to the tracker again.
if( mAgent->mType == normal && !mDocument->isWhole() && mConnections.size() < MIN_SESSION_CON_NUM)
announceTracker();
}
void BTSession::queryTracker(void){
BTTracker* tracker = mTorrent->mTracker;
BTGetRequest* btg = new BTGetRequest( mTorrent->mKey, start, mDocument->getLeftSize(), this );
double t = Scheduler.getCurrent() + bt.getAgentDelay( mAgent->getID(), tracker->getID());
if( t < Scheduler.getCurrent()){
printf("BTSession::queryTracker t %f < current agentDelay %f\n", t, bt.getAgentDelay( mAgent->getID(), tracker->getID()));
exit(-1);
}
BTEvent* bte = new BTEvent(TRACKER_GET_REQUEST, t, tracker, this);
bte->setAddParam(btg);
Scheduler.enqueue(bte);
DEBUG(SESSIONDEBUGLEVEL)
printf(" %f peer %d query tracker \n", Scheduler.getCurrent(), mAgent->getID());
}
/**
 * Entry point for events delivered to this session.
 *
 * Runs checkTimeout() first, then routes the event by its type. Only the
 * SESSION_ANNOUNCE case deletes the event here; the other cases pass the event
 * on to a dedicated handler, and an unknown type is reported via handleError().
 *
 * @param event  event to process
 */
void BTSession::handler(SimEvent* event){
    checkTimeout();

    switch(event->getType()){
    case PEER_MESSAGE:
        handlePeerMessage((BTEvent*)event);
        break;

    case SESSION_TRACKER_RESPONSE:
        handleTrackerResponse((BTEvent*)event);
        break;

    case SESSION_ANNOUNCE:
        announceTracker();
        delete event;
        break;

    case CONNECTION_TIMEOUT:
        handleConnectionTimeout( (BTEvent*) event);
        break;

    case PEER_TIMEOUT:
        handleSessionTimeout((BTEvent *)event);
        break;

    default:
        handleError("unknown event type");
        break;
    }
}
/**
 * Handles a CONNECTION_TIMEOUT event whose parameter is the timed-out socket.
 *
 * A connected socket goes through closeConnection(); a socket that is not
 * connected is only unregistered from this session and deleted.
 *
 * @param event  timeout event; its getParam() value is the BTSocket
 */
void BTSession::handleConnectionTimeout(BTEvent * event){
    BTSocket* expired = (BTSocket*)(event->getParam());
    DEBUG(SESSIONDEBUGLEVEL)
        printf( "%f peer %d rev connection timout", Scheduler.getCurrent(), mAgent->getID() );

    if( !expired->mConnected ){
        removeConnection( expired);
        delete expired;
        return;
    }
    closeConnection( expired);
}
// Handles a PEER_TIMEOUT event (dispatched from handler()): records the
// session's output count, marks the session stopped, announces to the tracker,
// cancels rechoking, detaches the session from its agent and deletes the event.
void BTSession::handleSessionTimeout(BTEvent * event){
DEBUG(SESSIONDEBUGLEVEL)
printf( "%f peer %d rev peer timout serv info %s output num %d\n", Scheduler.getCurrent(), mAgent->getID(), mDocument->mkey.c_str(), mOutPutNum);
// Report this session's output count under the document's key.
statistic.addFileUploadNum(mDocument->mkey, mOutPutNum);
// The state is switched first so that announceTracker() takes its `stopped`
// branch and builds a `stop` request.
mState = stopped;
announceTracker();
// Every agent type except superseed is put into the retire state.
if( mAgent->mType != superseed )
mAgent->mState = retire;
mRechoke->cancelRechoke();
// NOTE(review): if removeSession() destroys this session, no member may be
// touched after this call; only the `event` argument is used below - confirm.
mAgent->removeSession( this);
delete event;
}
/**
 * Processes a tracker response carried in the event's additional parameter.
 *
 * The agent itself is removed from the returned peer list. When the session is
 * completed or stopped the response is simply discarded. Otherwise unknown
 * peers become candidates (capped at MAX_CANDIDATES by dropping from the
 * front), selectPeers() picks peers to connect to, and the next
 * SESSION_ANNOUNCE is scheduled ANNOUNCE_INTERVAL from now.
 *
 * The peer list, the response and the event are deleted here on both paths,
 * and mTrackerRequest is reset to NULL.
 *
 * @param event  tracker-response event
 */
void BTSession::handleTrackerResponse(BTEvent* event){
    BTTrackerResponse* reply = (BTTrackerResponse*)(event->getAddParam());
    list<BTPeer*>* offered = reply->mPeerList;
    offered->remove(mAgent);

    // Nothing to do with new peers once the session is finished or stopped.
    if( mState == completed || mState == stopped){
        delete offered;
        delete reply;
        delete event;
        mTrackerRequest = NULL;
        return;
    }

    // Register every peer that is neither a candidate nor connected yet.
    // The iterator is advanced before the element is used, as in the
    // original traversal.
    list<BTPeer*>::iterator cursor = offered->begin();
    while( cursor != offered->end()){
        BTPeer* newcomer = *cursor;
        ++cursor;
        if( !inCandidate( newcomer) && !peerInConnections( newcomer))
            addPeerCandidates( newcomer);
    }

    // Trim the candidate set from the front until it fits the limit.
    while( mPeerCandidates.size() > MAX_CANDIDATES )
        mPeerCandidates.erase( mPeerCandidates.begin());

    // Open connections to the selected peers that are not connected yet.
    list<BTPeer*>* chosen = selectPeers( &mPeerCandidates);
    cursor = chosen->begin();
    while( cursor != chosen->end()){
        BTPeer* target = *cursor;
        ++cursor;
        if( !peerInConnections( target))
            connectTo(target, false);
    }

    // Schedule the next periodic announcement.
    double nextAnnounceAt = Scheduler.getCurrent() + ANNOUNCE_INTERVAL;
    BTEvent* announceEvent = new BTEvent(SESSION_ANNOUNCE, nextAnnounceAt, this, NULL);
    mNextTrackerAnnouncement = announceEvent;
    Scheduler.enqueue(announceEvent);

    delete chosen;
    delete offered;
    delete reply;
    delete event;
    mTrackerRequest = NULL;
}
void BTSession::announceTracker(void){
if( mTorrent == NULL ||mTorrent->mTracker == NULL){
handleError("BTSession::announceTracker");
return;
}
BTTracker* tracker = mTorrent->mTracker;
BTGetRequest* btg = NULL;
switch( mState ){
case working:
if( mTrackerRequest != NULL)
return;
btg = new BTGetRequest(mTorrent->mKey, start, mDocument->getLeftSize(), this);
break;
case completed:
btg = new BTGetRequest( mTorrent->mKey, complete, 0, this );
Scheduler.cancel( mNextTrackerAnnouncement);
break;
case stopped:
btg = new BTGetRequest( mTorrent->mKey, stop, mDocument->getLeftSize(), this);
Scheduler.cancel( mNextTrackerAnnouncement);
break;
case failed:
Scheduler.cancel( mNextTrackerAnnouncement );
break;
default:
handleError("BTSession::announceTracker:wrong sessionstate");
return;
}
double t = Scheduler.getCurrent() +bt.getAgentDelay( mAgent->getID(), tracker->getID() );
if( t < Scheduler.getCurrent()){
printf("BTSession::announceTracker t %f < current agentDelay %f\n", t, bt.getAgentDelay(mAgent->getID(), tracker->getID() ));
exit(-1);
}
BTEvent* bte = new BTEvent( TRACKER_GET_REQUEST, t, tracker, this);
bte->setAddParam(btg);
Scheduler.enqueue( bte);
DEBUG(SESSIONDEBUGLEVEL)
printf( "%f peer %d announce tracker left size %d\n", Scheduler.getCurrent(), mAgent->getID(), mDocument->getLeftSize());
mTrackerRequest = bte;
};
void BTSession::announcePeers( unsigned int index){
map<BTPeer*, BTSocket*>::iterator aIt = mConnections.begin();
while( aIt != mConnections.end()){
BTSocket* connection = (aIt++)->second;
if( !connection->mConnected)
continue;
BTSession* peerSession = connection->mCounterPart->mSession;
BTPeerMessage* btp = new BTPeerMessage(HAVE);
btp->mIndex = index;
btp->mDownloadHash = mDocument->mkey;
double t = Scheduler.getCurrent();
BTEvent* bte = new BTEvent(PEER_MESSAGE, t, peerSession, this);
bte->setAddParam(btp);
bte->dispatch();//direct have message to peer without through scheduler for reduce the pressure of scheduler
DEBUG(SESSIONDEBUGLEVEL)
printf("%f peer %d send have piece %u to peer %d\n", Scheduler.getCurrent(), mAgent->getID(), index, peerSession->getAgent()->getID());
if( mDocument->isWhole())
sendInterested(false, connection);
}
};
void BTSession::handlePeerMessage(BTEvent* event){
BTPeerMessage* btm = (BTPeerMessage* )event->getAddParam();
if( mState == stopped){
printf("peer %d session has not exist \n", mAgent->getID());
delete event;
return;
}
if( btm->mType != BITFIELD && btm->mType != PIECE ){
BTSession* peerSession = (BTSession*)event->getParam();
BTSocket* con = getConnection( peerSession->getAgent());
if( con == NULL){
DEBUG(SESSIONDEBUGLEVEL)
printf("%f peer %d rev message from peer %d but no conneciton\n", Scheduler.getCurrent(), mAgent->getID(), peerSession->getAgent()->getID());
delete event;
return;
}
if( !con->mConnected ){
printf("%f peer %d have no connection with peer %d\n", Scheduler.getCurrent(), mAgent->getID(), peerSession->getAgent()->getID());
delete event;
return;
}
con->changeConnectionTimeout();
}
switch( btm->mType){
case BITFIELD:
handlePMBitfield(event);
return;
case INTERESTED:
handlePMInsterested(event);
return;
case NOT_INTERESTED:
handlePMNotInsterested(event);
return;
case HAVE:
handlePMHave(event);
return;
case REQUEST:
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -