⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 btsession.cpp

📁 模拟P2P各种网络环境的,适合新手们的学习,不错的源码.
💻 CPP
📖 第 1 页 / 共 3 页
字号:
#include <algorithm>
#include <stdio.h>
#include "BTSession.h"
#include "BTEvent.h"
#include "BTTracker.h"
#include "BTTorrent.h"
#include "BTDocument.h"
#include "SimEventScheduler.h"
#include "BTTrackerResponse.h"
#include "BTSocket.h"
#include "BTPeerMessage.h"
#include "BT.h"
#include "BTPeer.h"
#include "BTGetRequest.h"
#include "BTAlgorithmRechoke.h"
#include "BTConnection.h"
#include "Sysparameter.h"
#include "HandleError.h"
#include "Cache.h"
#include "Debug.h"
#include "Statistic.h"
#include "Topology.h"

BTSession::BTSession(string key, BTPeer* agent):
mState(working),
mAgent(agent)
{
	mDocument = agent->getDocument(key);
	mDocument->setSession(this);
	mConnections.clear();
	mPeerCandidates.clear();
	mUnchokeLists.clear();
	mAmountBetweenSnubDetect = 0;
	mRechoke = new BTAlgorithmRechoke(this);
	mKnowledges.resize( mDocument->mPieceNum);
	mDownloadInfo.resize( mDocument->mPieceNum);
	for( unsigned int i = 0 ; i< mDocument->mPieceNum; i++){
		mKnowledges[i] = new list<BTPeer*>;
		mKnowledges[i]->clear();
		mDownloadInfo[i] = 0;
	}
	mOutPutNum = 0;
	mActive = false;
	mStartTime = Scheduler.getCurrent();
	mLastFinishedNum = 0;
	mCouldRetire = false;
	mTrackerRequest = NULL;
};

BTSession::BTSession(BTTorrent* tt, BTPeer* agent):
mState(working),
mAgent(agent){
	mTorrent = tt;
	mDocument = agent->getDocument(tt->mKey);
	if( mDocument == NULL)
		mDocument = agent->addDocument(tt->mKey, tt->mSize, tt->mPopularity);
	mDocument->setSession( this);
	mConnections.clear();
	mPeerCandidates.clear();
	mUnchokeLists.clear();
	mAmountBetweenSnubDetect = 0;
	mRechoke = new BTAlgorithmRechoke(this);
	mKnowledges.resize( mDocument->mPieceNum);
	mDownloadInfo.resize( mDocument->mPieceNum);
	for( unsigned int i = 0 ; i< mDocument->mPieceNum; i++){
		mKnowledges[i] = new list<BTPeer*>;
		mKnowledges[i]->clear();
		mDownloadInfo[i] = 0;
	}
	
	mOutPutNum = 0;
	mActive = false;
	mStartTime = Scheduler.getCurrent();
	mLastFinishedNum = 0;
	mCouldRetire = false;
	mTrackerRequest = NULL;
}

BTSession::~BTSession(void){
	delete mRechoke;
};

void BTSession::addKnowledgePiece( unsigned int index, BTPeer* ptr){
	if(find( mKnowledges[index]->begin(), mKnowledges[index]->end(), ptr) != mKnowledges[index]->end())
		return;
	else
		mKnowledges[index]->push_back( ptr);
};

vector< BTSocket*>* BTSession::getConnections(void){
		vector<BTSocket*>* conlist = new vector<BTSocket*>;
		map<BTPeer*, BTSocket*>::iterator aIt = mConnections.begin();
		BTSocket* con = NULL;
		while( aIt != mConnections.end()){
			con = (aIt++)->second;
			if( !con->mConnected)
				continue;
			
			conlist->push_back(con);
		}
		return conlist;
}

BTSocket* BTSession::getConnection(BTPeer* peer){
		map<BTPeer*, BTSocket*>::iterator aIt = mConnections.find(peer);
		if( aIt != mConnections.end())
			return aIt->second;
		else
			return NULL;
}

void BTSession::removeConnection( BTPeer* peer){
	map<BTPeer*, BTSocket*>::iterator aIt = mConnections.find( peer);
	if( aIt != mConnections.end()){
		DEBUG(SESSIONDEBUGLEVEL)
			printf("%f peer %d remove connection to peer %d left connection num %d\n", Scheduler.getCurrent(), mAgent->getID(), aIt->first->getID(), mConnections.size());
		mConnections.erase(aIt);
	}
	return;
}

void BTSession::removeConnection( BTSocket* con){
	map<BTPeer*, BTSocket*>::iterator aIt = mConnections.begin();
	while(  aIt != mConnections.end()) {
		if( aIt->second == con){
			mConnections.erase(aIt);
			DEBUG(SESSIONDEBUGLEVEL)
				printf("%f peer %d remove connection to peer %d left connection num %d\n", Scheduler.getCurrent(), mAgent->getID(), aIt->first->getID(), mConnections.size());
			return;
		}
		++aIt;
	}

}
void BTSession::closeConnection(BTSocket* con){
	mAgent->mConnectionNum --;
	BTPeer* peer = con->mCounterPart->mSession->getAgent();
	removePeerFromCandidates( peer );
	removePeerFromUnchokeList( peer);
	removeConnection( peer);
	removePeerFromKnowledge( peer);
	
	if( con->mInConnection != NULL){
		delete con->mInConnection;
		con->mInConnection= NULL;
		con->mCounterPart->mOutConnection = NULL;
	}
	
	if( con->mOutConnection != NULL){
		delete con->mOutConnection;
		con->mOutConnection= NULL;
		con->mCounterPart->mInConnection = NULL;
	}
	
	con->mClosed = true;
	if(  !con->mCounterPart->mClosed )
		if( con->mCounterPart->mConnected)
			con->mCounterPart->mSession->closeConnection( con->mCounterPart);
		else{
			con->mCounterPart->mSession->removeConnection( con->mCounterPart);
			delete con->mCounterPart;
		}
	
	delete con;
	
	if( mAgent->mType == superseed &&  mConnections.size() == 0){
		mActive = false;
		mRechoke->cancelRechoke();
	}
	if( mAgent->mType == normal && !mDocument->isWhole() && mConnections.size() < MIN_SESSION_CON_NUM)
		announceTracker();

}
void BTSession::queryTracker(void){
	BTTracker* tracker = mTorrent->mTracker;
	BTGetRequest* btg = new BTGetRequest( mTorrent->mKey, start, mDocument->getLeftSize(), this );
	double t = Scheduler.getCurrent() + bt.getAgentDelay( mAgent->getID(), tracker->getID());
	if( t < Scheduler.getCurrent()){
		printf("BTSession::queryTracker  t %f < current agentDelay %f\n", t, bt.getAgentDelay( mAgent->getID(), tracker->getID()));
		exit(-1);
	}
	BTEvent* bte = new BTEvent(TRACKER_GET_REQUEST, t, tracker, this);
	bte->setAddParam(btg);
	Scheduler.enqueue(bte);

	DEBUG(SESSIONDEBUGLEVEL)
		printf(" %f peer %d query tracker \n", Scheduler.getCurrent(), mAgent->getID());

}
void BTSession::handler(SimEvent* event){
	checkTimeout();
	
	switch(event->getType()){
		case PEER_MESSAGE:	
			handlePeerMessage((BTEvent*)event);	
			return;
		case SESSION_TRACKER_RESPONSE:
			handleTrackerResponse((BTEvent*)event);
			return;
		case SESSION_ANNOUNCE:
			announceTracker();
			delete event;
			return;
		case CONNECTION_TIMEOUT:
			handleConnectionTimeout( (BTEvent*) event);
			return;
		case PEER_TIMEOUT:
			handleSessionTimeout((BTEvent *)event);
			return;
		default:
			handleError("unknown event type");
			return;
	}

};
void BTSession::handleConnectionTimeout(BTEvent * event){
	BTSocket* con = (BTSocket*)(event->getParam());
	DEBUG(SESSIONDEBUGLEVEL)
		printf( "%f peer %d rev connection timout", Scheduler.getCurrent(), mAgent->getID() );
	if( con->mConnected )
		closeConnection( con);
	else{
		removeConnection( con);
		delete con;
	}
}

void BTSession::handleSessionTimeout(BTEvent * event){
	DEBUG(SESSIONDEBUGLEVEL)
		printf( "%f peer %d rev peer timout serv info %s output num %d\n", Scheduler.getCurrent(), mAgent->getID(), mDocument->mkey.c_str(), mOutPutNum);
	statistic.addFileUploadNum(mDocument->mkey, mOutPutNum);
	mState = stopped;
	announceTracker();
	if( mAgent->mType != superseed )
		mAgent->mState = retire;
	
	mRechoke->cancelRechoke();
	mAgent->removeSession( this);
       delete event;
}

void BTSession::handleTrackerResponse(BTEvent* event){
	BTTrackerResponse* response = (BTTrackerResponse*)(event->getAddParam());
	list<BTPeer*>* peerlist = response->mPeerList;
	peerlist->remove(mAgent);
	list<BTPeer*>::iterator aIt = peerlist->begin();
	BTPeer* peer = NULL;
	list<BTPeer*>* connlist = NULL;
	
	if( mState == completed || mState == stopped){
		delete peerlist;
		delete response;
		delete event;
		mTrackerRequest = NULL;
		return;
	}
	
	while(aIt != peerlist->end()){
		peer = *(aIt++);
		if( inCandidate( peer) || peerInConnections(peer))
			continue;
		addPeerCandidates( peer);
	}
	
	while( mPeerCandidates.size() > MAX_CANDIDATES )
		mPeerCandidates.erase( mPeerCandidates.begin());
	
	connlist = selectPeers( &mPeerCandidates);
	aIt = connlist->begin();
	while( aIt!= connlist->end()){
		peer = *(aIt ++);
		if( !peerInConnections( peer))
			connectTo(peer, false);
	}
	double t = Scheduler.getCurrent() + ANNOUNCE_INTERVAL;
	BTEvent* bte = new BTEvent(SESSION_ANNOUNCE, t, this, NULL);
	mNextTrackerAnnouncement = bte;
	Scheduler.enqueue(bte);

	delete connlist;
	delete peerlist;
	delete response;
	delete event;
	mTrackerRequest = NULL;
	
};

void BTSession::announceTracker(void){
	if( mTorrent == NULL ||mTorrent->mTracker == NULL){
		handleError("BTSession::announceTracker");
		return;
	}
	BTTracker* tracker = mTorrent->mTracker;
	BTGetRequest* btg = NULL;
	switch( mState ){
		case working:
			if( mTrackerRequest != NULL)
				return;
			btg = new BTGetRequest(mTorrent->mKey, start, mDocument->getLeftSize(), this);
		       break;
		case completed:
			btg = new BTGetRequest( mTorrent->mKey, complete, 0, this );
			Scheduler.cancel( mNextTrackerAnnouncement);
			break;
		case stopped:
			btg = new BTGetRequest( mTorrent->mKey, stop, mDocument->getLeftSize(), this);
			Scheduler.cancel( mNextTrackerAnnouncement);
			break;
		case failed:
			Scheduler.cancel( mNextTrackerAnnouncement );
			break;
	       default:
		   	handleError("BTSession::announceTracker:wrong sessionstate");
			return;
	}
	double t = Scheduler.getCurrent() +bt.getAgentDelay( mAgent->getID(), tracker->getID() );
	if( t < Scheduler.getCurrent()){
		printf("BTSession::announceTracker t %f < current agentDelay %f\n", t, bt.getAgentDelay(mAgent->getID(), tracker->getID() ));
		exit(-1);
	}
	BTEvent* bte = new BTEvent(  TRACKER_GET_REQUEST, t, tracker, this);
	bte->setAddParam(btg);
	Scheduler.enqueue( bte);

	DEBUG(SESSIONDEBUGLEVEL)
		printf( "%f peer %d announce tracker left size %d\n", Scheduler.getCurrent(), mAgent->getID(), mDocument->getLeftSize());
	mTrackerRequest = bte;
};

void BTSession::announcePeers( unsigned int index){
	map<BTPeer*, BTSocket*>::iterator aIt = mConnections.begin();
	while( aIt != mConnections.end()){
		BTSocket* connection = (aIt++)->second;
		if( !connection->mConnected)
			continue;
		BTSession* peerSession = connection->mCounterPart->mSession;
		BTPeerMessage* btp = new BTPeerMessage(HAVE);
		btp->mIndex = index;
		btp->mDownloadHash = mDocument->mkey;
		
		double t = Scheduler.getCurrent();
		BTEvent* bte = new BTEvent(PEER_MESSAGE, t, peerSession, this);
		bte->setAddParam(btp);
		bte->dispatch();//direct have message to peer without through scheduler for reduce the pressure of scheduler
		DEBUG(SESSIONDEBUGLEVEL)
			printf("%f peer %d send have piece %u to peer %d\n", Scheduler.getCurrent(), mAgent->getID(), index, peerSession->getAgent()->getID());

		if( mDocument->isWhole())
			sendInterested(false, connection);	
	}
};

void BTSession::handlePeerMessage(BTEvent* event){
	BTPeerMessage* btm = (BTPeerMessage* )event->getAddParam();
	
	if( mState == stopped){
		printf("peer %d session has not exist \n", mAgent->getID());
		delete event;
		return;
	}
	
	if( btm->mType != BITFIELD && btm->mType != PIECE ){
		BTSession* peerSession = (BTSession*)event->getParam();
		BTSocket* con = getConnection( peerSession->getAgent());
		if( con == NULL){
			DEBUG(SESSIONDEBUGLEVEL)
				printf("%f peer %d rev message from peer %d but no conneciton\n", Scheduler.getCurrent(), mAgent->getID(), peerSession->getAgent()->getID());
			delete event;
			return;
		}
		if( !con->mConnected ){
			printf("%f peer %d have no connection with peer %d\n", Scheduler.getCurrent(), mAgent->getID(), peerSession->getAgent()->getID());
			delete event;
			return;
		}
		con->changeConnectionTimeout();
	}
	
	switch( btm->mType){
		case BITFIELD:
			handlePMBitfield(event);
			return;
		case INTERESTED:
			handlePMInsterested(event);
			return;
		case NOT_INTERESTED:
			handlePMNotInsterested(event);
			return;
		case HAVE:
			handlePMHave(event);
			return;
		case REQUEST:

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -