⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 p2pclient.cpp

📁 mysee网络直播源代码Mysee Lite是Mysee独立研发的网络视频流媒体播放系统。在应有了P2P技术和一系列先进流媒体技术之后
💻 CPP
📖 第 1 页 / 共 3 页
字号:
	// 如果PeerList中元素个数大于0xff,则删除更多的元素
	UINT8 listSize = min(0xff, peerList.size());
	peerList.resize(listSize);

	if(listSize == 0) {
		// 释放这个Packet
		comm->p2pMgr.ReleasePacket(packet);
		return TRUE;
	}

	// 列表大小
	CSClient::CopyMoveDes(sendPointer, &listSize, sizeof(listSize));

	// write peerInfo one by one
	for(PeerAddrIt it = peerList.begin(); it != peerList.end(); ++it) {
		CSClient::CopyMoveDes(sendPointer, &*it, sizeof(PeerInfoWithAddr));
	}

	comm->logFile.StatusOut("Sending P2P_NEAR_PEERS to %s.", comm->p2pMgr.FormatIPAddress(remotePeer));

	SendEnd(packet);
	return TRUE;
}

BOOL P2PClient::SendPushList(UINT blockID, bool bAdd) {
	// 是否更新全部
	bool bRefresh = (blockID == UINT_MAX);
	// 如果大小为零就不必更新
	if(bRefresh && remotePush.GetValidSize() == 0)
		return TRUE;

	TCPPacket* packet;
	if(!SendBegin(packet, P2P_PUSHLIST))
		return FALSE;

	CSClient::CopyMoveDes(sendPointer, &bRefresh, sizeof(bRefresh));

	if(bRefresh) {
		// 更新整个PushList
		UINT8 count = 0xff;
		remotePush.CopyPushList(reinterpret_cast<UINT*>(sendPointer+sizeof(count)), count);
		CSClient::CopyMoveDes(sendPointer, &count, sizeof(count));
		sendPointer += count*sizeof(UINT);

		comm->logFile.StatusOut("Sending P2P_PUSHLIST %s block(%d) ... to %s.", 
			bRefresh?"Refresh":(bAdd?"Add":"Del"), count, comm->p2pMgr.FormatIPAddress(remotePeer));
	}
	else {
		UINT8 count = bAdd?1:0;
		CSClient::CopyMoveDes(sendPointer, &count, sizeof(count));
		if(bAdd)
			CSClient::CopyMoveDes(sendPointer, &blockID, sizeof(blockID));
		count = bAdd?0:1;
		CSClient::CopyMoveDes(sendPointer, &count, sizeof(count));
		if(!bAdd)
			CSClient::CopyMoveDes(sendPointer, &blockID, sizeof(blockID));

		comm->logFile.StatusOut("Sending P2P_PUSHLIST %s block %d to %s.", 
			bAdd?"Add":"Del",blockID, comm->p2pMgr.FormatIPAddress(remotePeer));
	}

	SendEnd(packet);
	return TRUE;
} 

BOOL P2PClient::SendResponse(UINT blockID, bool tryGetData) {
	// 此块所在的区间,其媒体类型是否已经发送过了
	if(!sentMediaArray.FindBlock(blockID)) {
		// 没有发送过,则发送此块所在区间的媒体类型
		if(!SendMediaType(blockID))
			return FALSE;
	}

	TCPPacket* packet;
	if(!SendBegin(packet, P2P_RESPONSE))
		return FALSE;

	// 请求的BlockID
	CSClient::CopyMoveDes(sendPointer, &blockID, sizeof(blockID));

	// 检查是否存在这个Block,取出他
	UINT blockSize = 0;
	// 如果需要获取数据,则尝试获取数据;如果只是为了告诉对方此块不存在,则不作尝试;
	if(comm->currRes && tryGetData) {
		P2P_RETURN_TYPE ret = comm->currRes->GetBlock(blockID, blockSize, sendPointer+sizeof(blockSize));
		if(ret < PRT_OK) {
			// 一定要释放这个packet
			comm->p2pMgr.ReleasePacket(packet);
			return ret;
		}
	}

	// write block size (bSize==0 means no data)
	CSClient::CopyMoveDes(sendPointer, &blockSize, sizeof(blockSize));

	// 将指针移动到数据的末尾
	sendPointer += blockSize;

	comm->logFile.StatusOut("Sending P2P_RESPONSE block(%d)(%d) to %s.", 
		blockID, blockSize, comm->p2pMgr.FormatIPAddress(remotePeer));

	if(blockSize > 0) {
		// 记录此次发送数据的时间
		lastSendDataTime = timeGetTime();
	}

	SendEnd(packet);

	if(tryGetData && blockSize == 0) {
		// 如果想要发送这个块,结果发现本机没有,则尝试发送下一个块
		return SendFirstPushBlock();
	}
	return TRUE;
}

BOOL P2PClient::SendReqMedia(UINT blockID) {
	TCPPacket* packet;
	if(!SendBegin(packet, P2P_REQMEDIA))
		return FALSE;
	
	CSClient::CopyMoveDes(sendPointer, &blockID, sizeof(blockID));

	SendEnd(packet);
	return TRUE;
}

BOOL P2PClient::SendMediaType(UINT blockID) {
	if(!comm->currRes)
		return FALSE;
	// 查找媒体类型
	MediaInterval mInterval;
	if(!comm->currRes->GetMediaInterval(blockID, mInterval))
		return TRUE;

	TCPPacket* packet;
	if(!SendBegin(packet, P2P_MEDIATYPE))
		return FALSE;
	
	// 写入区间
	CSClient::CopyMoveDes(sendPointer, &mInterval.start, sizeof(mInterval.start));
	CSClient::CopyMoveDes(sendPointer, &mInterval.size, sizeof(mInterval.size));

	// 写入媒体数据长度
	UINT len = sizeof(mInterval.videoType)+mInterval.videoType.cbFormat+sizeof(mInterval.audioType)+mInterval.audioType.cbFormat;
	CSClient::CopyMoveDes(sendPointer, &len, sizeof(len));

	// 写入媒体数据
	CSClient::CopyMoveDes(sendPointer, &mInterval.videoType, sizeof(mInterval.videoType));
	CSClient::CopyMoveDes(sendPointer, mInterval.videoData, mInterval.videoType.cbFormat);
	CSClient::CopyMoveDes(sendPointer, &mInterval.audioType, sizeof(mInterval.audioType));
	CSClient::CopyMoveDes(sendPointer, mInterval.audioData, mInterval.audioType.cbFormat);

	// 超过ADD_PROGNAME_VERSION的版本将发送节目的名字
	if(remoteVersion > comm->cfgData.ADD_PROGNAME_VERSION) {
		// 写入节目的名字
		CSClient::CopyMoveDes(sendPointer, &mInterval.pnamesize, sizeof(mInterval.pnamesize));
		CSClient::CopyMoveDes(sendPointer, mInterval.pname, mInterval.pnamesize);
		// 超过ADD_PROGTIME_VERSION的版本将发送节目的时间和频道名
		if(remoteVersion > comm->cfgData.ADD_PROGTIME_VERSION) {
			// 写入节目的时间长度
			CSClient::CopyMoveDes(sendPointer, &mInterval.progtime, sizeof(mInterval.progtime));
			// 写入频道的名字
			CSClient::CopyMoveDes(sendPointer, &mInterval.cnamesize, sizeof(mInterval.cnamesize));
			CSClient::CopyMoveDes(sendPointer, mInterval.cname, mInterval.cnamesize);
		}
	}

	SendEnd(packet);

	sentMediaArray.AddInterval(mInterval);
	return TRUE;
}

BOOL P2PClient::SendBegin(TCPPacket*& packet, UINT8 msgType) {
	packet= comm->p2pMgr.AllocatePacket();
	if(!packet)
		return FALSE;

	// 先留着消息大小不写,到最后再写
	sendPointer = packet->buf+sizeof(UINT);
	CSClient::CopyMoveDes(sendPointer, &msgType, sizeof(msgType));

	return TRUE;
}

void P2PClient::SendEnd(TCPPacket*& packet) {
	// 消息的大小就是移动的指针减去初始的指针
	packet->size = sendPointer-packet->buf;
	packet->sent = 0;
	memcpy(packet->buf, &packet->size, sizeof(packet->size));

	m_sendList.push_back(packet);
}

// 添加一个要Push的块
bool P2PClient::AddPushBlock(UINT blockID, bool bRefreshing) {
	if(!remotePush.AddBlock(blockID, timeGetTime()))
		return false;
	if(remotePush.GetValidSize() == 1) {
		// pushlist大小从0到1,说明新的下载即将开始
		lastRecvDataHdTime = timeGetTime();

		// pushlist大小从0到1,记录此时的时间,用于统计实际传输速度
		m_reqStartTime = timeGetTime();
		//comm->logFile.StatusOut("Request Start Time %d", m_reqStartTime);
	}
	if(!bRefreshing) {
		// 如果不是正在更新整个PushList,就立即发送消息
		if(!SendPushList(blockID, true)) {
			if(!remotePush.DelBlock(blockID))
				assert(0);
			return false;
		}
	}
	return true;
}

// 查找一个PushList中的块
bool P2PClient::FindPushBlock(const UINT blockID) const {
	return remotePush.FindBlock(blockID);
}

// 删除一个要Push的块
bool P2PClient::DelPushBlock(UINT blockID) {
	// 从PushList中查找并删除此块
	if(!remotePush.DelBlock(blockID))
		return false;
	// 发送删除一个块的消息
	return SendPushList(blockID, false) == TRUE;
}

// Push List是否已满
bool P2PClient::IsPushListFull() {
	return (remotePush.GetValidSize() == remotePush.GetTotalSize());
}

// 取得Pushlist大小
UINT8 P2PClient::GetTotalPushSize() {
	return remotePush.GetTotalSize();
}

// 清空PushList
void P2PClient::ClearPush() {
	remotePush.Clear();
}

// 发送整个PushList,因为之前重新分配了所有连接的块
void P2PClient::SendPush() {
	if(!SendPushList(UINT_MAX, false))
		assert(0);
}

// 重新加载PushList,只有在普通的下载时才会被调用
void P2PClient::ReloadPushList() {
	if(remotePush.GetValidSize() > 0)
		return;

	UINT array[P2P_HIGH_PUSH];
	std::fill(array, array+P2P_HIGH_PUSH, UINT_MAX);
	UINT count = 0;

	// 计算此连接一次push的大小
	UINT pushNum = P2P_MID_PUSH; // default
	// 注意两者单位不同,前者是BPS, 后者是KBPS
	if(avgDownSpeed/1024/comm->currRes->GetBitRate() < 0.1667f)
		pushNum = P2P_LOW_PUSH;
	else if(avgDownSpeed/1024/comm->currRes->GetBitRate() > 0.5f)
		pushNum = P2P_HIGH_PUSH;
	//else 
	//	the default value

	// 1. 获取可以下载的区间列表
	IntervalArray downloadable;
	comm->currRes->GetDownloadableArray(remoteInterval, downloadable);
	if(downloadable.IsEmpty()) {
		return;
	}

	// 2. 遍历此区间列表,根据条件排除不可以下载的块,直到遍历完毕或者找到pushNum个Block
	BlockInterval temp;
	while(!downloadable.IsEmpty() && count < pushNum) {
		downloadable.PopFront(temp);
		if(temp.size == 0)
			break;
		for(UINT i = temp.start; i < temp.start+temp.size && count < pushNum; ++i) {
			// 如果此块有别人正在下载,则去除此块
			if(comm->p2pMgr.FindInAllPushList(i, this))
				continue;
			// 如果有其他NP有此块,那么就不要从CP上下载了, 前提是缓冲进度100%而且有此块的NP下载速度大于5.0KBPS
			if(GetIsCachePeer() && 
				comm->currRes->GetBufferPercent() == 100 && 
				comm->p2pMgr.FindInAllRemoteInterval(i, this, false, 5.0f) > 0)
				continue;
			// 如果是外网节点,且内网节点上有,则不从这下载
			if (GetIsSameLan() &&
				comm->p2pMgr.FindInAllLanRemoteInterval(i, this, 5.0f) > 0)
				continue;

			// 记录一个可以下载的块
			array[count++] = i;
		}
	}

	if(count) {
		// 根据缓冲情况决定是否要随机下载
		if(comm->currRes->GetBufferPercent() == 100) {
			random_shuffle(array, array+count);
			comm->logFile.StatusOut("Do by random %d.", pushNum);
		}
		else
			comm->logFile.StatusOut("Do by order %d.", pushNum);

		// 添加到pushlist中,并发送
		UINT addNum = 0;
		for(UINT i = 0; i < count && addNum < pushNum; ++i) {
			if(array[i] != UINT_MAX) {
				if(!AddPushBlock(array[i], true))
					break;
				addNum++;
			}
		}
		if(addNum)
			SendPush();
	}
}

// 发送push列表中的第一个块
BOOL P2PClient::SendFirstPushBlock() {
	// 是否有需要发送的块,如果有,取出第一个(也是最小的一个)
	UINT blockID;
	localPush.GetFirstBlock(blockID);
	if(blockID == UINT_MAX)
		return TRUE;

	// 检查当前发送列表中是否已经有本连接的块,
	// 1. 如果没有,那么发送;
	// 2. 如果有,则暂时不发送
	if(!CheckSendingBlock())
		return SendResponse(blockID, true);

	localPush.AddBlock(blockID);
	return TRUE;
}


BOOL P2PClient::IsIdleTooLong() {
	DWORD tNow = timeGetTime();

	// 对于Listener, 是上次发送数据的时间;对于Connector,是上次收到数据的时间
	//int transferIdle = isIncoming?tNow-lastSendDataTime:tNow-lastRecvDataTime;
	int transferIdle = min(tNow-lastSendDataTime, tNow-lastRecvDataTime);
	// 明明push list非空,但是却没有收到数据头部的时间
	int responseIdle = tNow-lastRecvDataHdTime;

	GenerateTransferInfo(TRUE);
	TransferInfo tiTemp;
	GetTransferInfo(tiTemp);
	comm->logFile.StatusOut("%s: transfer idle %d. response idle %d. \r\n\
\tcurr down/up:%.2f/%.2f, avg down/up:%.2f/%.2f, total down/up:%.2f/%.2fMB. layer: %d. bandwidth: %.4fKBPS %s.", 
		comm->p2pMgr.FormatIPAddress(remotePeer), 
		transferIdle, responseIdle, 
		tiTemp.currDownSpeed/1024, tiTemp.currUpSpeed/1024, 
		tiTemp.avgDownSpeed/1024, tiTemp.avgUpSpeed/1024, 
		tiTemp.totalDownBytes/1024.f/1024.f, tiTemp.totalUpBytes/1024.f/1024.f, 
		GetLayer(), GetBandWidth(), isIncoming?"in":"out");

	BOOL kill = FALSE;
	if(bGotFirstMsg) {
		if(isIncoming)
			kill = (transferIdle > MAX_INCOMING_SENDDATA_IDLE);
		else
			kill = (transferIdle > MAX_OUTGOING_RECVDATA_IDLE);

		// TOOD: 如果pushlist非空,但是有一段时间没有数据的头部了,那么就杀掉此连接
		//       注意此timeout时间比transfer timeout要短一些
		kill = kill || (remotePush.GetValidSize() != 0 && responseIdle > MAX_RESPONSE_IDLE);
	}
	else {
		int startIdle = transferIdle;
		if(GetIsCachePeer()) {
			kill = (startIdle > MAX_CP_FIRST_MSG_IDLE);
		}
		else
			kill = (startIdle > MAX_NP_FIRST_MSG_IDLE);
	}

	if(kill) {
		comm->logFile.StatusOut("Kill Idle %s.", comm->p2pMgr.FormatIPAddress(remotePeer));
	}
	return kill;
}

// 检查当前发送列表中是否已经有某连接的块,
// 1. 如果有,返回TRUE
// 2. 如果没有,那么返回FALSE;
BOOL P2PClient::CheckSendingBlock() {
	TCPPacket* packet = NULL;
	UINT temp;
	for(TCPPackIt it = m_sendList.begin(); it != m_sendList.end(); ++it) {
		packet = *it;

		// 如果消息类型不是P2P_RESPONSE,接着找下一个
		if(packet->buf[4] != P2P_RESPONSE)
			continue;

		// 如果blockSize为0,接着找下一个
		memcpy(&temp, packet->GetBlockSize(), sizeof(temp));
		if(temp == 0)
			continue;

		// 已经有准备发送的块
		return TRUE;
	}
	// 没有找到,说明可以发送blockID
	return FALSE;
}

// 从发送列表中查找一个块,如果此块准备发送(packet->send == 0),则删除并返回TRUE;
// 如果此块已经开始发送,则直接返回TRUE。如未找到,则返回FALSE
// 如果blockID是UINT_MAX,则删除任意一个准备发送的块
BOOL P2PClient::DeleteSendingBlock(UINT blockID) {
	TCPPacket* packet = NULL;
	UINT temp;
	UINT index = 0;
	for(TCPPackIt it = m_sendList.begin(); it != m_sendList.end(); ++it, index++) {
		packet = *it;

		// 如果消息类型不是P2P_RESPONSE,接着找下一个
		if(packet->GetMsgType() != P2P_RESPONSE)
			continue;

		// 如果不是目标块,则接着找下一个
		// 如果blockID是UINT_MAX,则删除所有准备发送的块
		memcpy(&temp, packet->buf+5, sizeof(temp));
		if(temp != blockID && blockID != UINT_MAX)
			continue;

		// 如果已经开始发送,就不能打断,返回TRUE
		if(packet->sent > 0)
			return TRUE;

		// 如果是尚未开始发送的块,则删除,返回TRUE
		comm->p2pMgr.ReleasePacket(packet);
		m_sendList.erase(it);
		return TRUE;
	}
	// 没有找到
	return FALSE;
}

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -