⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 p2pclient.cpp

📁 mysee网络直播源代码Mysee Lite是Mysee独立研发的网络视频流媒体播放系统。在应有了P2P技术和一系列先进流媒体技术之后
💻 CPP
📖 第 1 页 / 共 3 页
字号:

		// read intervals
		for(UINT8 i = 0; i < intervalNum; ++i) {
			BlockInterval temp;
			CSClient::CopyMoveSrc(&temp, recvPointer, sizeof(temp));
			if(j == 0)
				remoteInterval.AddInterval(temp.start, temp.size);
			else
				remoteInterval.DelInterval(temp.start, temp.size);
		}

		// 如果对方新增了Block
		if(j == 0 && intervalNum > 0) {
			// 检查PushList状况,如果只剩0块了,就要求重新分配所有连接的块
			if(remotePush.GetValidSize() == 0 && comm->currRes)
				comm->p2pMgr.RedistributeAllBlocks(this);
		}
	}

	//comm->logFile.StatusOut("Recv P2P_REPORT from %s", comm->p2pMgr.FormatIPAddress(remotePeer));

	return MSG_COMPLETE;
}

MSG_STATE P2PClient::OnNearPeers() {
	// get size of peer list and move the pointer
	UINT8 listSize;
	CSClient::CopyMoveSrc(&listSize, recvPointer, sizeof(listSize));

	// 检查listSize是否正常
	if(static_cast<int>(listSize*sizeof(PeerInfoWithAddr)) > recvBuf+P2P_BUF_SIZE-recvPointer)
		return MSG_ERR_LIST_SIZE;

	// read peer one by one, and add to known peer list
	for(UINT8 i = 0; i < listSize; ++i) {
		PeerInfoWithAddr peer;
		CSClient::CopyMoveSrc(&peer, recvPointer, sizeof(peer));
		peer.isCachePeer = false;
		comm->p2pMgr.AddPeerInfo(peer);
	}
	comm->logFile.StatusOut("Recv P2P_NEAR_PEERS(%d NP) from %s.", listSize, comm->p2pMgr.FormatIPAddress(remotePeer));
	return MSG_COMPLETE;
}

MSG_STATE P2PClient::OnPushList() {
	// 当没有当前资源时,不给对方发送数据
	if(!comm->currRes)
		return MSG_SEND_ERR;


	// 是否更新整个PushList
	bool bRefresh;
	CSClient::CopyMoveSrc(&bRefresh, recvPointer, sizeof(bRefresh));
	if(bRefresh) {
		// 清空所有准备发送的块(已经开始发送的块,不用清空)
		while(DeleteSendingBlock(UINT_MAX));
		localPush.Clear();
	}

	UINT blockID = UINT_MAX;
	if(bRefresh) {
		UINT8 count;
		CSClient::CopyMoveSrc(&count, recvPointer, sizeof(count));
		assert(count < 30);
		for(UINT8 i = 0; i < count; ++i) {
			CSClient::CopyMoveSrc(&blockID, recvPointer, sizeof(blockID));
			// 查找本机有没有这个块,如果没有,立即告知对方;
			if(!comm->currRes->FindBlock(blockID)) {
				if(!SendResponse(blockID, false))
					return MSG_SEND_ERR;
			}
			else 
				localPush.AddBlock(blockID);
		}
		if(count)
			comm->logFile.StatusOut("Got request refresh(%d...) from %s.", blockID, comm->p2pMgr.FormatIPAddress(remotePeer));
	}
	else {
		UINT8 count;
		CSClient::CopyMoveSrc(&count, recvPointer, sizeof(count));
		assert(count < 2);
		for(UINT8 i = 0; i < count; ++i) {
			CSClient::CopyMoveSrc(&blockID, recvPointer, sizeof(blockID));
			// 查找本机有没有这个块,如果没有,立即告知对方;
			if(!comm->currRes->FindBlock(blockID)) {
				if(!SendResponse(blockID, false))
					return MSG_SEND_ERR;
			}
			else 
				localPush.AddBlock(blockID);
		}
		if(count)
			comm->logFile.StatusOut("Got request add(%d) from %s.", blockID, comm->p2pMgr.FormatIPAddress(remotePeer));
		CSClient::CopyMoveSrc(&count, recvPointer, sizeof(count));
		assert(count < 2);
		for(UINT8 i = 0; i < count; ++i) {
			CSClient::CopyMoveSrc(&blockID, recvPointer, sizeof(blockID));
			// 如果在localPush中查找并删除此块
			if(!localPush.DelBlock(blockID)) {
				// 如果在localPush中没有找到,则说明此块已经在发送列表中了,甚至可能已经被发送了
				// 尝试从发送列表中查找并删除此块
				if(!DeleteSendingBlock(blockID)) {
					comm->logFile.StatusOut("Ahhhhh..., the block has already been sent!");
				}
			}
		}
		if(count)
			comm->logFile.StatusOut("Got request del(%d) from %s.", blockID, comm->p2pMgr.FormatIPAddress(remotePeer));
	}

	// 发送push列表中的第一个块
	if(!SendFirstPushBlock())
		return MSG_SEND_ERR;
	return MSG_COMPLETE;
}

MSG_STATE P2PClient::OnResponse() {
	// 如果对方是CachePeer, 则第一个收到的消息应该在这里
	if(GetIsCachePeer())
		bGotFirstMsg = TRUE;

	MSG_STATE ret = MSG_COMPLETE;

	// 回应的BlockID和大小
	UINT blockID;
	CSClient::CopyMoveSrc(&blockID, recvPointer, sizeof(blockID));
	UINT blockSize;
	CSClient::CopyMoveSrc(&blockSize, recvPointer, sizeof(blockSize));

	// 检查Block大小是否正常
	if(static_cast<int>(blockSize) > recvBuf+P2P_BUF_SIZE-recvPointer) {
		ret = MSG_ERR_SIZE;
		assert(0);
	}

	// 检查对方返回的Block是不是在Push List之中,如果在,则删除之;
	// 如果不在,则检查是否在其他连接的Push List之中,如果在,则删除之
	DWORD reqTime;
	if(!remotePush.DelBlock(blockID, &reqTime))
		comm->p2pMgr.ReclaimBlocks(this, blockID);

	if(ret == MSG_COMPLETE) {
		if(blockSize > 0) {
			// 记录此次得到数据的时间
			lastRecvDataTime = timeGetTime();

			// 记录本次传输所用的时间
			m_transUsedTime += timeGetTime()-m_reqStartTime;
			//assert(lastRecvDataTime-m_reqStartTime < 25000);
			
			// 确实已经下载了,虽然可能发生存储错误
			comm->p2pMgr.AddBlockDataDown(blockSize);
			if(GetIsCachePeer())
				comm->p2pMgr.AddCPData(blockSize);

			// 存储此块
			P2P_RETURN_TYPE bSuccess = comm->currRes->PutBlock(blockID, blockSize, reinterpret_cast<PBYTE>(recvPointer));
			recvPointer += blockSize;

			if(bSuccess < PRT_OK) {
				if(bSuccess == PRT_BUFFER_FULL) {
					comm->logFile.StatusOut("缓冲区已满");
					Sleep(300);
				}
				else if(bSuccess == PRT_BAD_BLOCK) {
					if(!remotePeer.isCachePeer) {
						// 这个块是错误的,不再从这个NP下载此块
						remoteInterval.DelInterval(blockID, 1);
					}
				}
				else {
					comm->logFile.StatusErr("写入磁盘错误", GetLastError());
					// 发送写入磁盘错误消息
					comm->PostErrMessage(PNT_DISK_ERR, 0, true);
					ret = MSG_SAVEDATA_ERR;
					assert(0);
				}
			}

			char* temp = "";
#ifdef _DEBUG
			MD5 md5(reinterpret_cast<BYTE*>(recvPointer-blockSize), blockSize);
			temp = md5.hex_digest();
#endif
			comm->logFile.StatusOut("Got %sblock %d(%d)(%s)(used %dms) from %s.", 
				(bSuccess==PRT_BAD_BLOCK?"bad ":""), blockID, blockSize, temp, lastRecvDataTime-m_reqStartTime, 
				comm->p2pMgr.FormatIPAddress(remotePeer));
#ifdef _DEBUG
			delete [] temp;

			// 记录下次传输开始的时间
			m_reqStartTime = timeGetTime();
#endif
		}
		else {
			comm->logFile.StatusOut("Got block %d(%d) from %s.", 
				blockID, blockSize, comm->p2pMgr.FormatIPAddress(remotePeer));
		}
		// 既然得到了数据,登录不上TS的问题就算了:)
		comm->csClient.ResetLoginFail();
	}

	if(ret == MSG_COMPLETE) {
		if(blockSize == 0) {
			// 对方没有这个块,从区间表中删除,并将此块交给其他连接
			remoteInterval.DelInterval(blockID, 1);
		}
		// 检查PushList状况,如果只剩0块了,就要求重新分配所有连接的块
		if(remotePush.GetValidSize() == 0)
			comm->p2pMgr.RedistributeAllBlocks(this);
	}

	return ret;
}

MSG_STATE P2PClient::OnMsg() {
	//assert(GetIsCachePeer());
	// 错误代码
	UINT16 errCode;
	CSClient::CopyMoveSrc(&errCode, recvPointer, sizeof(errCode));

	// 是否需要断开连接
	bool shouldDisconnect;
	CSClient::CopyMoveSrc(&shouldDisconnect, recvPointer, sizeof(shouldDisconnect));

	// 根据错误代码处理
	switch(errCode) {
	case ERR_PROTOCOL_FORMAT:
		sprintf(errStr, "协议错误");
		break;
	case ERR_AUTHORIZATION:
		sprintf(errStr, "验证错误");
		break;
	case ERR_INTERNAL:
		sprintf(errStr, "未知错误");
		break;
	case ERR_CONNECTION_FULL:
		sprintf(errStr, "对方连接已满");
		break;
	default:
		shouldDisconnect = true;
	}
	if(shouldDisconnect) {
		// 停止连接这个地址
		comm->p2pMgr.AddBadAddr(remotePeer);
		return MSG_REMOTE_ERR;
	}

	return MSG_COMPLETE;
}

MSG_STATE P2PClient::OnReqMedia() {
	UINT blockID = 0;
	CSClient::CopyMoveSrc(&blockID, recvPointer, sizeof(blockID));

	if(!SendMediaType(blockID))
		return MSG_SEND_ERR;
	return MSG_COMPLETE;
}

MSG_STATE P2PClient::OnMediaType() {
	assert(comm->currRes);
	if(!comm->currRes)
		return MSG_DIFF_CHNL;

	// 一些较早版本的客户端会读取gtv文件中的媒体类型,然后发送很大的mediainterval,这种消息不予接受
	if(remoteVersion < comm->cfgData.ADD_PROGNAME_VERSION) {
		return MSG_COMPLETE;
	}

	MediaInterval mInterval;
	// 区间
	CSClient::CopyMoveSrc(&mInterval.start, recvPointer, sizeof(mInterval.start));
	CSClient::CopyMoveSrc(&mInterval.size, recvPointer, sizeof(mInterval.size));

	UINT len = 0;
	CSClient::CopyMoveSrc(&len, recvPointer, sizeof(len));
	if(len > 1024) {
		assert(0);
		return MSG_ERR_LIST_SIZE;
	}

	// 首先读取视频编码格式
	CSClient::CopyMoveSrc(&mInterval.videoType, recvPointer, sizeof(mInterval.videoType));
	if(mInterval.videoType.cbFormat > 1024) {
		assert(0);
		return MSG_ERR_LIST_SIZE;
	}
	mInterval.videoData = new BYTE[mInterval.videoType.cbFormat];
	CSClient::CopyMoveSrc(mInterval.videoData, recvPointer, mInterval.videoType.cbFormat);

	// 然后读取音频编码格式
	CSClient::CopyMoveSrc(&mInterval.audioType, recvPointer, sizeof(mInterval.audioType));
	if(mInterval.audioType.cbFormat > 1024) {
		assert(0);
		return MSG_ERR_LIST_SIZE;
	}
	mInterval.audioData = new BYTE[mInterval.audioType.cbFormat];
	CSClient::CopyMoveSrc(mInterval.audioData, recvPointer, mInterval.audioType.cbFormat);

	assert(len == mInterval.videoType.cbFormat+sizeof(mInterval.videoType)+mInterval.audioType.cbFormat+sizeof(mInterval.audioType));

	// 超过ADD_PROGNAME_VERSION的版本将发送节目的名字
	if(remoteVersion > comm->cfgData.ADD_PROGNAME_VERSION) {
		CSClient::CopyMoveSrc(&mInterval.pnamesize, recvPointer, sizeof(mInterval.pnamesize));
		/// 直播频道没有轮播节目名
		if(mInterval.pnamesize && mInterval.pnamesize + recvPointer-recvBuf <= msgSize) {
			mInterval.pname = new char[mInterval.pnamesize+1];
			CSClient::CopyMoveSrc(mInterval.pname, recvPointer, mInterval.pnamesize);
			mInterval.pname[mInterval.pnamesize] = 0;
		}
		else
			mInterval.pnamesize = 0; // 既然不能复制, 一定要置空!

		// 超过ADD_PROGNAME_VERSION的版本将发送节目的时间长度和频道的名字
		if(remoteVersion > comm->cfgData.ADD_PROGTIME_VERSION) {
			CSClient::CopyMoveSrc(&mInterval.progtime, recvPointer, sizeof(mInterval.progtime));
			CSClient::CopyMoveSrc(&mInterval.cnamesize, recvPointer, sizeof(mInterval.cnamesize));
			if(mInterval.cnamesize && mInterval.cnamesize + recvPointer-recvBuf <= msgSize) {
				mInterval.cname = new char[mInterval.cnamesize+1];
				CSClient::CopyMoveSrc(mInterval.cname, recvPointer, mInterval.cnamesize);
				mInterval.cname[mInterval.cnamesize] = 0;
			}
			else
				mInterval.cnamesize = 0; // 既然不能复制, 一定要置空!
		}
	}

	// add to interval array
	comm->currRes->AddMediaInterval(mInterval);

	return MSG_COMPLETE;
}

BOOL P2PClient::SendHello() {
	// 不可能没有currRes
	if(!comm->currRes)
		return FALSE;

	TCPPacket* packet;
	if(!SendBegin(packet, P2P_HELLO))
		return FALSE;

	// NP version
	CSClient::CopyMoveDes(sendPointer, &comm->cfgData.COMMUNICATOR_VERSION, sizeof(float));

	// 当前资源的Hash码
	CSClient::CopyMoveDes(sendPointer, comm->currRes->GetHashCode().data(), MD5_LEN);

	// 是否被动连接
	CSClient::CopyMoveDes(sendPointer, &isPassive, sizeof(isPassive));

	// 本机信息
	PeerInfoWithAddr thisPeer;
	memcpy(static_cast<P2PAddress*>(&thisPeer), &comm->localAddress, sizeof(comm->localAddress));
	comm->p2pMgr.GetSelfInfo(thisPeer);
	CSClient::CopyMoveDes(sendPointer, &thisPeer, sizeof(thisPeer));

	// 如果对方是CachePeer,还要发送SP的地址列表
	if(GetIsCachePeer()) {
		NormalAddress* spIPList = comm->currRes->GetSPList();
		if(spIPList) {
			UINT8 spIPListSize = comm->currRes->GetSPListSize();
			CSClient::CopyMoveDes(sendPointer, &spIPListSize, sizeof(spIPListSize));
			for(UINT8 i = 0; i < spIPListSize; ++i)
				CSClient::CopyMoveDes(sendPointer, &spIPList[i], sizeof(spIPList[i]));
		}
	}

	comm->logFile.StatusOut("Sending P2P_HELLO to %s.", comm->p2pMgr.FormatIPAddress(remotePeer));

	SendEnd(packet);

	return TRUE;
}

BOOL P2PClient::SendReport(const CorePeerInfo& peer, bool bRefresh) {
	TCPPacket* packet;
	if(!SendBegin(packet, P2P_REPORT))
		return FALSE;

	// PeerInfoWithAddr of local peer
	CSClient::CopyMoveDes(sendPointer, &peer, sizeof(peer));

	// 如果没有当前资源,则发送bRefresh=true和intervalNum=0
	if(!comm->currRes)
		bRefresh = true;

	// 是否更新全部区间列表
	CSClient::CopyMoveDes(sendPointer, &bRefresh, sizeof(bRefresh));

	// send block intervals
	// TODO: 这里存在重复代码,有待改进
	UINT8 intervalNum = comm->currRes?0xff:0;
	if(bRefresh) {
		if(comm->currRes) {
			// 如果是更新全部区间,则取得当前资源全部区间的列表,并发送
			comm->currRes->GetAllIntervals((BlockInterval*)(sendPointer+sizeof(intervalNum)), intervalNum);
		}
		CSClient::CopyMoveDes(sendPointer, &intervalNum, sizeof(intervalNum));
		sendPointer += intervalNum*sizeof(BlockInterval);
	}
	else {
		// 如果是更新变化的区间,则先后写入增加的区间和减少的区间
		if(comm->currRes)
			comm->currRes->GetDiffIntervals((BlockInterval*)(sendPointer+sizeof(intervalNum)), intervalNum, false, true);
		CSClient::CopyMoveDes(sendPointer, &intervalNum, sizeof(intervalNum));
		sendPointer += intervalNum*sizeof(BlockInterval);

		if(comm->currRes)
			comm->currRes->GetDiffIntervals((BlockInterval*)(sendPointer+sizeof(intervalNum)), intervalNum, false, false);
		CSClient::CopyMoveDes(sendPointer, &intervalNum, sizeof(intervalNum));
		sendPointer += intervalNum*sizeof(BlockInterval);
	}

	SendEnd(packet);

	//comm->logFile.StatusOut("Sending Report To %s", comm->p2pMgr.FormatIPAddress(remotePeer));
	return TRUE;
}

BOOL P2PClient::SendSPUpdate(const SPUpdate& spUpdate, UINT8 selfLayer, BYTE sum) {
	// 如果对方比自己离CP更近,那就不用发了
	if(selfLayer > remotePeer.layer)
		return TRUE;
	if(!isSameRes)
		return TRUE;

	TCPPacket* packet;
	if(!SendBegin(packet, P2P_SPUPDATE))
		return FALSE;

	// SPUpdate
	CSClient::CopyMoveDes(sendPointer, &spUpdate, sizeof(spUpdate));

	// SPUpdate sum
	CSClient::CopyMoveDes(sendPointer, &sum, sizeof(sum));

	SendEnd(packet);
	return TRUE;
}

BOOL P2PClient::SendNearPeers() {
	// 对方资源与本机相同时,才能发送有用的PeerInfo给它
	if(!isSameRes)
		return TRUE;

	TCPPacket* packet;
	if(!SendBegin(packet, P2P_NEAR_PEERS))
		return FALSE;

	list<PeerInfoWithAddr> peerList;
	comm->p2pMgr.GetPeersForNeighbour(peerList, 20, remotePeer);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -