📄 btpeer.cpp
字号:
#include <iostream>
#include <stdio.h>
#include "BTPeer.h"
#include "BTEvent.h"
#include "BTTracker.h"
#include "BTSession.h"
#include "BTPeerMessage.h"
#include "BTDocument.h"
#include "BT.h"
#include "BTTorrent.h"
#include "HandleError.h"
#include "SimEventScheduler.h"
#include "BTSocket.h"
#include "Sysparameter.h"
#include "Cache.h"
#include "Debug.h"
#include "BTAlgorithmRechoke.h"
#include "Topology.h"
// Builds a peer of the given type.
// A peer starts in the `retire` state with a start time of 0 and empty
// session/document tables. Only a `superseed` peer gets a Cache, which is
// bound back to this peer through setAgent(); all other peers keep mCache NULL.
BTPeer::BTPeer(peerType type)
{
    MaxConnectionNum = MAX_CONNECTION_NUM;
    mType = type;
    mState = retire;
    mStartTime = 0;

    mSessions.clear();
    mDocuments.clear();

    mCache = NULL;
    if (mType == superseed) {
        mCache = new Cache;
        mCache->setAgent(this);
    }
}
// Releases the cache allocated for superseed peers (mCache is NULL for other
// peer types). The sessions and documents referenced by mSessions/mDocuments
// are not freed here.
BTPeer::~BTPeer(void)
{
    if (mCache != NULL) {
        delete mCache;
    }
}
// Looks up the session registered under `key`.
// Returns the session pointer, or NULL when no session uses that key.
BTSession* BTPeer::getSession(string key)
{
    map<string, BTSession*>::iterator found = mSessions.find(key);
    if (found == mSessions.end())
        return NULL;
    return found->second;
}
// Creates a session for `key`, registers it in mSessions and returns it.
//
// After registration, a superseed peer whose available bandwidth
// (getAvailableBandwidth(false, this)) is below MIN_OUT_BANDWIDTH and whose
// active session count exceeds DANGER_INFO_NUM asks every session that is not
// stopped, is flagged mCouldRetire and reports at least MIN_COM_NUM available
// files to retire.
//
// NOTE(review): map::insert does not replace an existing entry, so calling
// this with a key that is already registered returns a session that is not
// in mSessions. Callers in this file check getSession() first.
BTSession* BTPeer::addSession(string key)
{
    typedef map<string, BTSession*> SessionMap;

    BTSession* created = new BTSession(key, this);
    mSessions.insert(SessionMap::value_type(key, created));

    Topology* topology = Topology::getInstance();
    if (mType != superseed)
        return created;
    if (!(topology->getAvailableBandwidth(false, this) < MIN_OUT_BANDWIDTH))
        return created;
    if (!(getActiveSessionNum() > DANGER_INFO_NUM))
        return created;

    SessionMap::iterator it = mSessions.begin();
    while (it != mSessions.end()) {
        BTSession* candidate = it->second;
        // Step past the entry before calling into the session, as the
        // original loop did (presumably because retireWork() may alter
        // mSessions -- verify against BTSession).
        ++it;
        if (candidate->mState != stopped
            && candidate->mCouldRetire
            && candidate->countAvlFileNum() >= MIN_COM_NUM) {
            candidate->retireWork();
        }
    }
    return created;
}
// Looks up the document registered under `key`.
// Returns the document pointer, or NULL when this peer has no such document.
BTDocument* BTPeer::getDocument(string key)
{
    map<string, BTDocument*>::iterator found = mDocuments.find(key);
    if (found == mDocuments.end())
        return NULL;
    return found->second;
}
// Registers a document under `key` and returns it.
//
// key        - lookup key of the document in mDocuments.
// size       - forwarded to the BTDocument constructor.
// popularity - forwarded to the BTDocument constructor.
//
// If a document is already registered under `key`, that existing document is
// returned and nothing is allocated. std::map::insert never replaces an
// existing entry, so allocating unconditionally would leak the new object
// and hand the caller a pointer that getDocument(key) does not return.
BTDocument* BTPeer::addDocument(string key, int size, int popularity)
{
    map<string, BTDocument*>::iterator existing = mDocuments.find(key);
    if (existing != mDocuments.end())
        return existing->second;

    BTDocument* doc = new BTDocument(key, size, popularity);
    mDocuments.insert(map<string, BTDocument*>::value_type(key, doc));
    return doc;
}
// Registers an existing document with this peer, keyed by the document's own
// mkey. An entry already stored under that key is left unchanged
// (map::insert semantics).
void BTPeer::ownDocument(BTDocument* doc)
{
    typedef map<string, BTDocument*> DocumentMap;
    DocumentMap::value_type entry(doc->mkey, doc);
    mDocuments.insert(entry);
}
// Publishes the document stored under `key` on `tracker`: a new BTTorrent is
// built from the document (as returned by getDocument, which may be NULL if
// the key is unknown) and handed to the tracker via addTorrent().
void BTPeer::publish(string key, BTTracker* tracker)
{
    BTDocument* document = getDocument(key);
    BTTorrent* torrent = new BTTorrent(document, tracker);
    tracker->addTorrent(torrent);
}
// Routes an incoming event to the handler matching its type.
// Known types are forwarded as BTEvent; any other type is reported through
// handleError().
void BTPeer::handler(SimEvent* event)
{
    BTEvent* btEvent = (BTEvent*)event;

    switch (event->getType()) {
    case RANDOM_DOWNLOAD_FILE:
        handleRanDomDownloadFile(btEvent);
        break;
    case PUBLISH_FILE:
        handlePublishFile(btEvent);
        break;
    case PEER_MESSAGE:
        handlePeerMessage(btEvent);
        break;
    case ESP_RECOVER:
        handleESPRecover(btEvent);
        break;
    case ESP_RETIRE:
        handleESPRetire(btEvent);
        break;
    default:
        handleError("unknown simevent type");
        break;
    }
}
// Starts downloading a torrent picked at random from a tracker.
//
// The event parameter is a heap-allocated int holding the tracker's agent id;
// it is freed here. A missing tracker is reported through handleFatalError(),
// and a tracker with no torrent terminates the program. If this peer has no
// session for the torrent yet, a document and a session are created for it.
// The peer then records the start time, queries the tracker, frees the event
// and switches to the `work` state.
void BTPeer::handleRanDomDownloadFile(BTEvent* event)
{
    int* trackerId = (int*)(event->getParam());
    BTTracker* tracker = (BTTracker*)bt.getAgent(*trackerId);
    delete trackerId;
    if (tracker == NULL)
        handleFatalError("BTPeer::handleRandomDownloadFile:no tracker addr");

    BTTorrent* torrent = tracker->getRandomTorrent();
    if (torrent == NULL) {
        printf("handleRanDomDownloadFile: no torrent\n");
        exit(-1);
    }

    BTSession* session = getSession(torrent->mKey);
    if (session == NULL) {
        // First contact with this file: register the document, then a session.
        addDocument(torrent->mKey, torrent->mSize, torrent->mPopularity);
        session = addSession(torrent->mKey);
    }
    session->SetTorrent(torrent);

    mStartTime = Scheduler.getCurrent();
    session->queryTracker();
    delete event;
    mState = work;

    DEBUG(SIMDEBUGLEVEL)
        cout << Scheduler.getCurrent() << " peer " << getID() << " begin to down load file" << torrent->mKey << "file size" << torrent->mSize << endl;
}
// Publishes a file on a tracker.
// The event parameter points to the tracker's agent id (int) and the
// additional parameter points to the document key (string). Both are only
// read here; only the event itself is deleted.
// NOTE(review): if the sender heap-allocates these parameters they are never
// freed on this path -- confirm ownership with the code that creates the event.
void BTPeer::handlePublishFile(BTEvent* event)
{
    int* trackerIdPtr = (int*)(event->getParam());
    int trackerId = *trackerIdPtr;

    string* keyPtr = (string*)(event->getAddParam());
    string key = *keyPtr;

    BTTracker* tracker = (BTTracker*)(bt.getAgent(trackerId));
    publish(key, tracker);
    delete event;
}
// Handles a PEER_MESSAGE event addressed to the peer itself.
//
// The event's additional parameter is the BTPeerMessage. Only HAND_SHAKE
// messages are accepted here; any other type is reported through
// handleFatalError(). A retired peer ignores the message.
//
// Ownership: handleHandShake() deletes both the event and the message on every
// path, so the receiving peer owns the message. The retired path therefore
// frees the message as well as the event; previously only the event was
// deleted and the message leaked.
void BTPeer::handlePeerMessage(BTEvent* event)
{
    BTPeerMessage* peerMessage = (BTPeerMessage*)(event->getAddParam());

    if (mState == retire) {
        delete peerMessage;
        delete event;
        return;
    }

    if (peerMessage->mType != HAND_SHAKE)
        handleFatalError("BTPeer::handlePeerMessage:wrong peermessage type");
    handleHandShake(event);
}
// Answers a handshake received from a remote peer.
//
// The event parameter is the remote peer's session and the additional
// parameter is the handshake message; both the event and the message are
// deleted before returning. Steps, in order:
//   1. Find the local document named by the handshake (fatal error if absent)
//      and find or create the local session for it.
//   2. If the local document is incomplete, record every piece the remote
//      bitfield has and the local one lacks; any such piece makes this peer
//      interested.
//   3. Find or create the connection to the remote peer. If no connection can
//      be created the handshake is dropped.
//   4. Schedule a BITFIELD reply at current time plus the agent delay between
//      the two peers, then arm the connection timeout.
void BTPeer::handleHandShake(BTEvent* event)
{
    BTSession* remoteSession = (BTSession*)(event->getParam());
    BTPeer* remotePeer = remoteSession->getAgent();
    BTPeerMessage* handshake = (BTPeerMessage*)(event->getAddParam());

    BTDocument* localDoc = getDocument(handshake->mDownloadHash);
    if (localDoc == NULL)
        handleFatalError("BTPeer::handlehandShake:no file in peer");

    BTSession* localSession = getSession(handshake->mDownloadHash);
    if (localSession == NULL)
        localSession = addSession(handshake->mDownloadHash);

    bool interested = false;
    if (!localDoc->isWhole()) {
        vector<bool>* ownPieces = localDoc->getPieces();
        vector<bool>* remotePieces = handshake->mBitfield;
        // Indexes both bitfields by the remote size; assumes the local
        // bitfield is at least as long -- TODO confirm.
        for (unsigned int idx = 0; idx < remotePieces->size(); idx++) {
            if ((*remotePieces)[idx] && !(*ownPieces)[idx]) {
                interested = true;
                localSession->addKnowledgePiece(idx, remotePeer);
            }
        }
    }

    // Set up the connection.
    BTSocket* connection = NULL;
    if (localSession->peerInConnections(remotePeer)) {
        connection = localSession->getConnection(remotePeer);
    }
    else {
        connection = localSession->addConnection(remotePeer);
        if (connection == NULL) {
            delete event;
            delete handshake;
            return;
        }
    }
    connection->mInsterested = interested;

    double replyTime = Scheduler.getCurrent() + bt.getAgentDelay( getID(), remoteSession->getAgent()->getID());
    if (replyTime < Scheduler.getCurrent()) {
        printf("BTPeer::HandleShakeHand t %f < current agentDelay %f\n", replyTime, bt.getAgentDelay( getID(), remoteSession->getAgent()->getID()));
        exit(-1);
    }

    BTPeerMessage* reply = new BTPeerMessage(BITFIELD);
    reply->mDownloadHash = localDoc->mkey;
    reply->mBitfield = localDoc->getPieces();
    reply->mConnection = connection;
    reply->mInsterested = interested;

    BTEvent* replyEvent = new BTEvent(PEER_MESSAGE, replyTime, remoteSession, localSession);
    replyEvent->setAddParam(reply);
    Scheduler.enqueue(replyEvent);
    connection->sendConnectionTimeout();

    DEBUG(PEERDEBUGLEVEL)
        printf("%f peer %d rev handshake from peer %d \n", Scheduler.getCurrent(), getID(), remotePeer->getID());
    DEBUG(PEERDEBUGLEVEL)
        printf("%f peer %d send bitfield to peer %d\n", Scheduler.getCurrent(), getID(), remotePeer->getID());

    delete event;
    delete handshake;
    // replyEvent->dispatch();
}
// Unregisters `session` from this peer; the session object itself is not
// deleted here. The lookup key is the key of the session's document.
//
// A superseed peer additionally:
//   - prints its cache statistics once its last session is gone;
//   - when available bandwidth (getAvailableBandwidth(false, this)) is above
//     MIN_OUT_BANDWIDTH and the active session count is below DANGER_INFO_NUM,
//     asks every stopped session to recover.
void BTPeer::removeSession(BTSession* session)
{
    typedef map<string, BTSession*> SessionMap;

    SessionMap::iterator entry = mSessions.find(session->mDocument->mkey);
    if (entry != mSessions.end())
        mSessions.erase(entry);

    DEBUG(PEERDEBUGLEVEL)
        printf("peer %d remove session\n", getID() );

    if (mType == superseed && mSessions.empty())
        printf("%f peer %d cachehit %f cachesearchnum %d hitnum %d cahelist size %d\n", Scheduler.getCurrent(), getID(), mCache->getCacheHitRatio(), mCache->mCacheSearchNum, mCache->mCacheHitNum, mCache->mCacheList->size());

    Topology* topology = Topology::getInstance();
    if (mType == superseed
        && topology->getAvailableBandwidth(false, this) > MIN_OUT_BANDWIDTH
        && getActiveSessionNum() < DANGER_INFO_NUM) {
        SessionMap::iterator it = mSessions.begin();
        while (it != mSessions.end()) {
            BTSession* candidate = it->second;
            // Move on before calling into the session, matching the original
            // advance-then-call order.
            ++it;
            if (candidate->mState == stopped)
                candidate->recoverEsp();
        }
    }
}
void BTPeer::printDownloadInfo(void){
map<string, BTSession*>::iterator aIt = mSessions.begin();
while( aIt != mSessions.end()){
printf("file %s donwload info:\n", aIt->first.c_str());
aIt->second->printPieceDownloadInfo();
aIt++;
}
};
// Handles an ESP_RECOVER event: asks the session named by the event to
// recover.
//
// The event parameter is a heap-allocated string holding the session key;
// both the key and the event are freed here. getSession() returns NULL when
// no session is registered under the key; that case is reported through
// handleError() instead of dereferencing the NULL pointer, and the key and
// event are still released.
void BTPeer::handleESPRecover( BTEvent* event )
{
    string* key = (string*)event->getParam();
    BTSession* session = getSession( *key );
    if( session != NULL )
        session->recoverEsp();
    else
        handleError("BTPeer::handleESPRecover:no session");
    delete key;
    delete event;
}
// Handles an ESP_RETIRE event: asks the session named by the event to retire.
//
// The event parameter is a heap-allocated string holding the session key;
// both the key and the event are freed here. getSession() returns NULL when
// no session is registered under the key; that case is reported through
// handleError() instead of dereferencing the NULL pointer, and the key and
// event are still released.
void BTPeer::handleESPRetire( BTEvent* event )
{
    string* key = (string*)event->getParam();
    BTSession* session = getSession( *key );
    if( session != NULL )
        session->retireWork();
    else
        handleError("BTPeer::handleESPRetire:no session");
    delete key;
    delete event;
}
unsigned int BTPeer::getActiveSessionNum(void){
map<string, BTSession*>::iterator aIt = mSessions.begin();
unsigned int num = 0;
while( aIt != mSessions.end() ){
if( aIt->second->mActive&& aIt->second->mState != stopped)
num++;
aIt++;
}
return num;
};
bool BTPeer::ifUpload(void){
map<string, BTSession*>::iterator aIt = mSessions.begin();
BTSession* p = NULL;
while( aIt != mSessions.end() ){
p = (aIt++)->second;
if( p->ifUpload())
return true;
}
return false;
};
// Computes the per-session connection entitlement from a budget of 200
// connections.
//
// Each active session starts with an equal share (200 / active sessions).
// On every round, each non-stopped session using fewer connections than the
// current entitlement adds its unused part to a pool and is taken out of the
// divisor. The pool is then split evenly over the remaining divisor, the
// entitlement grows by that share, and the share is subtracted from the pool
// once per session currently above the old entitlement. Rounds repeat until
// the pool is empty, the divisor reaches zero, or the share rounds to zero.
//
// Returns the final entitlement. With no active session the whole budget is
// returned; the original divided by zero in that case.
//
// NOTE(review): the divisor starts from getActiveSessionNum() (which requires
// mActive) but is decremented for every non-stopped session below the
// entitlement, on every round, so it can wrap around as an unsigned value.
// That behaviour is kept as-is; confirm whether it is intended.
unsigned int BTPeer::getAvalConNum( void )
{
    const unsigned int connectionBudget = 200;

    unsigned int num = getActiveSessionNum();
    if( num == 0 )
        return connectionBudget;

    unsigned int ent = connectionBudget / num;
    unsigned int pool = 0;
    BTSession* session = NULL;
    map<string, BTSession*>::iterator aIt;

    do{
        // Collect unused entitlement from sessions below the current share.
        aIt = mSessions.begin();
        while( aIt != mSessions.end() ){
            session = (aIt++)->second;
            if( session->mState != stopped ){
                unsigned activeconnum = session->getActiveCon();
                if( activeconnum < ent ){
                    pool += ent - activeconnum;
                    num--;
                }
            }
        }

        // Redistribute the pool over the sessions still counted in num.
        if( num > 0 ){
            unsigned int xent = pool / num;
            if( xent == 0 )
                break;
            aIt = mSessions.begin();
            while( aIt != mSessions.end() ){
                session = (aIt++)->second;
                if( session->mState != stopped && session->getActiveCon() > ent )
                    pool -= xent;
            }
            ent += xent;
        }
    }while( pool > 0 && num > 0 );

    return ent;
}
// Returns the active connection count of the session registered under
// `filename`, or 0 when no such session exists.
unsigned int BTPeer::getSessionActConNum( string filename)
{
    map<string, BTSession*>::iterator found = mSessions.find(filename);
    if (found == mSessions.end())
        return 0;
    return found->second->getActiveCon();
}
// Returns the text "BTPEER:<id>" followed by a newline, where <id> is this
// peer's getID() value.
string BTPeer::toString(void)
{
    // 20 bytes hold the 7-character prefix, a 32-bit decimal id with sign,
    // the newline and the terminating NUL.
    char text[20] = {0};
    sprintf(text, "BTPEER:%d\n", getID());
    return string(text);
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -