📄 p2pmgr.cpp
字号:
if(!bFirstTime)
assert(!nullLastTime);
bFirstTime = false;
nullLastTime = false;
// 打印当前资源的状态
mgr->comm->currRes->PrintStatus();
}
else
nullLastTime = true;
}
if(currTime-lastCSReportTime >= CS_REPORT) {
csReportCount++;
// CSClient发送
lastCSReportTime = currTime;
mgr->comm->csClient.SendLogin();
mgr->comm->csClient.SendReport((csReportCount%30==0));
// 清空增量列表
if(mgr->comm->currRes)
mgr->comm->currRes->ClearDiffIntervals(true);
}
if(currTime-lastCSReport2Time >= CS_REPORT2) {
// CSClient发送
lastCSReport2Time = currTime;
mgr->comm->csClient.SendReport2();
mgr->comm->csClient.SendNeedPeers();
}
if(currTime-lastClearBadAddress >= P2P_CLEAR_BADADDR) {
lastClearBadAddress = currTime;
// 清空无法连接的地址列表
mgr->m_badAddr.clear();
}
if(currTime-lastP2PReportTime >= P2P_SEND_REPORT) {
lastP2PReportTime = currTime;
p2pReportCount++;
CorePeerInfo thisPeer;
mgr->GetSelfInfo(thisPeer);
// Broadcast when this SelfPeerInfo differs from the one last sent, or when the set of blocks this peer holds has changed
if(memcmp(&thisPeer, &mgr->lastSent, sizeof(thisPeer)) != 0 || mgr->m_bBlockIntervalHasChanged) {
for(it = mgr->m_clients.begin(); it != mgr->m_clients.end(); ++it) {
if(it->IsValid())
it->SendReport(thisPeer, (p2pReportCount%20==0));
}
// 记录本次广播的自身信息
memcpy(&mgr->lastSent, &thisPeer, sizeof(thisPeer));
}
// 清空增量列表
if(mgr->comm->currRes)
mgr->comm->currRes->ClearDiffIntervals(false);
}
if(currTime-lastP2PNearPeerTime >= P2P_NEAR_PEERS) {
lastP2PNearPeerTime = currTime;
for(it = mgr->m_clients.begin(); it != mgr->m_clients.end(); ++it) {
if(it->IsValid())
it->SendNearPeers();
}
}
if(mgr->comm->currRes && currTime-lastCheckPushListTime >= P2P_CHECK_PUSHLIST) {
lastCheckPushListTime = currTime;
for(it = mgr->m_clients.begin(); it != mgr->m_clients.end(); ++it) {
if(it->IsValid()) {
it->ReloadPushList();
}
}
}
if (mgr->comm->localPeerFinder.IsRunning() && currTime - lastBroadcastInfoTime >= LPF_BROADCAST) {
lastBroadcastInfoTime = currTime;
mgr->comm->localPeerFinder.SendBroadcast();
}
if(!mgr->comm->currRes) {
mgr->StopAll();
}
}
}
/**
 * Close a socket without blocking the caller.
 *
 * The close is handed to a worker thread running RunCloseSocket, which
 * receives a heap-allocated copy of the handle and deletes it when done.
 * If the worker thread cannot be created, the socket is closed
 * synchronously on the calling thread instead.
 *
 * @param sock  socket handle to close
 */
void P2PMgr::SafeCloseSocket(SOCKET sock) {
    // Heap copy of the handle; on success ownership passes to the worker thread.
    SOCKET* nSock = new SOCKET;
    HANDLE closeThead = NULL;
    if(nSock) {
        *nSock = sock;
        DWORD threadID;
        closeThead = CreateThread(
            NULL, 0, (LPTHREAD_START_ROUTINE)(P2PMgr::RunCloseSocket), nSock, 0, &threadID);
    }
    if(closeThead == NULL) {
        // The worker never started, so nobody else will free the copy:
        // release it here (delete on a null pointer is a no-op).
        delete nSock;
        // Could not start a thread, so fall back to the slow in-place close.
        assert(0);
        TE_CloseSocket(sock, FALSE);
        return;
    }
    // Run the close at a lower priority.
    SetThreadPriority(closeThead, THREAD_PRIORITY_BELOW_NORMAL);
    // The thread is fire-and-forget; drop our handle to it right away.
    CloseHandle(closeThead);
}
/**
 * Thread entry point used by SafeCloseSocket to close a socket.
 *
 * @param sock  heap-allocated socket handle; it is deleted here after the
 *              socket has been closed. A null pointer is ignored.
 */
void __stdcall P2PMgr::RunCloseSocket(SOCKET* sock) {
    if(!sock)
        return;
    // TE_CloseSocket is slow, which is why it runs on its own thread.
    TE_CloseSocket(*sock, FALSE);
    delete sock;
}
void P2PMgr::StopAll() {
// 断开所有连接
for(ClientIt it = m_clients.begin(); it != m_clients.end(); ++it) {
if(!it->IsValid())
continue;
it->SetInvalid();
}
// 清空正在连接的列表
for(CPeerIt itt = m_connecting.begin(); itt != m_connecting.end(); ++itt)
SafeCloseSocket(itt->sock);
m_connecting.clear();
// 清空已知Peer的列表
m_peers.clear();
// 清空无法连接的地址列表
m_badAddr.clear();
// 清空需要连接的列表
m_connectto.clear();
}
// 清除所有连接发送媒体类型区间的记录
void P2PMgr::ClearSentMediaArray() {
for(ClientIt it = m_clients.begin(); it != m_clients.end(); ++it) {
if(!it->IsValid())
continue;
it->ClearSentMediaArray();
}
}
/**
 * Record a peer whose connection attempt is in progress.
 *
 * @param peer  the peer being connected to; a copy is appended to the
 *              in-progress list
 */
void P2PMgr::AddConnecting(const ConnectingPeer& peer) {
    // Append at the tail of the "currently connecting" list.
    m_connecting.insert(m_connecting.end(), peer);
}
/**
 * Promote a freshly connected peer into a free client slot.
 *
 * If the connection class the peer belongs to (incoming/outgoing,
 * free/non-free) is already full, the socket is closed and the peer is
 * dropped. Otherwise the first invalid slot in m_clients that accepts the
 * peer via SetValid takes it. If no slot accepts it, the socket is closed.
 *
 * @param peer  the connected peer (socket plus connection flags)
 */
void P2PMgr::AddP2PClient(const ConnectingPeer& peer) {
    // Already connected: check whether this kind of connection is full.
    bool bConnectionFull = IsConnectionFull(peer.isIncoming, peer.isForFree, false);
    if(bConnectionFull) {
        comm->logFile.StatusOut("connection is full, abandon connection.");
        SafeCloseSocket(peer.sock); // no force disconnect
        return;
    }
    // Declared outside the loop because it is inspected after the loop ends;
    // a for-init declaration would be out of scope there in standard C++.
    ClientIt it;
    for(it = m_clients.begin(); it != m_clients.end(); ++it) {
        if(!it->IsValid()) {
            if(it->SetValid(comm, peer))
                break;
            else
                it->SetInvalid(); // slot refused the peer: reset it and keep looking
        }
    }
    if(it == m_clients.end()) {
        // No slot took the peer even though the connection was not reported full.
        assert(0);
        comm->logFile.StatusOut("Shouldn't be here!");
        SafeCloseSocket(peer.sock); // no force disconnect
    }
}
void P2PMgr::AddConnectTo(const P2PAddress& addr, bool connectForFree) {
if(addr == comm->localAddress || addr.outerIP.sin_addr.s_addr == 0)
return;
ConnectingPeer peer;
memcpy((P2PAddress*)&peer, &addr, sizeof(P2PAddress));
peer.isForFree = connectForFree;
peer.isCachePeer = false;
PeerAddrIt it = find(m_peers.begin(), m_peers.end(), peer);
if(it != m_peers.end()) // found in known list
return;
list<P2PAddress>::const_iterator it1;
it1 = find(m_badAddr.begin(), m_badAddr.end(), addr);
if(it1 != m_badAddr.end()) // found in bad address list
return;
list<ConnectingPeer>::const_iterator it2;
it2 = find(m_connectto.begin(), m_connectto.end(), peer);
if(it2 != m_connectto.end()) // found in connectto list
return;
m_connectto.push_back(peer);
}
/**
 * Add a peer to the known-peer list, or refresh it if already present.
 *
 * The peer is ignored when it is the local address, has a zero outer IP,
 * is in the bad-address list, or is already queued in the connect-to list.
 * When it is already in the known-peer list, the stored entry is
 * overwritten with the new data.
 *
 * @param peer  peer information including its address
 */
void P2PMgr::AddPeerInfo(PeerInfoWithAddr& peer) {
    if(peer == comm->localAddress || peer.outerIP.sin_addr.s_addr == 0)
        return;

    // Known already: replace the old record in place.
    PeerAddrIt known = find(m_peers.begin(), m_peers.end(), peer);
    if(known != m_peers.end()) {
        memcpy(&*known, &peer, sizeof(PeerInfoWithAddr));
        return;
    }

    // Listed as a bad address.
    if(find(m_badAddr.begin(), m_badAddr.end(), peer) != m_badAddr.end())
        return;

    // Already waiting in the connect-to list.
    list<ConnectingPeer>::const_iterator queued = m_connectto.begin();
    while(queued != m_connectto.end()) {
        if((PeerInfoWithAddr)*queued == peer)
            return;
        ++queued;
    }

    m_peers.push_back(peer);
}
/**
 * Pick the next peer to open a connection to.
 *
 * 1. While a current resource exists and the outgoing connection class is
 *    not full, entries are taken from the front of the known-peer list
 *    until one passes the per-address limit check (CheckN4One).
 * 2. Otherwise the connect-to list is scanned; entries whose connection
 *    class is full are skipped, the others are removed and checked against
 *    the per-address limit.
 *
 * Every entry that is examined is removed from its list, whether or not it
 * is accepted. `peer` may be overwritten even when FALSE is returned.
 *
 * @param peer  receives the chosen peer, with its connection flags filled in
 * @return TRUE when `peer` holds a peer to connect to, FALSE when no
 *         candidate is available
 */
BOOL P2PMgr::GetPeer4Connect(ConnectingPeer& peer) {
    // A current resource exists (original comment: not yet fully downloaded).
    if(comm->currRes) {
        // 1. Choose an address to connect to actively.
        //    Always consume the front element. This avoids touching an
        //    erased iterator and never steps past end() when the list
        //    becomes empty.
        while(!m_peers.empty() && !IsConnectionFull(false, false, true)) {
            PeerAddrIt first = m_peers.begin();
            // Copy the peer's address/info part.
            memcpy((PeerInfoWithAddr*)&peer, &*first, sizeof(PeerInfoWithAddr));
            m_peers.erase(first);
            // One address may hold only one non-free and one free connection
            // (the limit for cache peers is separate).
            if(CheckN4One(peer, false, false, peer.isCachePeer?MAX_CONNECTION_PER_CP:MAX_CONNECTION_PER_NP)) {
                peer.isForFree = peer.isIncoming = peer.isPassive = false;
                return TRUE;
            }
        }
    }
    // 2. No peer to connect to actively: use one that asked to be connected to.
    CPeerIt it = m_connectto.begin();
    while(it != m_connectto.end()) {
        // Skip entries whose connection class is full.
        if(IsConnectionFull(true, it->isForFree, true)) {
            ++it;
            continue;
        }
        // Copy the queued entry.
        memcpy((ConnectingPeer*)&peer, &*it, sizeof(ConnectingPeer));
        // erase() returns the following element, so the scan continues
        // safely and no surviving entry is skipped.
        it = m_connectto.erase(it);
        // Check whether this address already has its allowed connections.
        if(CheckN4One(peer, peer.isForFree, false, MAX_CONNECTION_PER_NP)) {
            peer.isIncoming = true;   // counted as incoming: the remote side wants our data
            peer.isCachePeer = false; // not a cache peer
            peer.isPassive = true;    // passive connection
            peer.layer = 0xff;        // remote layer unknown
            peer.isMaxIn = peer.isMaxOut = peer.isMaxFreeIn = peer.isMaxFreeOut = false;
            return TRUE;
        }
    }
    // Nothing available to connect to.
    return FALSE;
}
/**
 * Send an SPUpdate to every valid client.
 *
 * @param spUpdate  the update to send
 * @param sum       passed through unchanged to each client's SendSPUpdate
 */
void P2PMgr::BroadCastSPUpdate(SPUpdate& spUpdate, BYTE sum) {
    // Query our own layer once and reuse it for every client.
    UINT8 ownLayer = GetSelfLayer();
    ClientIt client = m_clients.begin();
    while(client != m_clients.end()) {
        if(client->IsValid())
            client->SendSPUpdate(spUpdate, ownLayer, sum);
        ++client;
    }
}
/**
 * Remember an address that could not be connected to.
 *
 * Duplicates are ignored. When the list already holds more than 100
 * entries, the oldest one is dropped before the new one is appended, so the
 * list cannot grow without bound.
 *
 * @param addr  the unreachable address
 */
void P2PMgr::AddBadAddr(P2PAddress addr) {
    // Already recorded, nothing to do.
    if(find(m_badAddr.begin(), m_badAddr.end(), addr) != m_badAddr.end())
        return;

    // Cap memory use: evict the oldest entry once the list is over 100.
    if(m_badAddr.size() > 100) {
        m_badAddr.pop_front();
    }
    m_badAddr.push_back(addr);
}
/**
 * Check the per-address connection limit.
 *
 * Counts valid clients with the same address and the same isForFree flag.
 * When `isConnected` is false, entries of the in-progress connection list
 * are counted as well. Returns FALSE as soon as the running count equals
 * `max`.
 *
 * @param addr         address to check
 * @param isForFree    only connections with this same flag are counted
 * @param isConnected  true when the connection being checked is already
 *                     established; the count then starts at 1 and the
 *                     in-progress list is not consulted
 * @param max          count at which the check fails
 * @return TRUE when the limit was not hit, FALSE otherwise
 */
BOOL P2PMgr::CheckN4One(P2PAddress addr, bool isForFree, bool isConnected, UINT max) {
    // An established connection counts itself first.
    // NOTE(review): the limit test below is `== max` and runs only after an
    // increment, so with isConnected == true and max == 1 it can never fire.
    // Confirm whether `>= max` was intended.
    UINT sameAddrCount = isConnected ? 1 : 0;

    // Established connections from the same address.
    for(ClientIt client = m_clients.begin(); client != m_clients.end(); ++client) {
        if(client->IsValid() && client->GetAddress() == addr && isForFree == client->GetIsForFree()) {
            if(++sameAddrCount == max)
                return FALSE;
        }
    }

    // An established connection outranks ones still being set up, so the
    // in-progress list is not counted for it.
    if(isConnected)
        return TRUE;

    // In-progress connections to the same address.
    for(CPeerIt pending = m_connecting.begin(); pending != m_connecting.end(); ++pending) {
        if(*pending == addr && isForFree == pending->isForFree) {
            if(++sameAddrCount == max)
                return FALSE;
        }
    }
    return TRUE;
}
/**
 * Decide whether `connectAddr` could connect to `targetAddr`.
 *
 * Identical addresses are rejected. When the two outer (public) IPs differ,
 * the pair is rejected if the target is behind NAT. Every other combination
 * is accepted.
 *
 * @param connectAddr  address of the peer that would initiate the connection
 * @param targetAddr   address of the peer that would be connected to
 * @return true when the connection is considered possible
 */
bool P2PMgr::IsConnectablePeer(const P2PAddress& connectAddr, const P2PAddress& targetAddr) {
    // No point telling a peer its own address.
    if(connectAddr == targetAddr)
        return false;

    // Same outer IP: no NAT restriction applies.
    if(connectAddr.outerIP.sin_addr.s_addr == targetAddr.outerIP.sin_addr.s_addr)
        return true;

    const bool connectBehindNAT = connectAddr.IsNAT();
    const bool targetBehindNAT = targetAddr.IsNAT();
    // A public peer cannot reach a peer behind NAT.
    if(!connectBehindNAT && targetBehindNAT)
        return false;
    // Two peers behind different NATs cannot reach each other.
    if(connectBehindNAT && targetBehindNAT)
        return false;
    return true;
}
void P2PMgr::GetPeersForNeighbour(list<PeerInfoWithAddr> &array, UINT wantNum, P2PAddress& addr) {
for(ClientIt it = m_clients.begin(); it != m_clients.end() && array.size() < wantNum; ++it) {
if(!it->IsValid())
continue;
if(it->GetIsCachePeer()) // 不用告诉别人CP的地址
continue;
if(!IsConnectablePeer(addr, it->GetAddress())) // not connectable
continue;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -