📄 btsocket.cpp
字号:
#include "BTSocket.h"
#include "BTEvent.h"
#include "BTSession.h"
#include "BTConnection.h"
#include "BTDocument.h"
#include "BTPeer.h"
#include "HandleError.h"
#include "SimEventScheduler.h"
#include "BT.h"
#include "Sysparameter.h"
#include "Debug.h"
BTSocket::BTSocket(BTSession* s){
//mBitfield.clear();
mCounterPart = NULL;
mSession = s;
mEstimateFinish = mRequestTimeout = mConnectionTimeout = NULL;
mPeerInsterested = mInsterested =mUploading = mDownloading =mClosed =false;
mPeerChoke = mChoke = true;
mBegin = mLength = mDownloadLeft = 0;
mRequestingPiece = MAX_UNSIGNED_VALUE;
mStartTime = 0;
mConnected = false;
mInConnection = NULL;
mOutConnection = NULL;
mAvrDownload = 0;
mAvrUpload = 0;
};
BTSocket::~BTSocket(void){
if( mEstimateFinish != NULL){
Scheduler.cancel( mEstimateFinish);
delete mEstimateFinish;
mEstimateFinish = NULL;
}
cancelRequestTimeout();
delete mRequestTimeout;
mRequestTimeout = NULL;
cancelConnectionTimeout();
delete mConnectionTimeout;
mConnectionTimeout = NULL;
if( mRequestingPiece < MAX_UNSIGNED_VALUE ){
if( mSession->mDocument->inDownLoadingSet( mRequestingPiece))
mSession->mDocument->removePiecedownloading( mRequestingPiece);
if( mSession->mDocument->inPartialList(mRequestingPiece) && mSession->mDocument->getPieceProgress( mRequestingPiece) == 0)
mSession->mDocument->removePieceProgress( mRequestingPiece);
mRequestingPiece = MAX_UNSIGNED_VALUE;
}
//should handle ending the connection
/*
delete mConnection;
mCounterPart->mConnection = NULL;
*/
}
void BTSocket::handler(SimEvent* event){
if( mClosed ||!mConnected)
return;
switch(event->getType()){
case CONNECTION_BLOCK_DOWNLOAD_FININSHE:
handleBlockFininshed((BTEvent *)event);
return;
case REQUEST_TIMEOUT:
handleRequestTimeout((BTEvent*)event);
return;
default:
handleError("BTSocket::Handler:unknown event type");
return;
}
}
void BTSocket::handleBlockFininshed(BTEvent * event){
double* Rate = (double*)(event->getAddParam());
delete Rate;
BTDocument* btd = mSession->getDocument();
BTSession* peerSession = mCounterPart->mSession;
BTPeer* peer = peerSession->getAgent();
mSession->setAmountBetweenSnubDect( mLength + mSession->getAmountBetweenSnubDect());
if( mRequestingPiece == MAX_UNSIGNED_VALUE ){
handleError( "BTSocket::handleBlockFininshed:wrong piece index");
delete event;
return;
}
DEBUG(SOCKETDEBUGLEVEL)
printf("%f peer %d rev from peer %d index %u begin %d length %d\n", Scheduler.getCurrent(), mSession->getAgent()->getID(), mCounterPart->mSession->getAgent()->getID(), mRequestingPiece, mBegin, mLength);
if( !btd->havePiece( mRequestingPiece) ){
double piecepercent = 1.0*( mBegin + mLength) / btd->getPieceLength(mRequestingPiece);
double totalpercent = btd->getFinishedPercent();
if( piecepercent < 1.0)
btd->addPieceProgress( mRequestingPiece, mLength);
else{
btd->finishPiece( mRequestingPiece);
//if the upload peer is superseed then download piece info will be add
if( mCounterPart->mSession->mAgent->mType == superseed )
mCounterPart->mSession->addPieceDownload( mRequestingPiece);
DEBUG(SOCKETDEBUGLEVEL)
printf("peer %d fininsh piece %u, left piecenum %u\n", mSession->getAgent()->getID(), mRequestingPiece, btd->getLeftPieceNum());
totalpercent = btd->getFinishedPercent();
//should add debug info for download process
if( btd->isWhole()){
mSession->sendInterested(false, this);
mSession->mState = completed;
mSession->announceTracker();
//finish whole file
//debug
}
mSession->announcePeers( mRequestingPiece);
mRequestingPiece = MAX_UNSIGNED_VALUE;
}
}
else{
btd->removePiecedownloading( mRequestingPiece);
mRequestingPiece = MAX_UNSIGNED_VALUE;
}
mDownloading =false;
mCounterPart->mUploading =false;
mBegin = mLength =mDownloadLeft = 0;
mEstimateFinish = NULL;
mStartTime = -1;
mDownloadLeft = 0;
if( mRequestingPiece < MAX_UNSIGNED_VALUE ){
btd->removePiecedownloading( mRequestingPiece);
mRequestingPiece = MAX_UNSIGNED_VALUE;
}
if( btd->isWhole()){
mInConnection->inactivate();
adjustInBandwidth();
mCounterPart->adjustOutBandwidth();
BTEvent* bte = new BTEvent(PEER_TIMEOUT, Scheduler.getCurrent() + PEER_RETIRE_TIME, mSession, this);
Scheduler.enqueue(bte);
return;
}
if( !mPeerChoke && !btd->isWhole())
mSession->sendRequestTo( peer);
else{
mInConnection->inactivate();
adjustInBandwidth();
mCounterPart->adjustOutBandwidth();
}
delete event;
}
void BTSocket::handleRequestTimeout(BTEvent* event){
DEBUG(SOCKETDEBUGLEVEL)
printf("%f peer %d rev requesttimeout\n", Scheduler.getCurrent(), mSession->getAgent()->getID());
mBegin = 0;
mLength = 0;
mDownloadLeft = 0;
mDownloading = false;
mCounterPart->mUploading =false;
mStartTime = -1;
mEstimateFinish = NULL;
mSession->mDocument->removePiecedownloading( mRequestingPiece);
if( mSession->mDocument->getPieceProgress( mRequestingPiece) == 0)
mSession->mDocument->removePieceProgress( mRequestingPiece);
mRequestingPiece = MAX_UNSIGNED_VALUE;
if( !mPeerChoke)
mSession->sendRequestTo(mCounterPart->mSession->getAgent());
}
void BTSocket::adjustInBandwidth(void){
// if( !mDownloading || mEstimateFinish == NULL)
// return;
if( !mDownloading )
return;
if( mRequestingPiece == MAX_UNSIGNED_VALUE|| mSession->getDocument()->havePiece( mRequestingPiece) )
return;
if( mEstimateFinish != NULL){
BandType* preRate =(BandType*)(mEstimateFinish->getAddParam());
if( *preRate == mInConnection->getBandwidth())
return;
unsigned int addDown = (int)( ( Scheduler.getCurrent() - mStartTime) * (*preRate));
if( addDown > mDownloadLeft){
printf("BTSocket::adjustInBandwidth: downloadleft %d < addDown %d\n", mDownloadLeft, addDown);
mDownloadLeft = 0;
// exit( -1);
}
else
mDownloadLeft -= addDown;
Scheduler.cancel(mEstimateFinish);
BandType * dRate = new BandType( mInConnection->getBandwidth());
double finish = Scheduler.getCurrent() + ( (double)mDownloadLeft/ *dRate);
if( finish < Scheduler.getCurrent()){
printf("BTScocket::adjustInBandwidth finishtime %f download left %d rate %ld\n", finish, mDownloadLeft, *dRate);
exit(-1);
}
mStartTime = Scheduler.getCurrent();
mEstimateFinish->setTimeStamp(finish);
mEstimateFinish->setAddParam( dRate);
Scheduler.enqueue(mEstimateFinish);
DEBUG(SOCKETDEBUGLEVEL)
printf("%f peer %d adjust the connection (to peer %d) bandwidth before rate %ld after rate %ld\n", Scheduler.getCurrent(), mSession->getAgent()->getID(), mCounterPart->mSession->getAgent()->getID(), *preRate, *dRate);
delete preRate;
}
BandwidthHistory* newBandHistory = new BandwidthHistory;
newBandHistory->mTimeStamp = mStartTime;
newBandHistory->mBandwidth = mInConnection->getBandwidth();
mInBandwidthHistory.push_back(newBandHistory);
};
void BTSocket::adjustOutBandwidth(void){
if( !mUploading)
return;
if( mOutConnection == NULL)
return;
BandwidthHistory* newBandHistory = new BandwidthHistory;
newBandHistory->mTimeStamp = Scheduler.getCurrent();
newBandHistory->mBandwidth = mOutConnection->getBandwidth();
mOutBandwidthHistory.push_back(newBandHistory);
};
/*
double BTSocket::calcInAverageBandwidthSince(double timeStamp){
if( !mConnected ||mClosed)
return 0;
if( timeStamp < 0)
timeStamp = 0;
list<BandwidthHistory*>::iterator aIt , pIt;
aIt = mInBandwidthHistory.begin();
BandwidthHistory* pbd = NULL;
double lastBandwidth = 0;
while( aIt != mInBandwidthHistory.end()){
pIt = aIt++;
pbd = *pIt;
if( pbd->mTimeStamp < timeStamp ){
lastBandwidth = pbd->mBandwidth;
if( aIt != mInBandwidthHistory.end() ){
delete pbd;
mInBandwidthHistory.erase( pIt );
// delete pbd;
}
else
break;
}
}
double sum = 0;
double time = timeStamp;
aIt = mInBandwidthHistory.begin();
if(aIt != mInBandwidthHistory.end() && (*aIt)->mTimeStamp < time)
(*aIt)->mTimeStamp = time;
while( aIt != mInBandwidthHistory.end()){
pbd = *( aIt ++);
sum += ( pbd->mTimeStamp - time)*lastBandwidth;
time = pbd->mTimeStamp;
lastBandwidth = pbd->mBandwidth;
}
sum += ( Scheduler.getCurrent() - time)*lastBandwidth;
return (double)sum/(double)(Scheduler.getCurrent() - timeStamp);
};
double BTSocket::calcOutAverageBandwidthSince(double timeStamp){
if( !mConnected ||mClosed)
return 0;
if( timeStamp < 0)
timeStamp = 0;
list<BandwidthHistory*>::iterator aIt, pIt;
aIt = mOutBandwidthHistory.begin();
BandwidthHistory* pbd = NULL;
double lastBandwidth = 0;
while( aIt != mOutBandwidthHistory.end()){
pIt = aIt++;
pbd = *pIt;
if( pbd->mTimeStamp < timeStamp ){
lastBandwidth = pbd->mBandwidth;
if( aIt != mOutBandwidthHistory.end() ){
delete pbd;
mOutBandwidthHistory.erase( pIt );
// delete pbd;
}
else
break;
}
}
double sum = 0;
double time = timeStamp;
aIt = mOutBandwidthHistory.begin();
if( aIt != mOutBandwidthHistory.end() && (*aIt)->mTimeStamp < time)
(*aIt)->mTimeStamp = time;
while( aIt != mOutBandwidthHistory.end()){
pbd = *( aIt ++);
sum += ( pbd->mTimeStamp - time)*lastBandwidth;
time = pbd->mTimeStamp;
lastBandwidth = pbd->mBandwidth;
}
sum += ( Scheduler.getCurrent() - time)*lastBandwidth;
return (double)sum/(double)( Scheduler.getCurrent() - timeStamp);
};
*/
double BTSocket::calcInAverageBandwidthSince(double timeStamp){
if( !mConnected ||mClosed)
return 0;
if( timeStamp < 0)
timeStamp = 0;
BandwidthHistory* pbd = NULL;
double lastBandwidth = 0;
double sum = 0;
double time = timeStamp;
list<BandwidthHistory*>::iterator aIt = mInBandwidthHistory.begin();
while( aIt != mInBandwidthHistory.end()){
pbd = *( aIt ++);
if( pbd->mTimeStamp < timeStamp ){
lastBandwidth = pbd->mBandwidth;
if( aIt != mInBandwidthHistory.end())
continue;
else
break;
}
sum += ( pbd->mTimeStamp - time)*lastBandwidth;
time = pbd->mTimeStamp;
lastBandwidth = pbd->mBandwidth;
}
sum += ( Scheduler.getCurrent() - time)*lastBandwidth;
mAvrDownload = (double)sum/(double)(Scheduler.getCurrent() - timeStamp);
return mAvrDownload;
};
double BTSocket::calcOutAverageBandwidthSince(double timeStamp){
if( !mConnected ||mClosed)
return 0;
if( timeStamp < 0)
timeStamp = 0;
list<BandwidthHistory*>::iterator aIt = mOutBandwidthHistory.begin();
BandwidthHistory* pbd = NULL;
double lastBandwidth = 0;
double sum = 0;
double time = timeStamp;
while( aIt != mOutBandwidthHistory.end()){
pbd = *( aIt ++);
if( pbd->mTimeStamp < time ){
lastBandwidth = pbd->mBandwidth;
if( aIt == mOutBandwidthHistory.end())
break;
else
continue;
}
sum += ( pbd->mTimeStamp - time)*lastBandwidth;
time = pbd->mTimeStamp;
lastBandwidth = pbd->mBandwidth;
}
sum += ( Scheduler.getCurrent() - time)*lastBandwidth;
mAvrUpload = (double)sum/(double)( Scheduler.getCurrent() - timeStamp);
return mAvrUpload;
};
void BTSocket::sendConnectionTimeout(void){
if( mConnectionTimeout != NULL){
changeConnectionTimeout();
return;
}
double t = Scheduler.getCurrent()+ BT_CONNECTION_TIMEOUT;
BTEvent* bte = new BTEvent( CONNECTION_TIMEOUT, t, mSession, this);
mConnectionTimeout = bte;
DEBUG(SOCKETDEBUGLEVEL)
printf("%f peer %d send connectiontimeout \n", Scheduler.getCurrent(), mSession->getAgent()->getID());
mSession->mConnectTimoutList.push_back( this);
}
void BTSocket::cancelConnectionTimeout(void){
if( mConnectionTimeout != NULL)
mSession->mConnectTimoutList.remove( this );
}
void BTSocket::changeConnectionTimeout(void){
if( mConnectionTimeout!= NULL){
mConnectionTimeout->setTimeStamp( Scheduler.getCurrent() +BT_CONNECTION_TIMEOUT);
mSession->mConnectTimoutList.remove(this);
mSession->mConnectTimoutList.push_back(this);
}
else
sendConnectionTimeout();
}
void BTSocket::sendRequestTimeout(void){
if( mRequestTimeout != NULL){
mRequestTimeout->setTimeStamp( Scheduler.getCurrent() +BTREQUESTTIMEOUT);
mSession->mRequestTimoutList.push_back(this);
return;
}
double t = Scheduler.getCurrent()+ BTREQUESTTIMEOUT;
BTEvent* bte = new BTEvent( REQUEST_TIMEOUT, t, mSession, this);
mRequestTimeout = bte;
DEBUG(SOCKETDEBUGLEVEL)
printf("%f peer %d send Requesttimeout \n", Scheduler.getCurrent(), mSession->getAgent()->getID());
mSession->mRequestTimoutList.push_back( this);
}
void BTSocket::cancelRequestTimeout(void){
if( mRequestTimeout != NULL)
mSession->mRequestTimoutList.remove( this );
}
void BTSocket::changeRequestTimeout(void){
if( mRequestTimeout!= NULL){
mRequestTimeout->setTimeStamp( Scheduler.getCurrent() +BTREQUESTTIMEOUT);
mSession->mRequestTimoutList.remove(this);
mSession->mRequestTimoutList.push_back(this);
}
else
sendRequestTimeout();
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -