⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 p2pclient.cpp

📁 mysee网络直播源代码Mysee Lite是Mysee独立研发的网络视频流媒体播放系统。在应有了P2P技术和一系列先进流媒体技术之后
💻 CPP
📖 第 1 页 / 共 3 页
字号:
/*
 *  Openmysee
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 *
 */
// P2PClient.cpp: implementation of the P2PClient class.
//
//////////////////////////////////////////////////////////////////////

#include "stdafx.h"
#include "P2PClient.h"
#include "Communicator.h"

namespace NPLayer1 {

//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////

// Creates a client that is not yet attached to any connection.
// Only the parse/send cursors, the same-LAN flag, the valid flag and the
// error text are set here; the connection state is filled in by SetValid().
P2PClient::P2PClient()
	: recvPointer(0), sendPointer(0), isSameLan(false)
{
	// Start with an empty error message.
	errStr[0] = '\0';

	// Not usable until SetValid() marks the object valid.
	valid = FALSE;
}

// Releases the connection on destruction if it is still marked valid.
P2PClient::~P2PClient() {
	if(!valid)
		return;

	SetInvalid();
}

// Attaches this client to an established connection and starts the handshake.
//
// c    - communicator that owns this client; stored in comm.
// peer - the connecting peer: supplies the socket, the connection flags and
//        the address information copied into remotePeer.
//
// Returns TRUE when both the hello and the initial report were sent, FALSE as
// soon as one of them fails. The object is already marked valid when FALSE is
// returned.
BOOL P2PClient::SetValid(Communicator* c, const ConnectingPeer& peer) {
	// Bind to the communicator and reset the receive state.
	comm = c;
	valid = TRUE;
	recvOff = 0;

	// Take over the socket and the flags decided while connecting.
	m_Socket = peer.sock;
	isIncoming = peer.isIncoming;
	isForFree = peer.isForFree;
	isPassive = peer.isPassive;
	isSameLan = peer.isSameLan;
	isSameRes = false;

	// Up to this point we know nothing about which blocks the remote side has.
	remoteInterval.Clear();

	remotePush.Clear();
	localPush.Clear();
	sentMediaArray.Clear();

	// Every activity timestamp starts from the same "now".
	const DWORD now = timeGetTime();
	lastRecvDataHdTime = now;
	lastSendDataTime = now;
	lastRecvDataTime = now;
	connectionBeginTime = now;
	bGotFirstMsg = FALSE;

	// Copy sizeof(PeerInfoWithAddr) bytes of the connecting peer into remotePeer.
	memcpy((PeerInfoWithAddr*)&remotePeer, (PeerInfoWithAddr*)&peer, sizeof(PeerInfoWithAddr));
	comm->logFile.StatusOut("Connected on %s.", comm->p2pMgr.FormatIPAddress(remotePeer));

	// A non-cache peer starts with an unknown version (0) and layer 0xff;
	// a cache peer is given our own communicator version and layer 0.
	if(!GetIsCachePeer()) {
		remoteVersion = 0.0f;
		remotePeer.layer = 0xff;
	}
	else {
		remoteVersion = comm->cfgData.COMMUNICATOR_VERSION;
		remotePeer.layer = 0;
	}

	// Reset the transfer calculator members.
	ClearTransferInfo();
	lastRequestStartTime = 0;
	lastRequestBytes = 0;
	m_reqStartTime = 0;
	m_transUsedTime = 0;

	// Handshake: hello first, then a report describing this peer.
	if(!SendHello())
		return FALSE;

	CorePeerInfo selfInfo;
	comm->p2pMgr.GetSelfInfo(selfInfo);
	if(!SendReport(selfInfo, true))
		return FALSE;

	// For a cache peer, ask for all push lists to be redistributed.
	if(GetIsCachePeer())
		comm->p2pMgr.RedistributeAllBlocks(this);

	return TRUE;
}

// Tears the connection down: clears the remote push list, releases every
// queued outgoing packet, closes the socket and reports the connection
// duration to the P2P manager. Does nothing if the client is not valid.
void P2PClient::SetInvalid() {
	if(!valid)
		return;

	valid = FALSE;

	remotePush.Clear();

	// Hand every packet still waiting in the send list back to the manager.
	comm->logFile.StatusOut("Clear TCPPacket Send list...");
	for(; !m_sendList.empty(); m_sendList.pop_front())
		comm->p2pMgr.ReleasePacket(m_sendList.front());

	comm->logFile.StatusOut("Disconnected from %s.", comm->p2pMgr.FormatIPAddress(remotePeer));

	// no force disconnect
	comm->p2pMgr.SafeCloseSocket(m_Socket);

	// Low-version clients are not excluded here, so the statistics are
	// slightly off.
	comm->p2pMgr.ConnectionClosed(isIncoming, timeGetTime()-connectionBeginTime);
}

// Reads whatever is available on the socket into recvBuf and parses every
// complete message it now contains.
//
// Returns TRUE when the connection should be kept (nothing to read yet, or
// only an incomplete message is left in the buffer) and FALSE on a socket
// error, a graceful disconnect, or any parse/handler error.
BOOL P2PClient::BaseRecv() {
	const int received = recv(m_Socket, recvBuf+recvOff, P2P_BUF_SIZE-recvOff, 0);
	if(received < 0) {
		const DWORD lastError = ::WSAGetLastError();
		// WSAEWOULDBLOCK is not treated as a failure.
		if(WSAEWOULDBLOCK == lastError)
			return TRUE;

		comm->logFile.StatusErr("Receiving data on TCP", lastError);
		return FALSE;
	}
	if(0 == received) {
		comm->logFile.StatusOut("Connection has been disconnected gracefully.");
		return FALSE;
	}

	// Account for the new bytes, both per connection and globally.
	recvOff += received;
	AddIncomingBytes(received);
	comm->p2pMgr.AddIncomingBytes(received);

	// Several messages can arrive in one read: keep parsing for as long as
	// ParseMsg() reports a complete message.
	MSG_STATE state;
	do {
		state = ParseMsg();
	} while(MSG_COMPLETE == state);

	BOOL keepConnection = FALSE;

	// NOTE(review): this flag is assigned below but never read; AddBadAddr()
	// at the end runs for every error. Confirm whether it was meant to gate
	// that call.
	bool peerIsUnusable = false;

	switch(state) {
	case MSG_UNCOMPLETE:
		// Not an error: wait for the rest of the message.
		keepConnection = TRUE;
		break;
	case MSG_ERR_SIZE:
		sprintf(errStr, "错误的消息大小!");
		break;
	case MSG_ERR_TYPE:
		sprintf(errStr, "错误的消息类型!");
		break;
	case MSG_DIFF_CHNL:
		sprintf(errStr, "属于不同的频道!");
		peerIsUnusable = true;
		break;
	case MSG_NOMORE_CONS:
		sprintf(errStr, "已经存在一个连接!");
		break;
	case MSG_ERR_LIST_SIZE:
		sprintf(errStr, "错误的列表大小!");
		break;
	case MSG_SEND_ERR:
		sprintf(errStr, "发送消息错误!");
		break;
	case MSG_SAVEDATA_ERR:
		sprintf(errStr, "无法保存数据!");
		break;
	case MSG_UNMATCH_BLOCKID:
		sprintf(errStr, "返回了并未请求的块!");
		break;
	case MSG_NOSUCH_RES_HERE:
		sprintf(errStr, "本机没有这个资源!");
		peerIsUnusable = true;
		break;
	case MSG_REMOTE_ERR:
		// The error text is already in errStr.
		break;
	case MSG_CHNL_CLOSED:
		sprintf(errStr, "频道关闭!!");
		// Tell the outside world that the channel is closed.
		comm->PostErrMessage(PNT_CHNL_CLOSED, 0, true);
		break;
	case MSG_NOSUCH_RES_SP:
		sprintf(errStr, "SP没有这个资源!!");
		// Tell the outside world that the resource does not exist.
		comm->PostErrMessage(PNT_NO_SUCH_RES, 0, true);
		break;
	case MSG_CHNL_END:
		sprintf(errStr, "频道结束了!");
		// Tell the outside world that the channel has ended.
		comm->PostErrMessage(PNT_CHNL_ENDED, 0, true);
		break;
	case MSG_LOW_VERSION:
		sprintf(errStr, "对方客户端版本过低!");
		peerIsUnusable = true;

		comm->p2pMgr.AddLowVersionConCount();
		break;
	default:
		sprintf(errStr, "未知错误类型!");
		peerIsUnusable = true;
		break;
	}

	// Any non-empty error text is logged, cleared, and the peer is rejected.
	if(errStr[0] != 0) {
		comm->logFile.StatusOut("来自%s的错误: %s", comm->p2pMgr.FormatIPAddress(remotePeer), errStr);
		errStr[0] = 0;

		// reject this client
		comm->p2pMgr.AddBadAddr(remotePeer);
	}

	return keepConnection;
}

// Parses at most one message from the front of recvBuf.
//
// A message starts with its total size followed by a one-byte type. When the
// whole message is present it is dispatched to the matching On*() handler and
// then removed from the buffer, with the remaining bytes moved to the front.
//
// Returns:
//   MSG_UNCOMPLETE - fewer bytes are buffered than the header or the declared
//                    message size needs; nothing is consumed.
//   MSG_ERR_SIZE   - the declared size is larger than P2P_BUF_SIZE or smaller
//                    than the header; nothing is consumed.
//   MSG_ERR_TYPE   - unknown message type (the message is still consumed).
//   otherwise      - whatever the handler returned (MSG_COMPLETE on success).
MSG_STATE P2PClient::ParseMsg() {
	// Size field plus type byte.
	const size_t headerSize = sizeof(int)+sizeof(BYTE);

	// Too small to be a normal packet.
	if(recvOff < headerSize)
		return MSG_UNCOMPLETE;

	// Put the moving pointer at the start of the data.
	recvPointer = recvBuf;

	// Read the message size.
	CSClient::CopyMoveSrc(&msgSize, recvPointer, sizeof(msgSize));

	// Read the message type.
	UINT8 msgType;
	CSClient::CopyMoveSrc(&msgType, recvPointer, sizeof(msgType));

	// Validate the declared size in size_t. If msgSize is a signed type, a
	// negative value from the wire becomes a huge unsigned value here and is
	// rejected, instead of slipping through the mixed signed/unsigned
	// comparisons and being used in the pointer arithmetic below.
	const size_t declaredSize = static_cast<size_t>(msgSize);
	if(declaredSize > static_cast<size_t>(P2P_BUF_SIZE) || declaredSize < headerSize)
		return MSG_ERR_SIZE;

	// P2P_RESPONSE carries data and may be slow to transfer. Seeing its header
	// already tells us the request was answered; this timestamp is used to
	// decide whether the request has timed out.
	if(msgType == P2P_RESPONSE) {
		lastRecvDataHdTime = timeGetTime();
	}

	// Is the complete message in the buffer yet?
	if(recvOff < msgSize)
		return MSG_UNCOMPLETE;

	MSG_STATE ret = MSG_COMPLETE;
	switch(msgType) {
	case P2P_HELLO:
		ret = OnHello();
		break;
	case P2P_SPUPDATE:
		ret = OnSPUpdate();
		break;
	case P2P_REPORT:
		ret = OnReport();
		break;
	case P2P_NEAR_PEERS:
		ret = OnNearPeers();
		break;
	case P2P_PUSHLIST:
		ret = OnPushList();
		break;
	case P2P_RESPONSE:
		ret = OnResponse();
		break;
	case P2P_MSG:
		ret = OnMsg();
		break;
	case P2P_REQMEDIA:
		ret = OnReqMedia();
		break;
	case P2P_MEDIATYPE:
		ret = OnMediaType();
		break;
	default:
		ret = MSG_ERR_TYPE;
		break;
	}

	// Move the bytes that follow this message to the start of recvBuf.
	// Source and destination are in the same buffer and overlap whenever more
	// than msgSize bytes are left, so memmove is required (memcpy is undefined
	// for overlapping ranges).
	if(recvOff >= msgSize) {
		memmove(recvBuf, recvBuf+msgSize, recvOff-msgSize);
		recvOff -= msgSize;
	}

	return ret;
}

// Handles P2P_HELLO. Reads, in order, the remote version, the hash code of
// the resource the remote side wants, the passive-connect flag and the remote
// peer information, starting at recvPointer.
//
// Returns:
//   MSG_LOW_VERSION     - remote version is below cfgData.ACCEPT_VERSION.
//   MSG_NOSUCH_RES_HERE - there is no current resource, or its hash code
//                         differs from the requested one.
//   MSG_NOMORE_CONS     - CheckN4One() refused the connection.
//   MSG_COMPLETE        - the hello was accepted.
MSG_STATE P2PClient::OnHello() {
	assert(!GetIsCachePeer());

	// NP version
	CSClient::CopyMoveSrc(&remoteVersion, recvPointer, sizeof(remoteVersion));
	if(remoteVersion < comm->cfgData.ACCEPT_VERSION) {
		comm->logFile.StatusOut("Reject low version client %s %.5f.", comm->p2pMgr.FormatIPAddress(remotePeer), remoteVersion);
		return MSG_LOW_VERSION;
	}

	// Hash code of the resource the remote side wants.
	char resHashCode[MD5_LEN+1];
	resHashCode[MD5_LEN] = 0;
	CSClient::CopyMoveSrc(resHashCode, recvPointer, MD5_LEN);

	// Passive-connect flag: the remote side may have connected because it
	// received TS2NP_CONNECTO. In that case this connection counts as outgoing
	// here, while the remote side counts it as incoming.
	//
	// The flag is read as raw bytes and converted, because copying an
	// arbitrary network byte directly into a bool object is undefined for any
	// value other than 0 or 1. The same sizeof(bool) bytes are consumed as
	// before, so the wire layout is unchanged.
	BYTE passiveRaw[sizeof(bool)];
	CSClient::CopyMoveSrc(passiveRaw, recvPointer, sizeof(passiveRaw));
	bool passiveConnect = false;
	for(size_t i = 0; i < sizeof(passiveRaw); ++i) {
		if(passiveRaw[i] != 0)
			passiveConnect = true;
	}
	if(passiveConnect)
		isIncoming = false;

	// Remote peer information.
	CSClient::CopyMoveSrc(&remotePeer, recvPointer, sizeof(remotePeer));

	comm->logFile.StatusOut("Got P2P_HELLO from %s.", comm->p2pMgr.FormatIPAddress(remotePeer));

	// If the remote resource is the same as the local current resource, this
	// is a non-free connection; otherwise the hello is refused.
	if(comm->currRes && memcmp(resHashCode, comm->currRes->GetHashCode().data(), MD5_LEN) == 0) {
		isForFree = false;
		isSameRes = true;
	}
	else {
		return MSG_NOSUCH_RES_HERE;
	}

	if(isIncoming || passiveConnect) {
		// isIncoming being true means the connection was accepted, so the
		// duplicate-connection check belongs here. passiveConnect may have
		// cleared isIncoming above, but a passive connect is always an
		// incoming connection, so it is checked here as well.
		if(!comm->p2pMgr.CheckN4One(remotePeer, isForFree, true, MAX_CONNECTION_PER_NP))
			return MSG_NOMORE_CONS;
	}

	// If the remote side is not a CachePeer, its first message arrives here.
	bGotFirstMsg = TRUE;

	// Same outer IP as ours: flag the peer as being on the same LAN.
	if (remotePeer.outerIP.sin_addr.s_addr == comm->localAddress.outerIP.sin_addr.s_addr) {
		isSameLan = true;
	}

	comm->p2pMgr.ConnectionEstablished(isIncoming);

	return MSG_COMPLETE;
}

// Handles P2P_SPUPDATE: an SPUpdate structure followed by a one-byte checksum
// (the byte-wise sum of the structure).
//
// Updates without a checksum, with a wrong checksum, or with an unrecognised
// status marker are logged and ignored (MSG_COMPLETE is returned so the
// connection stays up).
//
// Returns:
//   MSG_ERR_TYPE      - there is no current resource.
//   MSG_NOSUCH_RES_SP - status marker: the SP does not have this resource.
//   MSG_CHNL_CLOSED   - status marker: the channel has been closed.
//   MSG_CHNL_END      - status marker: the channel has ended.
//   MSG_COMPLETE      - a normal update was processed, or the update was ignored.
MSG_STATE P2PClient::OnSPUpdate() {
	if(!comm->currRes) {
		assert(0);
		return MSG_ERR_TYPE;
	}

	MSG_STATE state = MSG_COMPLETE;

	// Remember the old maximum block ID.
	const UINT oldMaxBlockID = comm->currRes->GetSPUpdate().maxBlockID;

	// The received SPUpdate.
	SPUpdate tmpUpdate;
	CSClient::CopyMoveSrc(&tmpUpdate, recvPointer, sizeof(tmpUpdate));

	comm->logFile.StatusOut("Recv SPUpdate %d->%d from %s", tmpUpdate.minBlockID, tmpUpdate.maxBlockID, 
		comm->p2pMgr.FormatIPAddress(remotePeer));

	// Compute the checksum of the received SPUpdate over its raw bytes
	// (recvPointer has already advanced past the structure).
	const BYTE* raw = reinterpret_cast<const BYTE*>(recvPointer-sizeof(tmpUpdate));
	BYTE calsum = 0;
	for(size_t i = 0; i < sizeof(tmpUpdate); ++i)
		calsum += raw[i];

	// An SPUpdate without a checksum is not accepted.
	// (5 is presumably the message header: size field plus type byte.)
	if(msgSize != 5+sizeof(SPUpdate)+1) {
		comm->logFile.StatusOut("Old SPUpdate!");
		return MSG_COMPLETE;
	}

	// Read the checksum sent by the remote side.
	BYTE sum = 0;
	CSClient::CopyMoveSrc(&sum, recvPointer, sizeof(sum));

	// The two checksums must match.
	if(calsum != sum) {
		comm->logFile.StatusOut("Bad SPUpdate, err sum!");
		return MSG_COMPLETE;
	}

	// min == max == UINT_MAX marks a status message; the key samples tell
	// which one.
	if(tmpUpdate.minBlockID == UINT_MAX && tmpUpdate.maxBlockID == UINT_MAX) {
		if(tmpUpdate.minKeySample == 0xffffffffffffffff && tmpUpdate.maxKeySample == 0xffffffffffffffff)
			state = MSG_NOSUCH_RES_SP;	// the SP does not have this resource
		else if(tmpUpdate.minKeySample == 0 && tmpUpdate.maxKeySample == 0)
			state = MSG_CHNL_CLOSED;	// this channel has been closed
		else {
			// Malformed message. With asserts compiled out this used to fall
			// through as a normal update with maxBlockID == UINT_MAX and was
			// then stored and broadcast; ignore it like other bad updates.
			assert(0);
			comm->logFile.StatusOut("Bad SPUpdate, unknown status!");
			return MSG_COMPLETE;
		}
	}
	if(tmpUpdate.minBlockID == 0 && tmpUpdate.maxBlockID == 0 && 
		tmpUpdate.minKeySample == 0 && tmpUpdate.maxKeySample == 0)
		state = MSG_CHNL_END;

	if(state == MSG_COMPLETE) {
		if(GetIsCachePeer()) {
			remoteInterval.Clear();
			remoteInterval.AddInterval(tmpUpdate.minBlockID, tmpUpdate.maxBlockID-tmpUpdate.minBlockID);
		}

		// If the received maxBlockID is larger than the local one, update the
		// local SPUpdate and broadcast it to the other connections.
		if(tmpUpdate.maxBlockID > oldMaxBlockID) {
			comm->currRes->SetSPUpdate(tmpUpdate, sum);

			// Check the push list: if no blocks are left, ask for the blocks
			// of all connections to be redistributed. In the live system a
			// connection to a CP never receives P2P_REPORT, so the SPUpdate
			// is the only trigger for starting to request data.
			if(remotePush.GetValidSize() == 0 && GetIsCachePeer())
				comm->p2pMgr.RedistributeAllBlocks(this);

			// Broadcast the newest SPUpdate.
			comm->p2pMgr.BroadCastSPUpdate(tmpUpdate, sum);
		}
	}
	else {
		// Broadcast the abnormal channel state.
		comm->p2pMgr.BroadCastSPUpdate(tmpUpdate, sum);
	}

	return state;
}

MSG_STATE P2PClient::OnReport() {	
	// 复制对方的信息
	CSClient::CopyMoveSrc((CorePeerInfo*)&remotePeer, recvPointer, sizeof(CorePeerInfo));

	// 是否更新全部区间列表
	bool bRefresh;
	CSClient::CopyMoveSrc(&bRefresh, recvPointer, sizeof(bRefresh));
	if(bRefresh)
		remoteInterval.Clear();

	// 如果REFRESH,那么只有一组区间;如果不是,那么有两组区间,先后是增加和删除
	for(UINT8 j = 0; j < (bRefresh?1:2); ++j) {
		// read interval count
		UINT8 intervalNum = 0;
		CSClient::CopyMoveSrc(&intervalNum, recvPointer, sizeof(intervalNum));
		assert(recvPointer - recvBuf < 1000);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -