📄 p2pmgr.cpp
字号:
array.push_back(it->GetPeerInfo());
}
PeerAddrIt it1 = m_peers.begin();
for(; it1 != m_peers.end() && array.size() < wantNum; ++it1) {
if(!IsConnectablePeer(addr, *it1))
continue;
array.push_back(*it1);
}
for(CPeerIt it2 = m_connecting.begin(); it2 != m_connecting.end() && array.size() < wantNum; ++it2) {
if(!IsConnectablePeer(addr, *it2))
continue;
array.push_back(*it2);
}
}
void P2PMgr::SendPackets() {
bool noIncrease = false;
TCPPacket* packet = NULL;
for(ClientIt it = m_clients.begin(); it != m_clients.end(); ++it) {
// 如果不用继续发送上轮循环连接的数据,则把it降回去
if(noIncrease) {
--it;
noIncrease = false;
}
if(!it->IsValid())
continue;
it->GetPacket(packet);
if(!packet)
continue;
int ret = send(it->GetSocket(), packet->buf+packet->sent, packet->size-packet->sent, 0);
if(SOCKET_ERROR == ret) {
DWORD lastError = ::WSAGetLastError();
if (WSAEWOULDBLOCK == lastError) {
it->PutPacketBack(packet);
}
else {
it->SetInvalid();
ReleasePacket(packet);
comm->logFile.StatusErr("Sending data on TCP", lastError);
}
}
else {
AddOutgoingBytes(ret);
it->AddOutgoingBytes(ret);
assert(ret <= packet->size-packet->sent);
if(ret == packet->size-packet->sent) {
// 已经发送完毕,释放Buffer
if(packet->GetMsgType() == P2P_RESPONSE) {
AddBlockDataUp(*packet->GetBlockSize());
}
ReleasePacket(packet);
// 呵呵,此连接的一个块已经发完了!接着发送下一个
if(!it->SendFirstPushBlock())
assert(0);
noIncrease = true;
}
else { // 尚未发送完毕,下次继续发送
packet->sent += ret;
it->PutPacketBack(packet);
}
}
}
}
UINT8 P2PMgr::GetSelfLayer() {
if(GetCPConCount() > 0)
return 1; // 连接了CP,肯定是第一层
UINT8 layer = 0xff;
for(ClientIt it = m_clients.begin(); it != m_clients.end(); ++it) {
if(!it->IsValid())
continue;
// 必须和本机是同一个资源才有价值
if(!it->GetIsSameRes())
continue;
if(it->GetLayer() == 0xff) // 尚未收到P2P_HELLO消息
continue;
if(it->GetLayer() <= 1)
assert(0);
layer = min(layer, it->GetLayer());
}
return layer;
}
void P2PMgr::GetSelfInfo(CorePeerInfo& thisPeer) {
thisPeer.isCachePeer = false;
thisPeer.layer = GetSelfLayer();
thisPeer.isMaxIn = (GetIncomingCount() >= comm->cfgData.INCOMING_NUM);
thisPeer.isMaxOut = (GetOutgoingCount() >= comm->cfgData.OUTGOING_NUM);
thisPeer.isMaxFreeIn = (GetFreeInCount() >= comm->cfgData.IN_4FREE_NUM);
thisPeer.isMaxFreeOut = (GetFreeOutCount() >= comm->cfgData.OUT_4FREE_NUM);
}
UINT8 P2PMgr::ExGetCount(bool in, bool out, bool freeIn, bool freeOut, bool cponly, bool includeConnecting) {
UINT8 count = 0;
for(ClientIt it = m_clients.begin(); it != m_clients.end(); ++it) {
if(!it->IsValid())
continue;
if(cponly) {
count += it->GetIsCachePeer()?1:0;
continue;
}
if(it->GetIsIncoming()) {
if(it->GetIsForFree())
count += freeIn?1:0;
else
count += in?1:0;
}
else {
if(it->GetIsForFree())
count += freeOut?1:0;
else
count += out?1:0;
}
}
if(includeConnecting) {
for(CPeerIt it1 = m_connecting.begin(); it1 != m_connecting.end(); ++it1) {
if(cponly) {
count += it->GetIsCachePeer()?1:0;
continue;
}
if(it1->isIncoming) {
if(it1->isForFree)
count += freeIn?1:0;
else
count += in?1:0;
}
else {
if(it1->isForFree)
count += freeOut?1:0;
else
count += out?1:0;
}
}
}
return count;
};
// 检查连接数目是否已满
bool P2PMgr::IsConnectionFull(bool isIncoming, bool isForFree, bool includeConnecting) {
if(isIncoming) {
if(isForFree)
return GetFreeInCount(includeConnecting) >= comm->cfgData.IN_4FREE_NUM;
return GetIncomingCount(includeConnecting) >= comm->cfgData.INCOMING_NUM;
}
if(isForFree)
return GetFreeOutCount(includeConnecting) >= comm->cfgData.OUT_4FREE_NUM;
return GetOutgoingCount(includeConnecting) >= comm->cfgData.OUTGOING_NUM;
}
UINT16 P2PMgr::GetLowVersionCount() {
if(m_lowversionCount >= 0xffff)
return 0xffff;
return static_cast<UINT16>(m_lowversionCount);
}
UINT16 P2PMgr::GetConnectFailCount() {
if(m_connectFailCount >= 0xffff)
return 0xffff;
return static_cast<UINT16>(m_connectFailCount);
}
UINT16 P2PMgr::GetTotalIncomingCount() {
if(m_incomingCount >= 0xffff)
return 0xffff;
return static_cast<UINT16>(m_incomingCount);
}
UINT16 P2PMgr::GetTotalOutgoingCount() {
if(m_outgoingCount >= 0xffff)
return 0xffff;
return static_cast<UINT16>(m_outgoingCount);
}
UINT16 P2PMgr::GetAvgIncomingTime() {
UINT totalTime = m_incomingElapsedTime;
UINT count = m_incomingCount;
for(ClientIt it = m_clients.begin(); it != m_clients.end(); ++it) {
if(!it->IsValid())
continue;
if(it->GetIsIncoming()) {
count++;
totalTime += it->GetElapsedTime();
}
}
if(count == 0)
return 0;
UINT avgTime = totalTime/count;
if(avgTime/1000 >= 0xffff)
return 0xffff;
return static_cast<UINT16>(avgTime/1000);
}
UINT16 P2PMgr::GetAvgOutgoingTime() {
UINT totalTime = m_outgoingElapsedTime;
UINT count = m_outgoingCount;
for(ClientIt it = m_clients.begin(); it != m_clients.end(); ++it) {
if(!it->IsValid())
continue;
if(!it->GetIsIncoming()) {
count++;
totalTime += it->GetElapsedTime();
}
}
if(count == 0)
return 0;
UINT avgTime = totalTime/count;
if(avgTime/1000 >= 0xffff)
return 0xffff;
return static_cast<UINT16>(avgTime/1000);
}
void P2PMgr::ConnectionEstablished(bool isIncoming) {
if(isIncoming)
m_incomingCount++;
else
m_outgoingCount++;
}
void P2PMgr::ConnectionClosed(bool isIncoming, DWORD elapsedTime) {
if(isIncoming) {
m_incomingElapsedTime += elapsedTime;
}
else {
m_outgoingElapsedTime += elapsedTime;
}
}
float P2PMgr::GetMessagePercent() {
// 计算P2P网络中数据长度与控制信息长度的比例, 以不超过5%为佳
TransferInfo tiTemp;
GetTransferInfo(tiTemp);
LONGLONG allMsg = 0;
LONGLONG allData = 0;
allMsg += tiTemp.totalDownBytes + tiTemp.totalUpBytes - m_totalBlockDataUp - m_totalBlockDataDown;
allData += tiTemp.totalDownBytes + tiTemp.totalUpBytes;
double a = (double)allMsg/(1024);
double b = (double)allData/(1024);
double msgPercent = (allData==0?0:(((double)allMsg/allData)*100));
comm->logFile.StatusOut("Msg Percent: %f%(%fKB/%fKB), Share Percent: %f%(%.2f/%.2fMB), Data From CP: %.2fMB", msgPercent, a, b,
(tiTemp.totalUpBytes*100.f)/tiTemp.totalDownBytes,
tiTemp.totalUpBytes/1024.f/1024.f, tiTemp.totalDownBytes/1024.f/1024.f,
m_totalBytesFromCP/1024.f/1024.f);
return static_cast<float>(msgPercent);
}
// 从现有连接(参数client的连接除外)的Push List中回收一个块
// 情况1. 某个连接下载到了一个Block,并且在该连接的Push List中没有找到这个块。
// 那么说明这个块很可能是上次改变Push List之际,对方发送过来的;
void P2PMgr::ReclaimBlocks(P2PClient* client, UINT blockID) {
for(ClientIt it = m_clients.begin(); it != m_clients.end(); ++it) {
// 不检查无效连接和提交此块的连接,如果client为NULL,则检查所有有效连接
if(!it->IsValid() || &*it == client)
continue;
// 如果找到了,就删除并跳出循环,因为每块都只会被分配一次
if(it->DelPushBlock(blockID))
break;
}
}
// 重新分配所有所有连接的Push List.
// 当新增一条连接时,当一个连接的PushList为空时
// 这个开销(网络流量)其实比较大,还需要商榷,作为暂时的解决办法是没问题的。
void P2PMgr::RedistributeAllBlocks(P2PClient* client) {
assert(comm->currRes);
if(!comm->currRes)
return;
if(client) {
// 如果是普通下载方式,只给此连接分配少量的块。
client->ReloadPushList();
}
else {
for(ClientIt it = m_clients.begin(); it != m_clients.end(); ++it) {
if(!it->IsValid() || client == &*it)
continue;
it->ReloadPushList();
}
}
}
// 在除了client以外的所有连接的PushList中查找blockID
bool P2PMgr::FindInAllPushList(const UINT blockID, P2PClient* client) {
for(ClientIt it = m_clients.begin(); it != m_clients.end(); ++it) {
if(!it->IsValid() || client == &*it)
continue;
if(it->FindPushBlock(blockID))
return true;
}
return false;
}
// 在除了client以外的所有连接(是否包含CP)的remoteInterval中查找blockID
UINT8 P2PMgr::FindInAllRemoteInterval(const blockID, P2PClient* client, bool includeCP, float minSpeed) {
UINT8 count = 0;
for(ClientIt it = m_clients.begin(); it != m_clients.end(); ++it) {
if(!it->IsValid() || client == &*it)
continue;
if(!includeCP && it->GetIsCachePeer())
continue;
if(it->GetCurDownSpeed()/1024 < minSpeed)
continue;
if(it->FindRemoteBlock(blockID))
++count;
}
return count;
}
// 在除了client以外的所有LAN连接的remoteInterval中查找blockID
UINT8 P2PMgr::FindInAllLanRemoteInterval(const blockID, P2PClient* client, float minSpeed) {
UINT8 count = 0;
for(ClientIt it = m_clients.begin(); it != m_clients.end(); ++it) {
if(!it->IsValid() || client == &*it)
continue;
if(!it->GetIsSameLan())
continue;
if(it->GetCurDownSpeed()/1024 < minSpeed)
continue;
if(it->FindRemoteBlock(blockID))
++count;
}
return count;
}
LPCSTR P2PMgr::FormatIPAddress(P2PAddress& addr) {
strcpy(TempIP, inet_ntoa(addr.outerIP.sin_addr));
if(addr.subnetIP.sin_addr.s_addr != UINT_MAX) {
strcat(TempIP, "->");
strcat(TempIP, inet_ntoa(addr.subnetIP.sin_addr));
}
strcat(TempIP, ":");
sprintf(TempIP+strlen(TempIP), "%d", ntohs(addr.subnetIP.sin_port));
return TempIP;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -