📄 btsession.cpp
字号:
handlePMRequest(event);
return;
case PIECE:
handlePMPiece(event);
return;
case CANCEL:
handlePMCancel(event);
return;
case CHOKE:
handlePMChoke(event);
return;
case UNCHOKE:
handlePMUnChoke(event);
return;
default:
handleError("BTSession::handlePeerMessage:unknown peer message type");
return;
};
};
// Handles a BITFIELD peer message.
//
// The event's param is treated as the sending peer's BTSession and its add-param as the
// BTPeerMessage carrying the sender's socket, interest flag and bitfield. The handler
//   1. marks the local socket and the sender's socket as connected counterparts and
//      creates the two BTConnection objects shared between them,
//   2. records every piece the sender has and this session lacks, then reports the
//      resulting interest to the sender,
//   3. if the sender is interested, chokes or unchokes it depending on whether
//      mUnchokeLists has already reached mRechoke->mUnchokeNum,
//   4. calls sendConnectionTimeout() on the connection and frees message and event.
// Without a local connection for the sender an error is reported and both are freed.
void BTSession::handlePMBitfield(BTEvent* event){
    BTSession* remoteSession = static_cast<BTSession*>(event->getParam());
    BTPeerMessage* bitfieldMsg = static_cast<BTPeerMessage*>(event->getAddParam());
    BTSocket* remoteSocket = bitfieldMsg->mConnection;
    BTPeer* remotePeer = remoteSession->getAgent();
    BTSocket* link = getConnection( remotePeer);
    if( !link){
        handleError("BTSession::handlePMBitField:no connection");
        delete bitfieldMsg;
        delete event;
        return;
    }
    // DEBUG(...) must stay directly in front of the printf it belongs to
    // (presumably it expands to a debug-level guard -- confirm against its definition).
    DEBUG(SESSIONDEBUGLEVEL)
        printf("%f peer %d rev bitfield from peer %d interested %d\n",Scheduler.getCurrent(), mAgent->getID(), remotePeer->getID(), bitfieldMsg->mInsterested);

    // From here on the connection counts as established: link both sockets together.
    link->mPeerInsterested = bitfieldMsg->mInsterested;
    link->mCounterPart = remoteSocket;
    link->mConnected = true;
    remoteSocket->mCounterPart = link;
    remoteSocket->mConnected = true;

    // One BTConnection per direction; the counterpart sees them with in/out swapped.
    link->mInConnection = new BTConnection( link->mCounterPart, link);
    link->mOutConnection = new BTConnection( link, link->mCounterPart);
    link->mCounterPart->mInConnection = link->mOutConnection;
    link->mCounterPart->mOutConnection = link->mInConnection;

    // Interested in the sender as soon as it owns at least one piece we do not have.
    bool wantsPieces = false;
    if( !mDocument->isWhole()){
        vector<bool>* ownPieces = mDocument->getPieces();
        vector<bool>* remotePieces = bitfieldMsg->mBitfield;
        for( unsigned int idx = 0; idx < mDocument->mPieceNum; ++idx){
            if( !(*remotePieces)[idx] || (*ownPieces)[idx])
                continue;
            wantsPieces = true;
            addKnowledgePiece(idx, remotePeer);
        }
    }
    sendInterestedTo( wantsPieces, link);

    if( link->mPeerInsterested){
        const bool slotsFull = static_cast<int>(mUnchokeLists.size()) >= mRechoke->mUnchokeNum;
        // The connection was only just set up and a choke/unchoke has to go out, so the
        // flag is first set to the opposite of the state that sendChoke is asked to send.
        link->mChoke = !slotsFull;
        sendChoke( slotsFull, link);
        if( slotsFull){
            if( !inCandidate( remotePeer))
                addPeerCandidates( remotePeer);
            mUnchokeLists.erase( remotePeer);
        }
        else{
            removePeerFromCandidates( remotePeer);
            mUnchokeLists.erase( remotePeer);
            mUnchokeLists.insert( remotePeer);
        }
    }
    link->sendConnectionTimeout();
    delete bitfieldMsg;
    delete event;
}
// Handles an INTERESTED peer message.
//
// Frees the message and the event, marks the sender as interested on the local
// connection and then either chokes it (unchoke list already holds
// mRechoke->mUnchokeNum entries; the sender becomes a candidate) or unchokes it
// (the sender is put into mUnchokeLists and dropped from the candidates).
// Reports an error when no connection to the sender exists.
void BTSession::handlePMInsterested(BTEvent* event){
    BTSession* remoteSession = static_cast<BTSession*>(event->getParam());
    BTPeerMessage* msg = static_cast<BTPeerMessage*>(event->getAddParam());
    BTPeer* remotePeer = remoteSession->getAgent();
    BTSocket* link = getConnection(remotePeer);
    // Nothing below needs the message or the event any more.
    delete msg;
    delete event;
    if( !link){
        handleError("BTSession::handlePMInsterested");
        return;
    }
    DEBUG(SESSIONDEBUGLEVEL)
        printf("%f peer %d rev insterested from peer %d\n",Scheduler.getCurrent(), mAgent->getID(), remotePeer->getID());
    link->mPeerInsterested = true;

    const bool slotsFull = static_cast<int>(mUnchokeLists.size()) >= mRechoke->mUnchokeNum;
    // Both outcomes start by sending the choke state and dropping any stale entry.
    sendChoke( slotsFull, link);
    mUnchokeLists.erase( remotePeer);
    if( slotsFull){
        if( !inCandidate( remotePeer))
            addPeerCandidates( remotePeer);
        return;
    }
    mUnchokeLists.insert( remotePeer);
    removePeerFromCandidates( remotePeer);
}
// Handles a NOT-INTERESTED peer message.
//
// Frees the message and the event, removes the sender from the candidate list and
// clears the peer-interested flag on the local connection. Reports an error when no
// connection to the sender exists.
void BTSession::handlePMNotInsterested(BTEvent* event){
    BTPeer* remotePeer = static_cast<BTSession*>(event->getParam())->getAgent();
    BTSocket* link = getConnection(remotePeer);
    // The message carries nothing this handler needs; release it together with the event.
    delete static_cast<BTPeerMessage*>(event->getAddParam());
    delete event;
    if( !link){
        handleError("BTSession::handlePMNotInsterested:no connection");
        return;
    }
    DEBUG(SESSIONDEBUGLEVEL)
        printf("%f peer %d rev not interested from peer %d\n",Scheduler.getCurrent(), mAgent->getID(), remotePeer->getID());
    removePeerFromCandidates( remotePeer);
    link->mPeerInsterested = false;
}
// Handles a CHOKE peer message.
//
// Frees the message and the event and records on the local connection that the
// sender is choking this session. Reports an error when no connection to the sender
// exists.
void BTSession::handlePMChoke(BTEvent* event){
    BTPeer* remotePeer = static_cast<BTSession*>(event->getParam())->getAgent();
    BTSocket* link = getConnection(remotePeer);
    // The message carries nothing this handler needs; release it together with the event.
    delete static_cast<BTPeerMessage*>(event->getAddParam());
    delete event;
    if( !link){
        handleError("BTSession::handlePMChoke");
        return;
    }
    DEBUG(SESSIONDEBUGLEVEL)
        printf("%f peer %d rev choke from peer %d\n",Scheduler.getCurrent(), mAgent->getID(), remotePeer->getID());
    link->mPeerChoke = true;
}
// Handles an UNCHOKE peer message.
//
// Frees the message and the event and clears the peer-choke flag on the local
// connection. If the local document is already whole the sender is told that this
// session is not interested; otherwise a request is sent to the sender.
// Reports an error when no connection to the sender exists.
void BTSession::handlePMUnChoke(BTEvent* event){
    BTSession* remoteSession = static_cast<BTSession*>(event->getParam());
    BTPeer* remotePeer = remoteSession->getAgent();
    BTPeerMessage* msg = static_cast<BTPeerMessage*>(event->getAddParam());
    BTSocket* link = getConnection(remotePeer);
    delete msg;
    delete event;
    if( !link){
        handleError("BTSession::handlePMUnChoke");
        return;
    }
    DEBUG(SESSIONDEBUGLEVEL)
        printf("%f peer %d rev unchoke from peer %d\n",Scheduler.getCurrent(), mAgent->getID(), remotePeer->getID());
    link->mPeerChoke = false;
    if( mDocument->isWhole())
        sendInterested(false, link);
    else
        sendRequestTo(remotePeer);
}
// Handles a HAVE peer message.
//
// Records that the sender owns piece mIndex, frees the message and the event, and
// lets a superseed agent retire once countAvlFileNum() reaches MIN_COM_NUM.
// If a connection to the sender exists:
//   - the connection timeout is cancelled when the sender holds more pieces than
//     this session,
//   - nothing more happens if this session already has the announced piece,
//   - otherwise, while the sender is choking us, INTERESTED is sent unless
//     connection->mInsterested is already set; when not choked a request is sent.
// A missing connection is not an error here: the handler simply returns.
void BTSession::handlePMHave(BTEvent* event){
    BTSession* peerSession = static_cast<BTSession*>(event->getParam());
    BTPeer* peer = peerSession->getAgent();
    BTPeerMessage* btm = static_cast<BTPeerMessage*>(event->getAddParam());
    // Copy the index out first: the message is freed before the index is last used.
    const unsigned int index = btm->mIndex;
    addKnowledgePiece(index, peer);
    delete btm;
    delete event;
    // a superseed agent retires once enough copies of the file are available
    if( mAgent->mType == superseed && countAvlFileNum() >= MIN_COM_NUM )
        retireWork();
    BTSocket* connection = getConnection( peer);
    if( connection == NULL)
        return;
    if( peerSession->mDocument->getHavePiecesNum() > mDocument->getHavePiecesNum())
        connection->cancelConnectionTimeout();
    DEBUG(SESSIONDEBUGLEVEL)
        printf("%f peer %d rev have piece %u from peer %d\n",Scheduler.getCurrent(), mAgent->getID(), index, peer->getID());
    // if i already have that piece do nothing
    if( mDocument->havePiece(index))
        return;
    // connection is known to be non-NULL here (checked above), so no second check.
    if( connection->mPeerChoke){
        // choked: we cannot request, only announce interest if not done yet
        if( !connection->mInsterested)
            sendInterested(true, connection);
    }
    else
        sendRequestTo(peer);
}
// Handles a REQUEST peer message.
//
// The request is rejected (logged, message and event freed) when there is no
// connection to the sender, when the connection's mChoke flag is set, or when this
// session does not have the requested piece. Otherwise the received message object
// is re-typed to PIECE and dispatched back to the sender's session in a new
// PEER_MESSAGE event stamped with the current scheduler time; only the incoming
// event is freed, the message travels on with the reply.
void BTSession::handlePMRequest(BTEvent* event){
    BTSession* peerSession = static_cast<BTSession*>(event->getParam());
    BTPeerMessage* btm = static_cast<BTPeerMessage*>(event->getAddParam());
    BTPeer* peer = peerSession->getAgent();
    BTSocket* connection = getConnection( peer);
    if( connection == NULL || connection->mChoke || !mDocument->havePiece(btm->mIndex)){
        // connection may be NULL on this path, so it must not be dereferenced
        // unguarded; -1 is printed as the choke value when there is no connection.
        const int chokeState = (connection != NULL) ? static_cast<int>(connection->mChoke) : -1;
        // %p expects a void*, hence the explicit cast.
        printf("%f in handlePMRequest peer %d to peer %d connection %p choke %d index %u\n", Scheduler.getCurrent(), mAgent->getID(), peer->getID(), static_cast<void*>(connection), chokeState, btm->mIndex);
        delete btm;
        delete event;
        return;
    }
    if( mAgent->mType == superseed )
        mAgent->mCache->inCache(mDocument->mkey, btm->mIndex);
    mOutPutNum++;
    DEBUG(SESSIONDEBUGLEVEL)
        printf("%f peer %d rev request from peer %d\n",Scheduler.getCurrent(), mAgent->getID(), peer->getID());
    // Reuse the request message as the PIECE reply instead of allocating a new one.
    btm->setType(PIECE);
    double t = Scheduler.getCurrent();
    BTEvent* bte = new BTEvent( PEER_MESSAGE, t, peerSession, this);
    bte->setAddParam(btm);
    delete event;
    DEBUG(SESSIONDEBUGLEVEL)
        printf("%f peer %d send piece to peer %d\n",Scheduler.getCurrent(), mAgent->getID(), peer->getID());
    bte->dispatch();
}
// Handles a PIECE peer message, i.e. the start of a block download.
//
// The message is accepted only if a connection to the sender exists and the piece
// index matches connection->mRequestingPiece; otherwise an error is reported and the
// message and event are freed. On acceptance the handler
//   - cancels the pending request timeout,
//   - copies begin/length from the message into the connection and stamps the start time,
//   - activates the inbound BTConnection if needed and flags both ends as
//     downloading / uploading,
//   - re-adjusts the bandwidth on both ends and reads the resulting inbound bandwidth,
//   - enqueues a CONNECTION_BLOCK_DOWNLOAD_FININSHE event at now + length / bandwidth,
//     carrying a heap-allocated copy of the bandwidth as its add-param.
// A negative bandwidth, or a finish time before the current time, terminates the
// program with exit(-1).
void BTSession::handlePMPiece(BTEvent* event){
    BTSession* senderSession = static_cast<BTSession*>(event->getParam());
    BTPeerMessage* pieceMsg = static_cast<BTPeerMessage*>(event->getAddParam());
    BTPeer* sender = senderSession->getAgent();
    BTSocket* link = getConnection(sender);

    const bool expectedPiece = (link != NULL) && (link->mRequestingPiece == pieceMsg->mIndex);
    if( !expectedPiece){
        handleError("BTSession::handlePMPiece");
        delete pieceMsg;
        delete event;
        return;
    }

    // The piece arrived, so the request can no longer time out.
    link->cancelRequestTimeout();

    link->mBegin = pieceMsg->mBegin;
    link->mLength = pieceMsg->mLength;
    link->mDownloadLeft = link->mLength;
    link->mStartTime = Scheduler.getCurrent();
    if( !link->mInConnection->isActive())
        link->mInConnection->activate();
    link->mDownloading = true;
    link->mCounterPart->mUploading = true;

    // Bandwidth is re-adjusted on both ends before the transfer rate is read.
    link->adjustInBandwidth( );
    link->mCounterPart->adjustOutBandwidth( );
    BandType* rate = new BandType( link->mInConnection->getBandwidth());
    if( *rate < 0){
        printf("BTSokcet::rate %ld < 0\n", *rate);
        delete rate;
        exit(-1);
    }

    const double finishAt = Scheduler.getCurrent() + static_cast<double>(link->mLength)/static_cast<double>(*rate);
    if( finishAt < Scheduler.getCurrent()){
        handleError("BTSocket::handlePMPiece:t<current");
        delete pieceMsg;
        delete event;
        exit(-1);
    }

    // The finish event is remembered on the connection and carries the rate with it.
    BTEvent* finishEvent = new BTEvent( CONNECTION_BLOCK_DOWNLOAD_FININSHE, finishAt, link, this);
    link->mEstimateFinish = finishEvent;
    finishEvent->setAddParam( rate);
    Scheduler.enqueue(finishEvent);

    delete pieceMsg;
    delete event;
    DEBUG(SESSIONDEBUGLEVEL)
        printf("%f peer %d rev piece from peer %d until %f bandwidth %ld\n",Scheduler.getCurrent(), mAgent->getID(), sender->getID(), finishAt, *rate);
}
// Handles a CANCEL peer message.
//
// No cancel logic is implemented. The handler only releases the event and the
// message attached to it, so a CANCEL does not leak them; every other peer-message
// handler in this file frees both in the same way.
// NOTE(review): assumes the dispatcher hands ownership of the event to this handler
// exactly as it does for the other message types -- confirm against handlePeerMessage.
// The parameter is the incoming event (its name is kept from the original signature).
void BTSession::handlePMCancel(BTEvent* btm){
    if( btm == NULL)
        return;
    delete static_cast<BTPeerMessage*>(btm->getAddParam());
    delete btm;
}
// Randomly moves a bounded number of peers out of `peerlists` into a new list.
//
// `peerlists` is shuffled in place, peers are taken from its front and the taken
// peers are erased from it. The returned list is heap-allocated with `new`
// (presumably the caller is expected to delete it -- confirm at the call sites);
// it is empty when `peerlists` is empty.
//
// NOTE(review): the limit is tested after each push with a post-incremented counter
// that starts at 0, so up to PEER_PEERNUMLIMIT + 1 peers are taken. This is kept
// as-is; confirm whether the extra peer is intended.
list<BTPeer*>* BTSession::selectPeers(vector<BTPeer*>* peerlists){
    list<BTPeer*>* selected = new list<BTPeer*>;
    if( peerlists->empty())
        return selected;
    // random order first, then take a prefix
    random_shuffle( peerlists->begin(), peerlists->end());
    vector<BTPeer*>::iterator prefixEnd = peerlists->begin();
    unsigned int taken = 0;
    while( prefixEnd != peerlists->end()){
        selected->push_back( *prefixEnd);
        ++prefixEnd;
        if( taken++ >= PEER_PEERNUMLIMIT)
            break;
    }
    // The selected peers are exactly the prefix [begin, prefixEnd), so one range erase
    // removes them; this matches erasing each one by lookup, without the repeated scans.
    peerlists->erase( peerlists->begin(), prefixEnd);
    return selected;
}
bool BTSession::selectPiece(BTPeer* peer, unsigned int * pieceindex){
vector<bool>* peerPieces = peer->getDocument( mDocument->mkey)->getPieces();
vector< unsigned int>* morePieces = mDocument->morePieces( peerPieces);
if( morePieces->size() == 0){
delete morePieces;
return false;
}
*pieceindex = 0;
//when download begin, random select a piece
if( mDocument->isEmpty() && mDocument->mPartialList.size() == 0){
random_shuffle( morePieces->begin(), morePieces->end());
vector< unsigned int>::iterator aIt = morePieces->begin();
while(aIt != morePieces->end()){
*pieceindex = *(aIt++);
if( mDocument->inDownLoadingSet( *pieceindex))
continue;
delete morePieces;
return true;
}
delete morePieces;
return false;
}
//in download progress, select first the partial piece
if( mDocument->mPartialList.size() > 0){
map< unsigned int, unsigned int>::iterator aIt = mDocument->mPartialList.begin();
while( aIt != mDocument->mPartialList.end()){
*pieceindex = (aIt++)->first;
if( find(morePieces->begin(), morePieces->end(), *pieceindex) != morePieces->end() && !mDocument->inDownLoadingSet( *pieceindex)){
delete morePieces;
return true;
}
}
}
//if no partial piece, select the rare piece
vector<pieceInfo>* pieceInfoList = makePieceInfoList( morePieces );
delete morePieces;
random_shuffle( pieceInfoList->begin(), pieceInfoList->end());
sort(pieceInfoList->begin(), pieceInfoList->end(), lessPeerNum());
vector< pieceInfo>::iterator aIt = pieceInfoList->begin();
pieceInfo pInfo;
while(aIt != pieceInfoList->end()){
pInfo = *(aIt++);
*pieceindex = pInfo.index;
//if peer is superseed then if there is other peer have this piece then this piece will not get from superseed
/*
if( bt.ifEsp( peer ) && pInfo.peerNum > 1)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -