⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 p2pmgr.cpp

📁 mysee网络直播源代码Mysee Lite是Mysee独立研发的网络视频流媒体播放系统。在应有了P2P技术和一系列先进流媒体技术之后
💻 CPP
📖 第 1 页 / 共 3 页
字号:
		array.push_back(it->GetPeerInfo());
	}

	PeerAddrIt it1 = m_peers.begin();
	for(; it1 != m_peers.end() && array.size() < wantNum; ++it1) {
		if(!IsConnectablePeer(addr, *it1))
			continue;
		array.push_back(*it1);
	}

	for(CPeerIt it2 = m_connecting.begin(); it2 != m_connecting.end() && array.size() < wantNum; ++it2) {
		if(!IsConnectablePeer(addr, *it2))
			continue;
		array.push_back(*it2);
	}
}

void P2PMgr::SendPackets() {
	bool noIncrease = false;
	TCPPacket* packet = NULL;
	for(ClientIt it = m_clients.begin(); it != m_clients.end(); ++it) {
		// 如果不用继续发送上轮循环连接的数据,则把it降回去
		if(noIncrease) {
			--it;
			noIncrease = false;
		}

		if(!it->IsValid())
			continue;
		it->GetPacket(packet);
		if(!packet)
			continue;
		int ret = send(it->GetSocket(), packet->buf+packet->sent, packet->size-packet->sent, 0);
		if(SOCKET_ERROR == ret) {
			DWORD lastError = ::WSAGetLastError();
			if (WSAEWOULDBLOCK == lastError) {
				it->PutPacketBack(packet);
			}
			else {
				it->SetInvalid();
				ReleasePacket(packet);
				comm->logFile.StatusErr("Sending data on TCP", lastError);
			}
		}
		else {
			AddOutgoingBytes(ret);
			it->AddOutgoingBytes(ret);

			assert(ret <= packet->size-packet->sent);
			if(ret == packet->size-packet->sent) {
				// 已经发送完毕,释放Buffer
				if(packet->GetMsgType() == P2P_RESPONSE) {
					AddBlockDataUp(*packet->GetBlockSize());
				}
				ReleasePacket(packet);
				// 呵呵,此连接的一个块已经发完了!接着发送下一个
				if(!it->SendFirstPushBlock())
					assert(0);
				noIncrease = true;
			}
			else {	// 尚未发送完毕,下次继续发送
				packet->sent += ret;
				it->PutPacketBack(packet);
			}
		}
	}
}

UINT8 P2PMgr::GetSelfLayer() {
	if(GetCPConCount() > 0)
		return 1;	// 连接了CP,肯定是第一层
	UINT8 layer = 0xff;
	for(ClientIt it = m_clients.begin(); it != m_clients.end(); ++it) {
		if(!it->IsValid())
			continue;
		// 必须和本机是同一个资源才有价值
		if(!it->GetIsSameRes())
			continue;
		if(it->GetLayer() == 0xff) // 尚未收到P2P_HELLO消息
			continue;
		if(it->GetLayer() <= 1)
			assert(0);
		layer = min(layer, it->GetLayer());
	}

	return layer;
}

void P2PMgr::GetSelfInfo(CorePeerInfo& thisPeer) {
	thisPeer.isCachePeer = false;
	thisPeer.layer = GetSelfLayer();
	thisPeer.isMaxIn = (GetIncomingCount() >= comm->cfgData.INCOMING_NUM);
	thisPeer.isMaxOut = (GetOutgoingCount() >= comm->cfgData.OUTGOING_NUM);
	thisPeer.isMaxFreeIn = (GetFreeInCount() >= comm->cfgData.IN_4FREE_NUM);
	thisPeer.isMaxFreeOut = (GetFreeOutCount() >= comm->cfgData.OUT_4FREE_NUM);
}

UINT8 P2PMgr::ExGetCount(bool in, bool out, bool freeIn, bool freeOut, bool cponly, bool includeConnecting) {
	UINT8 count = 0;
	for(ClientIt it = m_clients.begin(); it != m_clients.end(); ++it) {
		if(!it->IsValid())
			continue;
		if(cponly) {
			count += it->GetIsCachePeer()?1:0;
			continue;
		}
		if(it->GetIsIncoming()) {
			if(it->GetIsForFree())
				count += freeIn?1:0;
			else
				count += in?1:0;
		}
		else {
			if(it->GetIsForFree())
				count += freeOut?1:0;
			else
				count += out?1:0;
		}
	}
	if(includeConnecting) {
		for(CPeerIt it1 = m_connecting.begin(); it1 != m_connecting.end(); ++it1) {
			if(cponly) {
				count += it->GetIsCachePeer()?1:0;
				continue;
			}
			if(it1->isIncoming) {
				if(it1->isForFree)
					count += freeIn?1:0;
				else
					count += in?1:0;
			}
			else {
				if(it1->isForFree)
					count += freeOut?1:0;
				else
					count += out?1:0;
			}
		}
	}
	return count;
};

// 检查连接数目是否已满
bool P2PMgr::IsConnectionFull(bool isIncoming, bool isForFree, bool includeConnecting) {
	if(isIncoming) {
		if(isForFree)
			return GetFreeInCount(includeConnecting) >= comm->cfgData.IN_4FREE_NUM;
		return GetIncomingCount(includeConnecting) >= comm->cfgData.INCOMING_NUM;
	}
	if(isForFree)
		return GetFreeOutCount(includeConnecting) >= comm->cfgData.OUT_4FREE_NUM;
	return GetOutgoingCount(includeConnecting) >= comm->cfgData.OUTGOING_NUM;
}

UINT16 P2PMgr::GetLowVersionCount() {
	if(m_lowversionCount >= 0xffff)
		return 0xffff;
	return static_cast<UINT16>(m_lowversionCount);
}

UINT16 P2PMgr::GetConnectFailCount() {
	if(m_connectFailCount >= 0xffff)
		return 0xffff;
	return static_cast<UINT16>(m_connectFailCount);
}

UINT16 P2PMgr::GetTotalIncomingCount() {
	if(m_incomingCount >= 0xffff)
		return 0xffff;
	return static_cast<UINT16>(m_incomingCount);
}

UINT16 P2PMgr::GetTotalOutgoingCount() {
	if(m_outgoingCount >= 0xffff)
		return 0xffff;
	return static_cast<UINT16>(m_outgoingCount);
}

UINT16 P2PMgr::GetAvgIncomingTime() {
	UINT totalTime = m_incomingElapsedTime;
	UINT count = m_incomingCount;
	for(ClientIt it = m_clients.begin(); it != m_clients.end(); ++it) {
		if(!it->IsValid())
			continue;
		if(it->GetIsIncoming()) {
			count++;
			totalTime += it->GetElapsedTime();
		}
	}
	if(count == 0)
		return 0;
	UINT avgTime = totalTime/count;
	if(avgTime/1000 >= 0xffff)
		return 0xffff;
	return static_cast<UINT16>(avgTime/1000);
}

UINT16 P2PMgr::GetAvgOutgoingTime() {
	UINT totalTime = m_outgoingElapsedTime;
	UINT count = m_outgoingCount;
	for(ClientIt it = m_clients.begin(); it != m_clients.end(); ++it) {
		if(!it->IsValid())
			continue;
		if(!it->GetIsIncoming()) {
			count++;
			totalTime += it->GetElapsedTime();
		}
	}
	if(count == 0)
		return 0;
	UINT avgTime = totalTime/count;
	if(avgTime/1000 >= 0xffff)
		return 0xffff;
	return static_cast<UINT16>(avgTime/1000);
}

void P2PMgr::ConnectionEstablished(bool isIncoming) {
	if(isIncoming)
		m_incomingCount++;
	else
		m_outgoingCount++;
}

void P2PMgr::ConnectionClosed(bool isIncoming, DWORD elapsedTime) {
	if(isIncoming) {
		m_incomingElapsedTime += elapsedTime;
	}
	else {
		m_outgoingElapsedTime += elapsedTime;
	}
}

float P2PMgr::GetMessagePercent() {
	// 计算P2P网络中数据长度与控制信息长度的比例, 以不超过5%为佳
	TransferInfo tiTemp;
	GetTransferInfo(tiTemp);

	LONGLONG allMsg = 0;
	LONGLONG allData = 0;
	allMsg += tiTemp.totalDownBytes + tiTemp.totalUpBytes - m_totalBlockDataUp - m_totalBlockDataDown;
	allData += tiTemp.totalDownBytes + tiTemp.totalUpBytes;
	double a = (double)allMsg/(1024);
	double b = (double)allData/(1024);
	double msgPercent = (allData==0?0:(((double)allMsg/allData)*100));
	comm->logFile.StatusOut("Msg Percent: %f%(%fKB/%fKB), Share Percent: %f%(%.2f/%.2fMB), Data From CP: %.2fMB", msgPercent, a, b, 
		(tiTemp.totalUpBytes*100.f)/tiTemp.totalDownBytes, 
		tiTemp.totalUpBytes/1024.f/1024.f, tiTemp.totalDownBytes/1024.f/1024.f, 
		m_totalBytesFromCP/1024.f/1024.f);
	return static_cast<float>(msgPercent);
}

// 从现有连接(参数client的连接除外)的Push List中回收一个块
// 情况1. 某个连接下载到了一个Block,并且在该连接的Push List中没有找到这个块。
//        那么说明这个块很可能是上次改变Push List之际,对方发送过来的;
void P2PMgr::ReclaimBlocks(P2PClient* client, UINT blockID) {
	for(ClientIt it = m_clients.begin(); it != m_clients.end(); ++it) {
		// 不检查无效连接和提交此块的连接,如果client为NULL,则检查所有有效连接
		if(!it->IsValid() || &*it == client)
			continue;
		// 如果找到了,就删除并跳出循环,因为每块都只会被分配一次
		if(it->DelPushBlock(blockID))
			break;
	}
}

// 重新分配所有所有连接的Push List.
// 当新增一条连接时,当一个连接的PushList为空时
// 这个开销(网络流量)其实比较大,还需要商榷,作为暂时的解决办法是没问题的。
void P2PMgr::RedistributeAllBlocks(P2PClient* client) {
	assert(comm->currRes);
	if(!comm->currRes)
		return;

	if(client) {
		// 如果是普通下载方式,只给此连接分配少量的块。
		client->ReloadPushList();
	}
	else {
		for(ClientIt it = m_clients.begin(); it != m_clients.end(); ++it) {
			if(!it->IsValid() || client == &*it)
				continue;
			it->ReloadPushList();
		}
	}
}

// 在除了client以外的所有连接的PushList中查找blockID
bool P2PMgr::FindInAllPushList(const UINT blockID, P2PClient* client) {
	for(ClientIt it = m_clients.begin(); it != m_clients.end(); ++it) {
		if(!it->IsValid() || client == &*it)
			continue;
		if(it->FindPushBlock(blockID))
			return true;
	}
	return false;
}

// 在除了client以外的所有连接(是否包含CP)的remoteInterval中查找blockID
UINT8 P2PMgr::FindInAllRemoteInterval(const blockID, P2PClient* client, bool includeCP, float minSpeed) {
	UINT8 count = 0;
	for(ClientIt it = m_clients.begin(); it != m_clients.end(); ++it) {
		if(!it->IsValid() || client == &*it)
			continue;
		if(!includeCP && it->GetIsCachePeer())
			continue;
		if(it->GetCurDownSpeed()/1024 < minSpeed)
			continue;
		if(it->FindRemoteBlock(blockID))
			++count;
	}
	return count;
}

// 在除了client以外的所有LAN连接的remoteInterval中查找blockID
UINT8 P2PMgr::FindInAllLanRemoteInterval(const blockID, P2PClient* client, float minSpeed) {
	UINT8 count = 0;
	for(ClientIt it = m_clients.begin(); it != m_clients.end(); ++it) {
		if(!it->IsValid() || client == &*it)
			continue;
		if(!it->GetIsSameLan())
			continue;
		if(it->GetCurDownSpeed()/1024 < minSpeed)
			continue;
		if(it->FindRemoteBlock(blockID))
			++count;
	}
	return count;
}

LPCSTR P2PMgr::FormatIPAddress(P2PAddress& addr) {
	strcpy(TempIP, inet_ntoa(addr.outerIP.sin_addr));
	if(addr.subnetIP.sin_addr.s_addr != UINT_MAX) {
		strcat(TempIP, "->");
		strcat(TempIP, inet_ntoa(addr.subnetIP.sin_addr));
	}
	strcat(TempIP, ":");
	sprintf(TempIP+strlen(TempIP), "%d", ntohs(addr.subnetIP.sin_port));
	return TempIP;
}

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -