⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 p2pmgr.cpp

📁 mysee网络直播源代码Mysee Lite是Mysee独立研发的网络视频流媒体播放系统。在应有了P2P技术和一系列先进流媒体技术之后
💻 CPP
📖 第 1 页 / 共 3 页
字号:
				if(!bFirstTime)
					assert(!nullLastTime);
				bFirstTime = false;
				nullLastTime = false;
				// 打印当前资源的状态
				mgr->comm->currRes->PrintStatus();
			}
			else 
				nullLastTime = true;
		}

		if(currTime-lastCSReportTime >= CS_REPORT) {
			csReportCount++;

			// CSClient发送
			lastCSReportTime = currTime;
			mgr->comm->csClient.SendLogin();
			mgr->comm->csClient.SendReport((csReportCount%30==0));
			// 清空增量列表
			if(mgr->comm->currRes)
				mgr->comm->currRes->ClearDiffIntervals(true);
		}

		if(currTime-lastCSReport2Time >= CS_REPORT2) {
			// CSClient发送
			lastCSReport2Time = currTime;
			mgr->comm->csClient.SendReport2();
			mgr->comm->csClient.SendNeedPeers();
		}

		if(currTime-lastClearBadAddress >= P2P_CLEAR_BADADDR) {
			lastClearBadAddress = currTime;
			// 清空无法连接的地址列表
			mgr->m_badAddr.clear();
		}

		if(currTime-lastP2PReportTime >= P2P_SEND_REPORT) {
			lastP2PReportTime = currTime;
			p2pReportCount++;

			CorePeerInfo thisPeer;
			mgr->GetSelfInfo(thisPeer);
			// 如果发现本次的SelfPeerInfo于上次的相同,或者本Peer拥有的块发生变化,则广播
			if(memcmp(&thisPeer, &mgr->lastSent, sizeof(thisPeer)) != 0 || mgr->m_bBlockIntervalHasChanged) {
				for(it = mgr->m_clients.begin(); it != mgr->m_clients.end(); ++it) {
					if(it->IsValid())
						it->SendReport(thisPeer, (p2pReportCount%20==0));
				}
				// 记录本次广播的自身信息
				memcpy(&mgr->lastSent, &thisPeer, sizeof(thisPeer));
			}

			// 清空增量列表
			if(mgr->comm->currRes)
				mgr->comm->currRes->ClearDiffIntervals(false);
		}
		if(currTime-lastP2PNearPeerTime >= P2P_NEAR_PEERS) {
			lastP2PNearPeerTime = currTime;
			for(it = mgr->m_clients.begin(); it != mgr->m_clients.end(); ++it) {
				if(it->IsValid())
					it->SendNearPeers();
			}
		}
		if(mgr->comm->currRes && currTime-lastCheckPushListTime >= P2P_CHECK_PUSHLIST) {
			lastCheckPushListTime = currTime;
			for(it = mgr->m_clients.begin(); it != mgr->m_clients.end(); ++it) {
				if(it->IsValid()) {
					it->ReloadPushList();
				}
			}
		}
		if (mgr->comm->localPeerFinder.IsRunning() && currTime - lastBroadcastInfoTime >= LPF_BROADCAST) {
			lastBroadcastInfoTime = currTime;
			mgr->comm->localPeerFinder.SendBroadcast();
		}

		if(!mgr->comm->currRes) {
			mgr->StopAll();
		}
	}
}

void P2PMgr::SafeCloseSocket(SOCKET sock) {
	SOCKET* nSock = new SOCKET;
	HANDLE closeThead = NULL;
	if(nSock) {
		*nSock = sock;
		DWORD threadID;
		closeThead = CreateThread(
			NULL, 0, (LPTHREAD_START_ROUTINE)(P2PMgr::RunCloseSocket), nSock, 0, &threadID);
	}
	if(closeThead == NULL || nSock == NULL) {
		// 如果不能起一个线程,就只好慢慢Close
		assert(0);
		TE_CloseSocket(sock, FALSE);
	}
	else if(closeThead){
		// 设置较低的优先级
		SetThreadPriority(closeThead, THREAD_PRIORITY_BELOW_NORMAL);
		// 这个线程启动之后我们将不再关心它的死活:)
		CloseHandle(closeThead);
	}
}

// 安全关闭SOCKET的线程
void __stdcall P2PMgr::RunCloseSocket(SOCKET* sock) {
	if(sock) {
		// 因为TE_CloseSocket很慢,所以需要起一个线程
		TE_CloseSocket(*sock, FALSE);
		delete sock;
	}
}

void P2PMgr::StopAll() {
	// 断开所有连接
	for(ClientIt it = m_clients.begin(); it != m_clients.end(); ++it) {
		if(!it->IsValid())
			continue;
		it->SetInvalid();
	}
	// 清空正在连接的列表
	for(CPeerIt itt = m_connecting.begin(); itt != m_connecting.end(); ++itt)
		SafeCloseSocket(itt->sock);
	m_connecting.clear();
	// 清空已知Peer的列表
	m_peers.clear();
	// 清空无法连接的地址列表
	m_badAddr.clear();
	// 清空需要连接的列表
	m_connectto.clear();
}

// 清除所有连接发送媒体类型区间的记录
void P2PMgr::ClearSentMediaArray() {
	for(ClientIt it = m_clients.begin(); it != m_clients.end(); ++it) {
		if(!it->IsValid())
			continue;
		it->ClearSentMediaArray();
	}
}

void P2PMgr::AddConnecting(const ConnectingPeer& peer) {
	// 正在连接中
	m_connecting.push_back(peer);
}

void P2PMgr::AddP2PClient(const ConnectingPeer& peer) {
	// 已经连上, 检查此种类型连接是否已满
	bool bConnectionFull = IsConnectionFull(peer.isIncoming, peer.isForFree, false);
	if(bConnectionFull) {
		comm->logFile.StatusOut("connection is full, abandon connection.");
		SafeCloseSocket(peer.sock); // no force disconnect
		return;
	}

	for(ClientIt it = m_clients.begin(); it != m_clients.end(); ++it) {
		if(!it->IsValid()) {
			if(it->SetValid(comm, peer))
				break;
			else
				it->SetInvalid();
		}
	}
	if(it == m_clients.end()) {
		assert(0);
		comm->logFile.StatusOut("Shouldn't be here!");
		SafeCloseSocket(peer.sock); // no force disconnect
	}
}

void P2PMgr::AddConnectTo(const P2PAddress& addr, bool connectForFree) {
	if(addr == comm->localAddress || addr.outerIP.sin_addr.s_addr == 0)
		return;
	ConnectingPeer peer;
	memcpy((P2PAddress*)&peer, &addr, sizeof(P2PAddress));

	peer.isForFree = connectForFree;
	peer.isCachePeer = false;

	PeerAddrIt it = find(m_peers.begin(), m_peers.end(), peer);
	if(it != m_peers.end()) // found in known list
		return;
	list<P2PAddress>::const_iterator it1;
	it1 = find(m_badAddr.begin(), m_badAddr.end(), addr);
	if(it1 != m_badAddr.end()) // found in bad address list
		return;
	list<ConnectingPeer>::const_iterator it2;
	it2 = find(m_connectto.begin(), m_connectto.end(), peer);
	if(it2 != m_connectto.end()) // found in connectto list
		return;

	m_connectto.push_back(peer);
}

void P2PMgr::AddPeerInfo(PeerInfoWithAddr& peer) {
	if(peer == comm->localAddress || peer.outerIP.sin_addr.s_addr == 0)
		return;

	PeerAddrIt it = find(m_peers.begin(), m_peers.end(), peer);
	if(it != m_peers.end()) { // found in known list
		memcpy(&*it, &peer, sizeof(PeerInfoWithAddr)); // replace old
		return;
	}
	list<P2PAddress>::const_iterator it1;
	it1 = find(m_badAddr.begin(), m_badAddr.end(), peer);
	if(it1 != m_badAddr.end()) // found in bad address list
		return;
	list<ConnectingPeer>::const_iterator it2;
	for(it2 = m_connectto.begin(); it2 != m_connectto.end(); ++it2) {
		if((PeerInfoWithAddr)*it2 == peer) // found in connectto list
			return;
	}

	m_peers.push_back(peer);
}

BOOL P2PMgr::GetPeer4Connect(ConnectingPeer& peer) {
	// 当前资源尚未下载完毕
	if(comm->currRes) {
		// 1. 选择一个地址主动连接
		for(PeerAddrIt it = m_peers.begin(); it != m_peers.end(); ++it) {
			// 判断出度是否已满
			if(!IsConnectionFull(false, false, true)) {
				// 复制Peer地址
				memcpy((PeerInfoWithAddr*)&peer, &*it, sizeof(PeerInfoWithAddr));
				m_peers.erase(it);
				// 一个地址只能有一个非Free的连接和一个Free的连接(CP的不定)
				if(CheckN4One(peer, false, false, peer.isCachePeer?MAX_CONNECTION_PER_CP:MAX_CONNECTION_PER_NP)) {
					peer.isForFree = peer.isIncoming = peer.isPassive = false;
					return TRUE;
				}
				else {
					it = m_peers.begin();
					continue;
				}
			}
		}
	}

	// 2. 如果没有主动去连接的Peer,则连接需要被动去连接的Peer
	for(CPeerIt it = m_connectto.begin(); it != m_connectto.end(); ++it) {
		// 判断连接是否已满
		if(!IsConnectionFull(true, it->isForFree, true)) {
			// 复制Peer地址
			memcpy((ConnectingPeer*)&peer, &*it, sizeof(ConnectingPeer));
			m_connectto.erase(it);
			// 判断此地址是否已经有连接
			if(CheckN4One(peer, peer.isForFree, false, MAX_CONNECTION_PER_NP)) {
				peer.isIncoming = true;		// 算作连入连接,因为是对方需要己方的数据
				peer.isCachePeer = false;	// 非CP
				peer.isPassive = true;		// 被动连接
				peer.layer = 0xff;			// 对方的layer未知
				peer.isMaxIn = peer.isMaxOut = peer.isMaxFreeIn = peer.isMaxFreeOut = false;
				return TRUE;
			}
			else {
				it = m_connectto.begin();
				continue;
			}
		}
	}
	
	// 没有任何可以连接的地址
	return FALSE;
}

void P2PMgr::BroadCastSPUpdate(SPUpdate& spUpdate, BYTE sum) {
	UINT8 selfLayer = GetSelfLayer();
	for(ClientIt it = m_clients.begin(); it != m_clients.end(); ++it) {
		if(it->IsValid())
			it->SendSPUpdate(spUpdate, selfLayer, sum);
	}
}

void P2PMgr::AddBadAddr(P2PAddress addr) {
	list<P2PAddress>::const_iterator cit = 
		find(m_badAddr.begin(), m_badAddr.end(), addr);
	if(cit == m_badAddr.end()) {
		// 如果BadAddr的列表持续增加,在特殊情况下可能会占用很多内存
		// 所以保持在少于100个比较好
		if(m_badAddr.size() > 100)
			m_badAddr.pop_front();
		m_badAddr.push_back(addr);
	}
}

BOOL P2PMgr::CheckN4One(P2PAddress addr, bool isForFree, bool isConnected, UINT max) {
	// 连接计数
	UINT count = 0;
	// 如果已经连上,要先加上此连接自身
	if(isConnected)
		++count;
	// 查找来自同一地址的连接, 如果超过max值,就返回false
	for(ClientIt it = m_clients.begin(); it != m_clients.end(); ++it) {
		if(!it->IsValid())
			continue;
		if(it->GetAddress() == addr) {
			if(isForFree == it->GetIsForFree()) {
				count++;
				if(count == max)
					return FALSE; 
			}
		}
	}
	// 如果是已经连上的连接, 显然优先级比正在连接的高, 所以不予计算
	if(!isConnected) {
		// 在正在连接的地址列表中查找同一地址,如果超过max值,就返回false
		for(CPeerIt itt = m_connecting.begin(); itt != m_connecting.end(); ++itt) {
			ConnectingPeer* peer = &*itt;
			if(*peer == addr) {
				if(isForFree == itt->isForFree) {
					count++;
					if(count == max)
						return FALSE; 
				}
			}
		}
	}
	return TRUE;
}

bool P2PMgr::IsConnectablePeer(const P2PAddress& connectAddr, const P2PAddress& targetAddr) {
	if(connectAddr == targetAddr) // 不用告诉别人他自己的地址
		return false;

	// 如果外网地址不同,那么...
	if(connectAddr.outerIP.sin_addr.s_addr != targetAddr.outerIP.sin_addr.s_addr) {
		if(!connectAddr.IsNAT() && targetAddr.IsNAT()) // 外网无法连接内网
			return false;
		if(connectAddr.IsNAT() && targetAddr.IsNAT()) // 双内网无法连接
			return false;
	}
	return true;
}

void P2PMgr::GetPeersForNeighbour(list<PeerInfoWithAddr> &array, UINT wantNum, P2PAddress& addr) {
	for(ClientIt it = m_clients.begin(); it != m_clients.end() && array.size() < wantNum; ++it) {
		if(!it->IsValid())
			continue;
		if(it->GetIsCachePeer()) // 不用告诉别人CP的地址
			continue;
		if(!IsConnectablePeer(addr, it->GetAddress())) // not connectable
			continue;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -