⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 btsession.cpp

📁 模拟P2P各种网络环境的,适合新手们的学习,不错的源码.
💻 CPP
📖 第 1 页 / 共 3 页
字号:
			continue;
		*/
		if( !mDocument->inDownLoadingSet( *pieceindex)){
			delete pieceInfoList;
			return true;
		}
	}
	delete pieceInfoList;
	return false;
	
};

/**
 * Connect this session to `peer`, or reuse the connection that already exists.
 *
 * When a connection to `peer` is already registered no handshake is sent;
 * sendChoke(flag, ...) is invoked on it only if the peer has declared
 * interest. Otherwise a connection is created through addConnection() and a
 * HAND_SHAKE message is scheduled for delivery after the delay reported by
 * bt.getAgentDelay() between the two agents.
 *
 * @param peer  remote peer to connect to
 * @param flag  choke value forwarded to sendChoke() for an existing connection
 */
void BTSession::connectTo(BTPeer* peer, bool flag){
	if( peerInConnections( peer)){
		// Existing connection: only its choke state is updated.
		BTSocket* existing = getConnection( peer);
		if( existing->mPeerInsterested)
			sendChoke( flag,  existing);
		return;
	}

	BTSocket* link = addConnection( peer);
	if( link == NULL)
		return;	// addConnection() declined to create the connection

	// Handshake payload: our connection, the document key and our piece set.
	BTPeerMessage* handshake = new BTPeerMessage(HAND_SHAKE);
	handshake->mConnection = link;
	handshake->mDownloadHash = mDocument->mkey;
	handshake->mBitfield = mDocument->getPieces();
	handshake->mInsterested = true;

	// Delivery time must never lie in the past; treat that as a fatal error.
	double deliverAt = Scheduler.getCurrent() + bt.getAgentDelay( mAgent->getID(), peer->getID());
	if( deliverAt < Scheduler.getCurrent()){
		printf("BTPeer::HandleShakeHand t %f < current agentDelay %f\n", deliverAt, bt.getAgentDelay( mAgent->getID(), peer->getID()));
		exit(-1);
	}

	BTEvent* delivery = new BTEvent(PEER_MESSAGE, deliverAt, peer, this);
	delivery->setAddParam(handshake);
	Scheduler.enqueue(delivery);
	DEBUG(SESSIONDEBUGLEVEL)
		printf("%f peer %d send hand_shake to peer %d\n", Scheduler.getCurrent(), getAgent()->getID(), peer->getID());
	link->sendConnectionTimeout();
	// Note: the event is queued; a direct dispatch() call is intentionally not made here.
}

/**
 * Create and register a BTSocket for `peer`.
 *
 * @param peer  remote peer the new connection belongs to
 * @return the new socket, or NULL when `peer` is this session's own agent or
 *         the session already holds mAgent->MaxConnectionNum connections.
 *
 * Terminates the program (exit(-1)) if `peer` already has an entry in
 * mConnections.
 */
BTSocket* BTSession::addConnection(BTPeer* peer){
	// A session does not connect to its own agent.
	if( mAgent == peer){
		printf("BTSession::addConection:peer %d add itself\n", mAgent->getID());
		return NULL;
	}

	// Connection limit reached: refuse.
	if( mAgent->MaxConnectionNum <= mConnections.size()){
		DEBUG(SESSIONDEBUGLEVEL)
			printf(" peer %d has too many connections, so refuse connectio to peer %d\n", mAgent->getID(), peer->getID());
		return NULL;
	}

	// A duplicate entry is treated as a fatal inconsistency.
	if( mConnections.count( peer ) != 0){
		printf("BTSession::addConnection:peer exist\n");
		exit(-1);
	}

	// The rechoke schedule is (re)started when the first connection appears.
	if( mConnections.empty())
		mRechoke->nextSchedule();

	BTSocket* sock = new BTSocket(this);
	mConnections.insert( map<BTPeer*, BTSocket*>::value_type( peer, sock));
	++mAgent->mConnectionNum;
	mActive = true;
	return sock;
}

void BTSession::sendInterested(bool flag, BTSocket* con){
	if( con->mInsterested == flag||con->mClosed)
		return;
	con->mInsterested = flag;
	BTSession* peerSession = con->mCounterPart->mSession;
	BTPeerMessage* btm = NULL;
	if( flag)
		 btm = new BTPeerMessage(INTERESTED);
	else
		btm = new BTPeerMessage(NOT_INTERESTED);
	double t = Scheduler.getCurrent() + bt.getAgentDelay( mAgent->getID(), peerSession->getAgent()->getID());
	if( t < Scheduler.getCurrent()){
		printf("BTSession::sendInterested t %f < current agentDelay %f\n", t, bt.getAgentDelay( mAgent->getID(), peerSession->getAgent()->getID()));
		exit(-1);
	}
	BTEvent* bte = new BTEvent(PEER_MESSAGE, t, peerSession,this);
	bte->setAddParam(btm);
	Scheduler.enqueue(bte);
	DEBUG(SESSIONDEBUGLEVEL)
		printf("%f  peer %d send interested %d  to peer %d\n",Scheduler.getCurrent(), mAgent->getID(), flag,  peerSession->getAgent()->getID());
};

void BTSession::sendInterestedTo(bool flag, BTSocket* con){
	con->mInsterested = flag;
	BTSession* peerSession = con->mCounterPart->mSession;
	BTPeerMessage* btm = NULL;
	if( flag)
		 btm = new BTPeerMessage(INTERESTED);
	else
		btm = new BTPeerMessage(NOT_INTERESTED);
	double t = Scheduler.getCurrent() + bt.getAgentDelay( mAgent->getID(), peerSession->getAgent()->getID());
	if( t < Scheduler.getCurrent()){
		printf("BTSession::sendInterestedTo t %f < current agentDelay %f\n", t, bt.getAgentDelay( mAgent->getID(), peerSession->getAgent()->getID()));
		exit(-1);
	}
	BTEvent* bte = new BTEvent(PEER_MESSAGE, t, peerSession,this);
	bte->setAddParam(btm);
	Scheduler.enqueue(bte);
	DEBUG(SESSIONDEBUGLEVEL)
		printf("%f  peer %d send interested %d  to peer %d\n",Scheduler.getCurrent(), mAgent->getID(), flag,  peerSession->getAgent()->getID());
};


void BTSession::sendChoke(bool flag, BTSocket* con){	
	if( con->mChoke == flag ||con->mClosed || !con->mPeerInsterested)
		return;
	con->mChoke = flag;
	BTSession* peerSession = con->mCounterPart->mSession;
	BTPeerMessage* btm = NULL;
	if( flag){
		btm = new BTPeerMessage(CHOKE);
		mUnchokeLists.erase(peerSession->getAgent());
	}
	else{
		btm = new BTPeerMessage(UNCHOKE);
		mUnchokeLists.erase( peerSession->getAgent());
		mUnchokeLists.insert( peerSession->getAgent());
	}
	double t = Scheduler.getCurrent() + bt.getAgentDelay( mAgent->getID(), peerSession->getAgent()->getID());
	if( t < Scheduler.getCurrent()){
		printf("BTSession::sendchoke t %f < current agentDelay %f\n", t, bt.getAgentDelay( mAgent->getID(), peerSession->getAgent()->getID()));
		exit(-1);
	}
	BTEvent* bte = new BTEvent(PEER_MESSAGE, t, peerSession, this);
	bte->setAddParam(btm);
	Scheduler.enqueue( bte);
	DEBUG(SESSIONDEBUGLEVEL)
		printf("%f peer %d send peer %d choke %d \n", Scheduler.getCurrent(), mAgent->getID(), peerSession->getAgent()->getID(), flag );
}

/**
 * Request the next block of piece `index` from `peer`.
 *
 * The request starts at the piece's recorded progress (or at 0, after
 * registering the piece with setPieceProgress(), when it is not yet in the
 * partial list) and covers at most mDocument->mBlockLength bytes, clipped to
 * what remains of the piece. The message is enqueued with the current
 * scheduler time, the piece is marked as being downloaded, and
 * connection->sendRequestTimeout() is invoked.
 *
 * Reports through handleError() and returns when there is no connection to
 * `peer`, or the connection is not connected or is closed.
 *
 * @param peer   peer to request from
 * @param index  piece index to request
 */
void BTSession::sendRequest( BTPeer* peer, int index){
	BTSocket* link = getConnection( peer);
	const bool usable = link != NULL && link->mConnected && !link->mClosed;
	if( !usable){
		handleError( "BTSession::sendRequest:no connection");
		return;
	}

	BTPeerMessage* request = new BTPeerMessage( REQUEST );
	request->setKey(mDocument->mkey);
	request->setIndex(index);

	// Starting offset: resume a partially downloaded piece, otherwise begin at 0.
	if( !mDocument->inPartialList( index)){
		mDocument->setPieceProgress( index, 0);
		request->setBegin(0);
	}
	else
		request->setBegin(mDocument->getPieceProgress(index));

	// Request length: one block, or whatever is left of the piece if smaller.
	unsigned int pieceLen = mDocument->getPieceLength( index);
	if( (pieceLen - request->mBegin ) > mDocument->mBlockLength)
		request->setLength( mDocument->mBlockLength);
	else
		request->setLength( pieceLen - request->mBegin);

	// Requests are enqueued with the current time (no agent delay is added here).
	double now = Scheduler.getCurrent();
	BTEvent* delivery = new BTEvent( PEER_MESSAGE, now, link->mCounterPart->mSession, this);
	delivery->setAddParam( request);
	Scheduler.enqueue(delivery);

	link->mRequestingPiece = index;
	mDocument->addPiecedownloading( index);
	DEBUG(SESSIONDEBUGLEVEL)
		printf( "%f peer %d send request to peer %d index %d begin %d lenth %d  peerNum %d\n", Scheduler.getCurrent(), mAgent->getID(), peer->getID(), request->mIndex, request->mBegin, request->mLength, countPiecePeers(request->mIndex) );

	link->sendRequestTimeout();
	// Note: the event is queued; a direct dispatch() call is intentionally not made here.
}

/**
 * Pick a piece to fetch from `peer` and request it.
 *
 * Returns without doing anything when the peer's choke flag is set on the
 * connection or the connection's mRequestingPiece is not MAX_UNSIGNED_VALUE
 * (presumably: a request is already outstanding - confirm). When
 * selectPiece() finds nothing to request, the incoming connection is
 * inactivated, bandwidth is re-adjusted on both ends and NOT_INTERESTED is
 * sent via sendInterested(false, ...).
 *
 * Reports through handleError() when there is no connection to `peer`.
 *
 * @param peer  peer to request a piece from
 */
void BTSession::sendRequestTo(BTPeer* peer){
	BTSocket* link = getConnection(peer);
	if( link == NULL){
		handleError("BTSession::sendRequestTo:no connection");
		return;
	}

	// No request while choked by the peer.
	if( link->mPeerChoke)
		return;
	if( link->mRequestingPiece != MAX_UNSIGNED_VALUE)
		return;

	unsigned int wanted = 0;
	if( selectPiece(peer, &wanted)){
		sendRequest( peer, wanted);
		return;
	}

	// Nothing to ask this peer for: disable the inbound side of the connection.
	link->mInConnection->inactivate();
	link->adjustInBandwidth();
	link->mCounterPart->adjustOutBandwidth();
	sendInterested( false, link);
}

/**
 * Build a pieceInfo entry for every piece index in `piece`.
 *
 * Each entry carries the piece index and the number of known peers for it as
 * reported by countPiecePeers(). Entries appear in the same order as the
 * input.
 *
 * @param piece  piece indices to describe
 * @return a heap-allocated vector; the caller is responsible for deleting it
 */
vector<pieceInfo>* BTSession::makePieceInfoList(vector <unsigned  int > * piece){
	vector<pieceInfo>* result = new vector<pieceInfo>(piece->size());
	vector<pieceInfo>::iterator out = result->begin();
	for( vector<unsigned int>::const_iterator in = piece->begin(); in != piece->end(); ++in, ++out){
		out->index = *in;
		out->peerNum = countPiecePeers( *in);
	}
	return result;
}

/**
 * Number of peers recorded in mKnowledges for piece `index`.
 *
 * @param index  piece index
 * @return the size of the peer list for that piece, or 0 (after printing a
 *         diagnostic) when `index` is out of range.
 */
unsigned int BTSession::countPiecePeers( unsigned int index){
	// Valid indices are 0 .. size()-1, so index == size() must be rejected too;
	// the previous '>' test let it through to an out-of-range element access.
	if( index >= mKnowledges.size()){
		printf(" BTSession::countPiecePeers:wrong index %u", index);
		return 0;
	}
	return mKnowledges[index]->size();
}

/**
 * Append `peer` to mPeerCandidates.
 *
 * Adding this session's own agent is treated as a fatal error: a message is
 * printed and the program exits with -1.
 *
 * @param peer  candidate peer
 */
void BTSession::addPeerCandidates( BTPeer* peer){
	if( peer != mAgent ){
		mPeerCandidates.push_back(peer);
		return;
	}
	printf("BTSession::addPeerCandidates:peer %d add itself\n", mAgent->getID());
	exit(-1);
}

//check if connectin which has timout exist
void BTSession::handleConnectionTimeout(	void){
	list<BTSocket* >::iterator aIt, pIt;
	aIt = mConnectTimoutList.begin();
	BTSocket* con = NULL;
	while( aIt != mConnectTimoutList.end() ){
		pIt = aIt++;
		con = *pIt;
		if( con->mConnectionTimeout->getTimeStamp() <= Scheduler.getCurrent() ){
			handleConnectionTimeout( con->mConnectionTimeout );
			continue;
		}
		break;
	}

}

/**
 * Run both timeout sweeps: connection timeouts first, then request timeouts.
 */
void BTSession::checkTimeout(void)
{
	handleConnectionTimeout();
	handleRequestTimeout();
}
//check if request which has timout exist
void BTSession::handleRequestTimeout(void){
	list<BTSocket* >::iterator aIt, pIt;
	aIt = mRequestTimoutList.begin();
	BTSocket* con = NULL;
	while( aIt != mRequestTimoutList.end() ){
		pIt = aIt ++;
		con = *pIt;
		if( con->mRequestTimeout->getTimeStamp() <= Scheduler.getCurrent() ){
			mRequestTimoutList.erase( pIt);
			con->handleRequestTimeout( con->mRequestTimeout );
			continue;
		}
		break;
	}

}

/**
 * Count one more download of piece `index`, then re-evaluate checkRetire().
 *
 * @param index  piece whose download counter is incremented
 */
void BTSession::addPieceDownload(unsigned int index)
{
	++mDownloadInfo[index];
	checkRetire();	// result intentionally unused
}

/**
 * Print the download counter of every piece of the document, one line per
 * piece, for indices 0 .. mDocument->mPieceNum - 1.
 */
void BTSession::printPieceDownloadInfo( void){
	unsigned int piece = 0;
	while( piece < mDocument->mPieceNum ){
		printf(" piece %u downloadtimes %d\n", piece, mDownloadInfo[piece]);
		++piece;
	}
}

/**
 * Forget `peer` as a holder of any piece.
 *
 * For every piece, the first occurrence of `peer` is removed from that
 * piece's peer list in mKnowledges. Afterwards, if exactly one holder remains
 * and it is of type superseed, a request is sent to that holder via
 * sendRequestTo() unless its connection is already downloading.
 *
 * @param peer  peer to remove from the per-piece peer lists
 */
void BTSession::removePeerFromKnowledge( BTPeer* peer){
	for( unsigned int i = 0; i< mKnowledges.size() ; i++){
		list<BTPeer*>::iterator aIt = find( mKnowledges[i]->begin(), mKnowledges[i]->end(), peer);
		if( aIt != mKnowledges[i]->end() )
			mKnowledges[i]->erase(aIt);
		if( mKnowledges[i]->size() ==1 && mKnowledges[i]->front()->mType == superseed ){
			BTSocket* con = getConnection( mKnowledges[i]->front() );
			// getConnection() may yield NULL (sendRequest() and sendRequestTo()
			// both test for it), so guard before reading mDownloading instead
			// of dereferencing a null pointer.
			if( con != NULL && !con->mDownloading)
				sendRequestTo( mKnowledges[i]->front() );
		}
	}
}
bool  BTSession::retireWork(void){
	if( bt.mTracker->getPeerNumSince(mDocument->mkey, mStartTime) <= ESP_RETIRE_PEER_NUM )
		return false;
	Topology *topology = Topology::getInstance();
	
	if( topology->getAvailableBandwidth( false, mAgent) > MIN_OUT_BANDWIDTH && mAgent->getActiveSessionNum() < DANGER_INFO_NUM ){
		mCouldRetire = true;
		return false;
	}
	
	printf("%f esp retire server for file %s outputnum %fM\n", Scheduler.getCurrent(), mDocument->mkey.c_str(), mOutPutNum*16/1024.0);
	mState = stopped;
	mRechoke->cancelRechoke();
	
	bt.mTracker->setEspSate( mDocument->mkey, false);
	mCouldRetire = false;
	return true;
}

/**
 * Put the session back into the working state.
 *
 * No-op when the session is already working. Otherwise the state is set to
 * working, the rechoke schedule is restarted, mStartTime is reset to the
 * current scheduler time and the tracker is told via setEspSate(key, true).
 */
void BTSession:: recoverEsp( void ){
	if( mState != working ){
		mState = working;
		mRechoke->nextSchedule();
		mStartTime = Scheduler.getCurrent();
		bt.mTracker->setEspSate( mDocument->mkey, true);
	}
}
bool BTSession::ifNoInterestedPeer(void){
	map<BTPeer*, BTSocket*>::iterator aIt = mConnections.begin();
	bool flag = true;
	while( aIt != mConnections.end() ){
		if( !aIt->second->mConnected)
			return false;
		if( aIt->second->mInsterested )
			flag = false;
		aIt++;
	}
	return flag;
}
void BTSession::requestEsp(void){
	BTPeer* peer = bt.mEspList.front();
	if( peerInConnections(peer ))
		sendRequestTo( peer);
	else
		connectTo( peer, true);
}

/**
 * Check whether no piece has more than one known holder.
 *
 * @return false if any peer list in mKnowledges has more than one entry,
 *         true otherwise.
 */
bool BTSession::ifNoAvailablePeer(void){
	unsigned int piece = 0;
	while( piece < mKnowledges.size() ){
		if( mKnowledges[piece]->size() > 1 )
			return false;
		++piece;
	}
	return true;
}

bool BTSession::ifUpload(void){
	map<BTPeer*, BTSocket*>::iterator aIt = mConnections.begin();
	BTSocket* p = NULL;
	while( aIt != mConnections.end() ){
		p = (aIt++)->second;
		if( p != NULL && p->mOutConnection != NULL && p->mOutConnection->isActive() )
			return true;
	}
	return false;
};

/**
 * Decide, from the per-piece download counters, whether to attempt
 * retireWork().
 *
 * The smallest counter in mDownloadInfo is taken as the number of complete
 * rounds ("finishednum"). If any piece has never been downloaded the answer
 * is false. retireWork() is attempted when either no round was recorded yet
 * and finishednum has reached MIN_COM_NUM, or a round was recorded and
 * finishednum has moved on by at least one since then.
 *
 * @return true only when retireWork() succeeded (mLastFinishedNum is then
 *         updated to finishednum); false otherwise.
 */
bool BTSession::checkRetire(void){
	// Without any per-piece counter no piece has been downloaded.
	if( mDownloadInfo.size() == 0 )
		return false;

	// Seed the running minimum from the first element; the previous seed,
	// element 1, does not exist when there is only a single piece.
	unsigned int finishednum = mDownloadInfo[0];
	for( unsigned  int i = 0; i< mDownloadInfo.size(); i++){
		if( mDownloadInfo[i]  == 0 )
			return false;
		else if( mDownloadInfo[i] < finishednum )
			finishednum = mDownloadInfo[i];
	}
	unsigned int peerNum = bt.mTracker->getPeerNumSince( mDocument->mkey, mStartTime);

	if(  (!mLastFinishedNum && finishednum  >= MIN_COM_NUM)||( mLastFinishedNum && finishednum - mLastFinishedNum >=1 ) ){
		printf("%f esp retire when file finishednum %d peerarrivenum %d\n", Scheduler.getCurrent(), finishednum,  peerNum);
		if( retireWork() ){
			mLastFinishedNum = finishednum;
			return true;
		}
	}

	return false;
}
/**
 * Order pieceInfo entries by ascending peerNum.
 *
 * @return true when info1 has strictly fewer peers than info2
 */
bool lessPeerNum::operator()(pieceInfo const&  info1, pieceInfo const& info2) const{
	return info1.peerNum < info2.peerNum;
}
/**
 * Order sockets by ascending average outgoing bandwidth, measured since
 * (current scheduler time - CAL_BANWIDTH_INTERVAL).
 *
 * @return true only when peer1's average is strictly lower than peer2's.
 *         Equal averages compare as "not less" in both directions, which is
 *         what standard sorting algorithms and ordered containers require of
 *         a comparator (strict weak ordering), and matches lessPeerNum. The
 *         previous form returned true for equal values.
 */
bool OutlessCompare::operator()(BTSocket* const &  peer1, BTSocket* const &  peer2) const{
	double timestamp = Scheduler.getCurrent() - CAL_BANWIDTH_INTERVAL;
	double num1 = peer1->calcOutAverageBandwidthSince( timestamp);
	double num2 = peer2->calcOutAverageBandwidthSince(timestamp);
	return num1 < num2;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -