📄 btsession.cpp
字号:
continue;
*/
if( !mDocument->inDownLoadingSet( *pieceindex)){
delete pieceInfoList;
return true;
}
}
delete pieceInfoList;
return false;
};
// Connect to `peer`, or refresh the choke state of an existing connection.
//
// If `peer` is already in the connection table, the only action taken is
// sendChoke(flag, ...) on the existing socket, and only when that socket has
// mPeerInsterested set. Otherwise a socket is created via addConnection() and
// a HAND_SHAKE message is scheduled for delivery to `peer` at the current
// time plus the agent-to-agent delay; the connection timeout is then armed.
//
// peer: remote peer to connect to.
// flag: choke value forwarded to sendChoke() for an existing connection.
void BTSession::connectTo(BTPeer* peer, bool flag){
    if( peerInConnections( peer)){
        BTSocket* existing = getConnection( peer);
        if( existing->mPeerInsterested)
            sendChoke( flag, existing);
        return;
    }

    BTSocket* socket = addConnection( peer);
    if( socket == NULL)
        return;    // addConnection() refused to create the socket

    // Build the handshake carrying our document key and piece bitfield.
    BTPeerMessage* handshake = new BTPeerMessage(HAND_SHAKE);
    handshake->mConnection = socket;
    handshake->mDownloadHash = mDocument->mkey;
    handshake->mBitfield = mDocument->getPieces();
    handshake->mInsterested = true;

    // A delivery time in the past means the delay was negative; treated as fatal.
    double deliverAt = Scheduler.getCurrent() + bt.getAgentDelay( mAgent->getID(), peer->getID());
    if( deliverAt < Scheduler.getCurrent()){
        printf("BTPeer::HandleShakeHand t %f < current agentDelay %f\n", deliverAt, bt.getAgentDelay( mAgent->getID(), peer->getID()));
        exit(-1);
    }

    BTEvent* event = new BTEvent(PEER_MESSAGE, deliverAt, peer, this);
    event->setAddParam(handshake);
    Scheduler.enqueue(event);
    DEBUG(SESSIONDEBUGLEVEL)
        printf("%f peer %d send hand_shake to peer %d\n", Scheduler.getCurrent(), getAgent()->getID(), peer->getID());
    socket->sendConnectionTimeout();
}
// Create a socket for `peer` and register it in mConnections.
//
// Returns NULL (creating nothing) when `peer` is this session's own agent or
// when mConnections already holds mAgent->MaxConnectionNum entries. Finding
// `peer` already registered is treated as fatal. When the table is empty
// before the insert, mRechoke->nextSchedule() is called first.
//
// Returns the newly created socket on success.
BTSocket* BTSession::addConnection(BTPeer* peer){
    if( peer == mAgent){
        printf("BTSession::addConection:peer %d add itself\n", mAgent->getID());
        return NULL;
    }
    if( mConnections.size() >= mAgent->MaxConnectionNum){
        DEBUG(SESSIONDEBUGLEVEL)
            printf(" peer %d has too many connections, so refuse connectio to peer %d\n", mAgent->getID(), peer->getID());
        return NULL;
    }
    if( mConnections.count( peer ) != 0){
        printf("BTSession::addConnection:peer exist\n");
        exit(-1);
    }

    // First connection of this session: kick off the rechoke schedule.
    if( mConnections.empty())
        mRechoke->nextSchedule();

    BTSocket* created = new BTSocket(this);
    mConnections[peer] = created;    // key is known to be absent, so this inserts
    ++mAgent->mConnectionNum;
    mActive = true;
    return created;
}
void BTSession::sendInterested(bool flag, BTSocket* con){
if( con->mInsterested == flag||con->mClosed)
return;
con->mInsterested = flag;
BTSession* peerSession = con->mCounterPart->mSession;
BTPeerMessage* btm = NULL;
if( flag)
btm = new BTPeerMessage(INTERESTED);
else
btm = new BTPeerMessage(NOT_INTERESTED);
double t = Scheduler.getCurrent() + bt.getAgentDelay( mAgent->getID(), peerSession->getAgent()->getID());
if( t < Scheduler.getCurrent()){
printf("BTSession::sendInterested t %f < current agentDelay %f\n", t, bt.getAgentDelay( mAgent->getID(), peerSession->getAgent()->getID()));
exit(-1);
}
BTEvent* bte = new BTEvent(PEER_MESSAGE, t, peerSession,this);
bte->setAddParam(btm);
Scheduler.enqueue(bte);
DEBUG(SESSIONDEBUGLEVEL)
printf("%f peer %d send interested %d to peer %d\n",Scheduler.getCurrent(), mAgent->getID(), flag, peerSession->getAgent()->getID());
};
void BTSession::sendInterestedTo(bool flag, BTSocket* con){
con->mInsterested = flag;
BTSession* peerSession = con->mCounterPart->mSession;
BTPeerMessage* btm = NULL;
if( flag)
btm = new BTPeerMessage(INTERESTED);
else
btm = new BTPeerMessage(NOT_INTERESTED);
double t = Scheduler.getCurrent() + bt.getAgentDelay( mAgent->getID(), peerSession->getAgent()->getID());
if( t < Scheduler.getCurrent()){
printf("BTSession::sendInterestedTo t %f < current agentDelay %f\n", t, bt.getAgentDelay( mAgent->getID(), peerSession->getAgent()->getID()));
exit(-1);
}
BTEvent* bte = new BTEvent(PEER_MESSAGE, t, peerSession,this);
bte->setAddParam(btm);
Scheduler.enqueue(bte);
DEBUG(SESSIONDEBUGLEVEL)
printf("%f peer %d send interested %d to peer %d\n",Scheduler.getCurrent(), mAgent->getID(), flag, peerSession->getAgent()->getID());
};
void BTSession::sendChoke(bool flag, BTSocket* con){
if( con->mChoke == flag ||con->mClosed || !con->mPeerInsterested)
return;
con->mChoke = flag;
BTSession* peerSession = con->mCounterPart->mSession;
BTPeerMessage* btm = NULL;
if( flag){
btm = new BTPeerMessage(CHOKE);
mUnchokeLists.erase(peerSession->getAgent());
}
else{
btm = new BTPeerMessage(UNCHOKE);
mUnchokeLists.erase( peerSession->getAgent());
mUnchokeLists.insert( peerSession->getAgent());
}
double t = Scheduler.getCurrent() + bt.getAgentDelay( mAgent->getID(), peerSession->getAgent()->getID());
if( t < Scheduler.getCurrent()){
printf("BTSession::sendchoke t %f < current agentDelay %f\n", t, bt.getAgentDelay( mAgent->getID(), peerSession->getAgent()->getID()));
exit(-1);
}
BTEvent* bte = new BTEvent(PEER_MESSAGE, t, peerSession, this);
bte->setAddParam(btm);
Scheduler.enqueue( bte);
DEBUG(SESSIONDEBUGLEVEL)
printf("%f peer %d send peer %d choke %d \n", Scheduler.getCurrent(), mAgent->getID(), peerSession->getAgent()->getID(), flag );
}
// Request the next block of piece `index` from `peer`.
//
// Requires a connection to `peer` that is connected and not closed; otherwise
// handleError() is called and nothing is sent. The request starts at the
// piece's recorded progress when the piece is in the partial list, else at 0
// (progress is then reset to 0). Its length is capped at
// mDocument->mBlockLength. The message is enqueued for the current simulation
// time (no delay is added), the piece is marked as downloading, and the
// request timeout is armed.
void BTSession::sendRequest( BTPeer* peer, int index){
    BTSocket* socket = getConnection( peer);
    const bool usable = socket != NULL && socket->mConnected && !socket->mClosed;
    if( !usable){
        handleError( "BTSession::sendRequest:no connection");
        return;
    }

    BTPeerMessage* request = new BTPeerMessage( REQUEST );
    request->setKey(mDocument->mkey);
    request->setIndex(index);

    // Resume a partially downloaded piece where it left off; otherwise start
    // from offset 0 (presumably setPieceProgress also registers the piece in
    // the partial list -- verify against the document class).
    if( !mDocument->inPartialList( index)){
        mDocument->setPieceProgress( index, 0);
        request->setBegin(0);
    }
    else
        request->setBegin(mDocument->getPieceProgress(index));

    // Ask for one block, or for whatever remains of the piece if that is less.
    unsigned int pieceBytes = mDocument->getPieceLength( index);
    if( (pieceBytes - request->mBegin ) > mDocument->mBlockLength)
        request->setLength( mDocument->mBlockLength);
    else
        request->setLength( pieceBytes - request->mBegin);

    double sendAt = Scheduler.getCurrent();
    BTEvent* event = new BTEvent( PEER_MESSAGE, sendAt, socket->mCounterPart->mSession, this);
    event->setAddParam( request);
    Scheduler.enqueue(event);

    socket->mRequestingPiece = index;
    mDocument->addPiecedownloading( index);
    DEBUG(SESSIONDEBUGLEVEL)
        printf( "%f peer %d send request to peer %d index %d begin %d lenth %d peerNum %d\n", Scheduler.getCurrent(), mAgent->getID(), peer->getID(), request->mIndex, request->mBegin, request->mLength, countPiecePeers(request->mIndex) );
    socket->sendRequestTimeout();
}
// Pick a piece to fetch from `peer` and request it.
//
// Reports an error when there is no connection to `peer`. Returns silently
// when the peer is choking us or the socket's mRequestingPiece is not
// MAX_UNSIGNED_VALUE (presumably meaning a request is already outstanding).
// When selectPiece() finds nothing, the inbound side of the connection is
// inactivated, bandwidth is re-adjusted on both ends, and NOT_INTERESTED is
// sent via sendInterested(false, ...).
void BTSession::sendRequestTo(BTPeer* peer){
    BTSocket* socket = getConnection(peer);
    if( socket == NULL){
        handleError("BTSession::sendRequestTo:no connection");
        return;
    }

    // Choked by the peer, or already requesting: do not send another request.
    if( socket->mPeerChoke || socket->mRequestingPiece != MAX_UNSIGNED_VALUE )
        return;

    unsigned int chosen = 0;
    if( selectPiece(peer, &chosen)){
        sendRequest( peer, chosen);
        return;
    }

    // Nothing to fetch from this peer: disable the inbound connection.
    socket->mInConnection->inactivate();
    socket->adjustInBandwidth();
    socket->mCounterPart->adjustOutBandwidth();
    sendInterested( false, socket);
}
// Build a pieceInfo entry for every piece index in `piece`.
//
// Each entry gets the piece index and the peer count reported by
// countPiecePeers(). The result is allocated with `new` and returned as a raw
// pointer, so the caller has to delete it.
vector<pieceInfo>* BTSession::makePieceInfoList(vector <unsigned int > * piece){
    vector<pieceInfo>* result = new vector<pieceInfo>(piece->size());
    vector<pieceInfo>::iterator out = result->begin();
    for( vector<unsigned int>::const_iterator in = piece->begin(); in != piece->end(); ++in, ++out){
        out->index = *in;
        out->peerNum = countPiecePeers( *in);
    }
    return result;
}
// Number of peers recorded in mKnowledges for piece `index`.
//
// An out-of-range index is logged and reported as 0 peers.
unsigned int BTSession::countPiecePeers( unsigned int index){
    // Valid indices are 0 .. size()-1. The previous `>` comparison accepted
    // index == size() and then read one element past the end of mKnowledges.
    if( index >= mKnowledges.size()){
        printf(" BTSession::countPiecePeers:wrong index %u", index);
        return 0;
    }
    return mKnowledges[index]->size();
}
// Append `peer` to mPeerCandidates. Adding this session's own agent is
// treated as a fatal error.
void BTSession::addPeerCandidates( BTPeer* peer){
    if( peer != mAgent ){
        mPeerCandidates.push_back(peer);
        return;
    }
    printf("BTSession::addPeerCandidates:peer %d add itself\n", mAgent->getID());
    exit(-1);
}
//check if connectin which has timout exist
void BTSession::handleConnectionTimeout( void){
list<BTSocket* >::iterator aIt, pIt;
aIt = mConnectTimoutList.begin();
BTSocket* con = NULL;
while( aIt != mConnectTimoutList.end() ){
pIt = aIt++;
con = *pIt;
if( con->mConnectionTimeout->getTimeStamp() <= Scheduler.getCurrent() ){
handleConnectionTimeout( con->mConnectionTimeout );
continue;
}
break;
}
}
// Run both timeout sweeps: connection timeouts first, then request timeouts.
void BTSession::checkTimeout(){
    handleConnectionTimeout();
    handleRequestTimeout();
}
//check if request which has timout exist
void BTSession::handleRequestTimeout(void){
list<BTSocket* >::iterator aIt, pIt;
aIt = mRequestTimoutList.begin();
BTSocket* con = NULL;
while( aIt != mRequestTimoutList.end() ){
pIt = aIt ++;
con = *pIt;
if( con->mRequestTimeout->getTimeStamp() <= Scheduler.getCurrent() ){
mRequestTimoutList.erase( pIt);
con->handleRequestTimeout( con->mRequestTimeout );
continue;
}
break;
}
}
// Count one more download of piece `index`, then re-evaluate retirement via
// checkRetire() (its result is ignored here).
void BTSession::addPieceDownload(unsigned int index){
    ++mDownloadInfo[index];
    checkRetire();
}
// Print the download counter of every piece, one line per piece, for indices
// 0 .. mDocument->mPieceNum - 1.
void BTSession::printPieceDownloadInfo( void){
    unsigned int piece = 0;
    while( piece < mDocument->mPieceNum){
        printf(" piece %u downloadtimes %d\n", piece, mDownloadInfo[piece]);
        ++piece;
    }
}
// Remove `peer` from the holder list of every piece in mKnowledges.
//
// At most one occurrence is erased per piece. After each piece is processed,
// if exactly one holder remains and it is of type `superseed`, a request is
// sent to that holder unless its connection is already downloading.
void BTSession::removePeerFromKnowledge( BTPeer* peer){
    for( unsigned int i = 0; i< mKnowledges.size() ; i++){
        list<BTPeer*>::iterator aIt = find( mKnowledges[i]->begin(), mKnowledges[i]->end(), peer);
        if( aIt != mKnowledges[i]->end() )
            mKnowledges[i]->erase(aIt);
        if( mKnowledges[i]->size() ==1 && mKnowledges[i]->front()->mType == superseed ){
            BTSocket* con = getConnection( mKnowledges[i]->front() );
            // getConnection() may return NULL (sendRequest and sendRequestTo
            // both check for it), so do not dereference an absent connection.
            if( con != NULL && !con->mDownloading)
                sendRequestTo( mKnowledges[i]->front() );
        }
    }
}
// Try to retire this session.
//
// Returns false without changing state when the tracker reports no more than
// ESP_RETIRE_PEER_NUM peers since mStartTime. Also returns false, but sets
// mCouldRetire, when the agent's available bandwidth exceeds
// MIN_OUT_BANDWIDTH and its active session count is below DANGER_INFO_NUM.
// Otherwise the session is stopped, rechoking is cancelled, the tracker's ESP
// state for this document is cleared, and true is returned.
bool BTSession::retireWork(void){
    const bool tooFewPeers = bt.mTracker->getPeerNumSince(mDocument->mkey, mStartTime) <= ESP_RETIRE_PEER_NUM;
    if( tooFewPeers)
        return false;

    Topology *topology = Topology::getInstance();
    const bool keepServing = topology->getAvailableBandwidth( false, mAgent) > MIN_OUT_BANDWIDTH && mAgent->getActiveSessionNum() < DANGER_INFO_NUM;
    if( keepServing){
        mCouldRetire = true;
        return false;
    }

    printf("%f esp retire server for file %s outputnum %fM\n", Scheduler.getCurrent(), mDocument->mkey.c_str(), mOutPutNum*16/1024.0);
    mState = stopped;
    mRechoke->cancelRechoke();
    bt.mTracker->setEspSate( mDocument->mkey, false);
    mCouldRetire = false;
    return true;
}
// Put the session back into the `working` state. No-op when it is already
// working; otherwise rechoking is rescheduled, mStartTime is reset to the
// current simulation time, and the tracker's ESP state for this document is
// set to true.
void BTSession:: recoverEsp( void ){
    if( mState != working ){
        mState = working;
        mRechoke->nextSchedule();
        mStartTime = Scheduler.getCurrent();
        bt.mTracker->setEspSate( mDocument->mkey, true);
    }
}
bool BTSession::ifNoInterestedPeer(void){
map<BTPeer*, BTSocket*>::iterator aIt = mConnections.begin();
bool flag = true;
while( aIt != mConnections.end() ){
if( !aIt->second->mConnected)
return false;
if( aIt->second->mInsterested )
flag = false;
aIt++;
}
return flag;
}
void BTSession::requestEsp(void){
BTPeer* peer = bt.mEspList.front();
if( peerInConnections(peer ))
sendRequestTo( peer);
else
connectTo( peer, true);
}
// True when no piece in mKnowledges has more than one recorded peer.
bool BTSession::ifNoAvailablePeer(void){
    unsigned int piece = 0;
    while( piece < mKnowledges.size()){
        if( mKnowledges[piece]->size() >1)
            return false;
        ++piece;
    }
    return true;
}
bool BTSession::ifUpload(void){
map<BTPeer*, BTSocket*>::iterator aIt = mConnections.begin();
BTSocket* p = NULL;
while( aIt != mConnections.end() ){
p = (aIt++)->second;
if( p != NULL && p->mOutConnection != NULL && p->mOutConnection->isActive() )
return true;
}
return false;
};
// Decide whether enough complete copies have been served to retire.
//
// `finishednum` is the smallest per-piece download count. If any piece has
// never been downloaded the answer is false. A retirement attempt is made the
// first time the minimum reaches MIN_COM_NUM, and afterwards whenever the
// minimum differs from the value recorded at the last successful retirement.
//
// Returns true only when retireWork() actually retired the session.
bool BTSession::checkRetire(void){
    // With no counters there is nothing to take a minimum of. The previous
    // code seeded the minimum from element 1, which is out of range when
    // fewer than two entries exist.
    if( mDownloadInfo.size() == 0 )
        return false;
    unsigned int finishednum = mDownloadInfo[0];
    for( unsigned int i = 0; i< mDownloadInfo.size(); i++){
        if( mDownloadInfo[i] == 0 )
            return false;
        else if( mDownloadInfo[i] < finishednum )
            finishednum = mDownloadInfo[i];
    }
    unsigned int peerNum = bt.mTracker->getPeerNumSince( mDocument->mkey, mStartTime);
    if( (!mLastFinishedNum && finishednum >= MIN_COM_NUM)||( mLastFinishedNum && finishednum - mLastFinishedNum >=1 ) ){
        // Both values are unsigned, so print them with %u rather than %d.
        printf("%f esp retire when file finishednum %u peerarrivenum %u\n", Scheduler.getCurrent(), finishednum, peerNum);
        if( retireWork() ){
            mLastFinishedNum = finishednum;
            return true;
        }
    }
    return false;
}
// Orders pieceInfo entries by ascending peerNum.
bool lessPeerNum::operator()(pieceInfo const& info1, pieceInfo const& info2) const{
    return info1.peerNum < info2.peerNum;
}
// Compares two sockets by their average outbound bandwidth measured since
// (current time - CAL_BANWIDTH_INTERVAL). Returns false only when peer1's
// value is strictly greater than peer2's.
//
// NOTE(review): equal values compare as true, so this is not a strict weak
// ordering -- confirm that no caller passes it to std::sort, std::set or a
// similar facility that requires one.
bool OutlessCompare::operator()(BTSocket* const & peer1, BTSocket* const & peer2) const{
    double since = Scheduler.getCurrent() - CAL_BANWIDTH_INTERVAL;
    double firstRate = peer1->calcOutAverageBandwidthSince( since);
    double secondRate = peer2->calcOutAverageBandwidthSince(since);
    return !( firstRate > secondRate);
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -