📄 p2pclient.cpp
字号:
// read intervals
for(UINT8 i = 0; i < intervalNum; ++i) {
BlockInterval temp;
CSClient::CopyMoveSrc(&temp, recvPointer, sizeof(temp));
if(j == 0)
remoteInterval.AddInterval(temp.start, temp.size);
else
remoteInterval.DelInterval(temp.start, temp.size);
}
// 如果对方新增了Block
if(j == 0 && intervalNum > 0) {
// 检查PushList状况,如果只剩0块了,就要求重新分配所有连接的块
if(remotePush.GetValidSize() == 0 && comm->currRes)
comm->p2pMgr.RedistributeAllBlocks(this);
}
}
//comm->logFile.StatusOut("Recv P2P_REPORT from %s", comm->p2pMgr.FormatIPAddress(remotePeer));
return MSG_COMPLETE;
}
/// Handles a P2P_NEAR_PEERS message: reads the peer records carried in the
/// payload and hands each one to the P2P manager.
/// @return MSG_ERR_LIST_SIZE when the advertised record count cannot fit in the
///         rest of the receive buffer, MSG_COMPLETE otherwise.
MSG_STATE P2PClient::OnNearPeers() {
    // The payload opens with a one-byte record count; reading it moves recvPointer forward.
    UINT8 peerCount;
    CSClient::CopyMoveSrc(&peerCount, recvPointer, sizeof(peerCount));

    // Refuse a count whose records would run past the end of the receive buffer.
    if(recvBuf+P2P_BUF_SIZE-recvPointer < static_cast<int>(peerCount*sizeof(PeerInfoWithAddr)))
        return MSG_ERR_LIST_SIZE;

    // Take the records off the wire one at a time and add them to the known-peer list.
    UINT8 remaining = peerCount;
    while(remaining > 0) {
        PeerInfoWithAddr entry;
        CSClient::CopyMoveSrc(&entry, recvPointer, sizeof(entry));
        // Every record received here is stored with the cache-peer flag cleared.
        entry.isCachePeer = false;
        comm->p2pMgr.AddPeerInfo(entry);
        --remaining;
    }

    comm->logFile.StatusOut("Recv P2P_NEAR_PEERS(%d NP) from %s.", peerCount, comm->p2pMgr.FormatIPAddress(remotePeer));
    return MSG_COMPLETE;
}
/// Handles a push-list request from the remote peer.
///
/// Payload layout as read below: a bool "refresh" flag, then a one-byte count
/// followed by that many block IDs to add. When the flag is false, a second
/// one-byte count follows with block IDs to remove.
///
/// @return MSG_SEND_ERR when there is no current resource or a send fails,
///         MSG_ERR_LIST_SIZE when an advertised ID count cannot fit in the rest
///         of the receive buffer, MSG_COMPLETE otherwise.
MSG_STATE P2PClient::OnPushList() {
    // With no current resource, no data is sent to the remote peer.
    if(!comm->currRes)
        return MSG_SEND_ERR;

    // Does the peer replace its whole push list, or only patch it?
    bool bRefresh;
    CSClient::CopyMoveSrc(&bRefresh, recvPointer, sizeof(bRefresh));
    if(bRefresh) {
        // Drop every block that is queued for sending (blocks whose transfer has
        // already started are left alone), then empty the local push list.
        while(DeleteSendingBlock(UINT_MAX));
        localPush.Clear();
    }

    UINT blockID = UINT_MAX;

    // --- Blocks to add (present in both the refresh and the patch form) ---
    UINT8 count;
    CSClient::CopyMoveSrc(&count, recvPointer, sizeof(count));
    assert(count < (bRefresh ? 30 : 2));
    // The assert above disappears in release builds, so the count is also checked
    // at run time: the IDs must fit in what is left of the receive buffer.
    if(static_cast<int>(count*sizeof(blockID)) > recvBuf+P2P_BUF_SIZE-recvPointer)
        return MSG_ERR_LIST_SIZE;
    for(UINT8 i = 0; i < count; ++i) {
        CSClient::CopyMoveSrc(&blockID, recvPointer, sizeof(blockID));
        // If this machine does not have the block, tell the peer right away;
        // otherwise queue it in the local push list.
        if(!comm->currRes->FindBlock(blockID)) {
            if(!SendResponse(blockID, false))
                return MSG_SEND_ERR;
        }
        else
            localPush.AddBlock(blockID);
    }
    if(count) {
        // Only the last ID read is logged.
        if(bRefresh)
            comm->logFile.StatusOut("Got request refresh(%d...) from %s.", blockID, comm->p2pMgr.FormatIPAddress(remotePeer));
        else
            comm->logFile.StatusOut("Got request add(%d) from %s.", blockID, comm->p2pMgr.FormatIPAddress(remotePeer));
    }

    // --- Blocks to remove (patch form only) ---
    if(!bRefresh) {
        CSClient::CopyMoveSrc(&count, recvPointer, sizeof(count));
        assert(count < 2);
        if(static_cast<int>(count*sizeof(blockID)) > recvBuf+P2P_BUF_SIZE-recvPointer)
            return MSG_ERR_LIST_SIZE;
        for(UINT8 i = 0; i < count; ++i) {
            CSClient::CopyMoveSrc(&blockID, recvPointer, sizeof(blockID));
            // Try the local push list first.
            if(!localPush.DelBlock(blockID)) {
                // Not found there: the block has already moved to the sending list,
                // or may even have been sent. Try to remove it from the sending list.
                if(!DeleteSendingBlock(blockID)) {
                    comm->logFile.StatusOut("Ahhhhh..., the block has already been sent!");
                }
            }
        }
        if(count)
            comm->logFile.StatusOut("Got request del(%d) from %s.", blockID, comm->p2pMgr.FormatIPAddress(remotePeer));
    }

    // Send the first block of the push list.
    if(!SendFirstPushBlock())
        return MSG_SEND_ERR;
    return MSG_COMPLETE;
}
/// Handles a block response from the remote peer.
///
/// Payload layout as read below: block ID (UINT), block size (UINT), then
/// `blockSize` bytes of block data. A size of 0 is treated as "the peer does
/// not have this block".
///
/// @return MSG_ERR_SIZE when the advertised size exceeds the rest of the receive
///         buffer, MSG_SAVEDATA_ERR when storing the block fails for a reason
///         other than a full buffer or a bad block, MSG_COMPLETE otherwise.
MSG_STATE P2PClient::OnResponse() {
    // If the remote side is a CachePeer, the first message received from it lands here.
    if(GetIsCachePeer())
        bGotFirstMsg = TRUE;
    MSG_STATE ret = MSG_COMPLETE;

    // ID and size of the block being answered.
    UINT blockID;
    CSClient::CopyMoveSrc(&blockID, recvPointer, sizeof(blockID));
    UINT blockSize;
    CSClient::CopyMoveSrc(&blockSize, recvPointer, sizeof(blockSize));

    // Validate the size against the rest of the receive buffer. The comparison is
    // done unsigned: casting blockSize to int would turn values >= 2^31 negative
    // and let them slip past the check.
    if(recvPointer > recvBuf+P2P_BUF_SIZE ||
       blockSize > static_cast<UINT>(recvBuf+P2P_BUF_SIZE-recvPointer)) {
        ret = MSG_ERR_SIZE;
        assert(0);
    }

    // If the returned block is in this connection's push list, remove it there;
    // otherwise let the manager look for it in the other connections' push lists.
    DWORD reqTime;
    if(!remotePush.DelBlock(blockID, &reqTime))
        comm->p2pMgr.ReclaimBlocks(this, blockID);

    if(ret == MSG_COMPLETE) {
        if(blockSize > 0) {
            // Remember when data was last received.
            lastRecvDataTime = timeGetTime();
            // Accumulate the time this transfer took.
            m_transUsedTime += timeGetTime()-m_reqStartTime;
            //assert(lastRecvDataTime-m_reqStartTime < 25000);
            // The bytes were downloaded, even if storing them fails below.
            comm->p2pMgr.AddBlockDataDown(blockSize);
            if(GetIsCachePeer())
                comm->p2pMgr.AddCPData(blockSize);
            // Store the block and step over its data.
            P2P_RETURN_TYPE bSuccess = comm->currRes->PutBlock(blockID, blockSize, reinterpret_cast<PBYTE>(recvPointer));
            recvPointer += blockSize;
            if(bSuccess < PRT_OK) {
                if(bSuccess == PRT_BUFFER_FULL) {
                    comm->logFile.StatusOut("缓冲区已满");
                    Sleep(300);
                }
                else if(bSuccess == PRT_BAD_BLOCK) {
                    if(!remotePeer.isCachePeer) {
                        // The block is bad: stop downloading it from this peer.
                        remoteInterval.DelInterval(blockID, 1);
                    }
                }
                else {
                    comm->logFile.StatusErr("写入磁盘错误", GetLastError());
                    // Post the disk-write error message.
                    comm->PostErrMessage(PNT_DISK_ERR, 0, true);
                    ret = MSG_SAVEDATA_ERR;
                    assert(0);
                }
            }
            // Debug builds log the MD5 of the block data; release builds log an empty string.
            const char* temp = "";
#ifdef _DEBUG
            MD5 md5(reinterpret_cast<BYTE*>(recvPointer-blockSize), blockSize);
            temp = md5.hex_digest();
#endif
            comm->logFile.StatusOut("Got %sblock %d(%d)(%s)(used %dms) from %s.",
                (bSuccess==PRT_BAD_BLOCK?"bad ":""), blockID, blockSize, temp, lastRecvDataTime-m_reqStartTime,
                comm->p2pMgr.FormatIPAddress(remotePeer));
#ifdef _DEBUG
            // The digest buffer is released here with delete[].
            delete [] temp;
#endif
            // Record the start time of the next transfer. This used to sit inside
            // the _DEBUG guard above, so release builds never restarted the timer
            // here and debug/release disagreed on m_transUsedTime.
            m_reqStartTime = timeGetTime();
        }
        else {
            comm->logFile.StatusOut("Got block %d(%d) from %s.",
                blockID, blockSize, comm->p2pMgr.FormatIPAddress(remotePeer));
        }
        // A response arrived, so reset the login-failure state on the CS client.
        comm->csClient.ResetLoginFail();
    }

    // ret may have changed to MSG_SAVEDATA_ERR above, hence the second test.
    if(ret == MSG_COMPLETE) {
        if(blockSize == 0) {
            // The peer does not have this block: remove it from the peer's interval table.
            remoteInterval.DelInterval(blockID, 1);
        }
        // If the push list has run dry, ask for all connections' blocks to be redistributed.
        if(remotePush.GetValidSize() == 0)
            comm->p2pMgr.RedistributeAllBlocks(this);
    }
    return ret;
}
/// Handles an error/notice message from the remote peer.
///
/// Payload layout as read below: a 16-bit error code followed by a bool that
/// says whether the connection should be dropped.
///
/// @return MSG_REMOTE_ERR when the connection should be dropped (the remote
///         address is also registered as bad), MSG_COMPLETE otherwise.
MSG_STATE P2PClient::OnMsg() {
    //assert(GetIsCachePeer());
    UINT16 code;
    CSClient::CopyMoveSrc(&code, recvPointer, sizeof(code));
    bool dropLink;
    CSClient::CopyMoveSrc(&dropLink, recvPointer, sizeof(dropLink));

    // Known codes fill errStr with their text; any other code forces a disconnect.
    if(code == ERR_PROTOCOL_FORMAT)
        sprintf(errStr, "协议错误");
    else if(code == ERR_AUTHORIZATION)
        sprintf(errStr, "验证错误");
    else if(code == ERR_INTERNAL)
        sprintf(errStr, "未知错误");
    else if(code == ERR_CONNECTION_FULL)
        sprintf(errStr, "对方连接已满");
    else
        dropLink = true;

    if(!dropLink)
        return MSG_COMPLETE;

    // Stop connecting to this address from now on.
    comm->p2pMgr.AddBadAddr(remotePeer);
    return MSG_REMOTE_ERR;
}
/// Handles a media-type request: reads the block ID from the payload and
/// answers through SendMediaType().
/// @return MSG_SEND_ERR when SendMediaType() fails, MSG_COMPLETE otherwise.
MSG_STATE P2PClient::OnReqMedia() {
    UINT requestedBlock = 0;
    CSClient::CopyMoveSrc(&requestedBlock, recvPointer, sizeof(requestedBlock));

    if(SendMediaType(requestedBlock))
        return MSG_COMPLETE;
    return MSG_SEND_ERR;
}
/// Handles a media-type message: parses one MediaInterval from the payload and
/// adds it to the current resource.
///
/// Payload layout as read below: interval start and size, a total format
/// length, the video type header plus its format bytes, the audio type header
/// plus its format bytes. Newer remote versions append a program name, and
/// newer still a program time and a channel name.
///
/// @return MSG_DIFF_CHNL when there is no current resource, MSG_ERR_LIST_SIZE
///         when a length field exceeds 1024, MSG_COMPLETE otherwise (including
///         when the message is ignored because the remote version is too old).
MSG_STATE P2PClient::OnMediaType() {
    assert(comm->currRes);
    if(!comm->currRes)
        return MSG_DIFF_CHNL;

    // Some older clients read the media type out of the gtv file and then send
    // a very large media interval; messages from those versions are not accepted.
    if(remoteVersion < comm->cfgData.ADD_PROGNAME_VERSION)
        return MSG_COMPLETE;

    MediaInterval mInterval;

    // Interval covered by this media description.
    CSClient::CopyMoveSrc(&mInterval.start, recvPointer, sizeof(mInterval.start));
    CSClient::CopyMoveSrc(&mInterval.size, recvPointer, sizeof(mInterval.size));

    // Total length of the two format sections that follow.
    UINT formatBytes = 0;
    CSClient::CopyMoveSrc(&formatBytes, recvPointer, sizeof(formatBytes));
    if(formatBytes > 1024) {
        assert(0);
        return MSG_ERR_LIST_SIZE;
    }

    // Video encoding format: fixed header first, then cbFormat bytes of format data.
    CSClient::CopyMoveSrc(&mInterval.videoType, recvPointer, sizeof(mInterval.videoType));
    if(mInterval.videoType.cbFormat > 1024) {
        assert(0);
        return MSG_ERR_LIST_SIZE;
    }
    mInterval.videoData = new BYTE[mInterval.videoType.cbFormat];
    CSClient::CopyMoveSrc(mInterval.videoData, recvPointer, mInterval.videoType.cbFormat);

    // Audio encoding format, same shape as the video section.
    CSClient::CopyMoveSrc(&mInterval.audioType, recvPointer, sizeof(mInterval.audioType));
    if(mInterval.audioType.cbFormat > 1024) {
        assert(0);
        // NOTE(review): videoData was allocated above; whether it is released on
        // this path depends on MediaInterval's destructor — confirm.
        return MSG_ERR_LIST_SIZE;
    }
    mInterval.audioData = new BYTE[mInterval.audioType.cbFormat];
    CSClient::CopyMoveSrc(mInterval.audioData, recvPointer, mInterval.audioType.cbFormat);

    // The declared total must match what was actually consumed (debug builds only).
    assert(formatBytes == mInterval.videoType.cbFormat+sizeof(mInterval.videoType)+mInterval.audioType.cbFormat+sizeof(mInterval.audioType));

    // Versions above ADD_PROGNAME_VERSION also send the program name.
    if(remoteVersion > comm->cfgData.ADD_PROGNAME_VERSION) {
        CSClient::CopyMoveSrc(&mInterval.pnamesize, recvPointer, sizeof(mInterval.pnamesize));
        // Live channels carry no program name; a name that would not fit in the
        // message is not copied either, and in both cases the size must be zeroed.
        if(!mInterval.pnamesize || mInterval.pnamesize + recvPointer-recvBuf > msgSize) {
            mInterval.pnamesize = 0;
        }
        else {
            mInterval.pname = new char[mInterval.pnamesize+1];
            CSClient::CopyMoveSrc(mInterval.pname, recvPointer, mInterval.pnamesize);
            mInterval.pname[mInterval.pnamesize] = 0;
        }

        // Versions above ADD_PROGTIME_VERSION also send the program length and the channel name.
        if(remoteVersion > comm->cfgData.ADD_PROGTIME_VERSION) {
            CSClient::CopyMoveSrc(&mInterval.progtime, recvPointer, sizeof(mInterval.progtime));
            CSClient::CopyMoveSrc(&mInterval.cnamesize, recvPointer, sizeof(mInterval.cnamesize));
            if(!mInterval.cnamesize || mInterval.cnamesize + recvPointer-recvBuf > msgSize) {
                // Nothing can be copied, so the size must be zeroed.
                mInterval.cnamesize = 0;
            }
            else {
                mInterval.cname = new char[mInterval.cnamesize+1];
                CSClient::CopyMoveSrc(mInterval.cname, recvPointer, mInterval.cnamesize);
                mInterval.cname[mInterval.cnamesize] = 0;
            }
        }
    }

    // Hand the parsed interval to the current resource.
    comm->currRes->AddMediaInterval(mInterval);
    return MSG_COMPLETE;
}
/// Builds and sends the P2P_HELLO message.
///
/// Payload as written below: communicator version, hash code of the current
/// resource, the passive-connection flag, this peer's PeerInfoWithAddr and,
/// when the remote side is a CachePeer and an SP list exists, a one-byte count
/// followed by the SP addresses.
///
/// @return FALSE when there is no current resource or the packet cannot be
///         started, TRUE once the packet has been handed to SendEnd().
BOOL P2PClient::SendHello() {
    // A hello cannot be sent without a current resource.
    if(!comm->currRes)
        return FALSE;

    TCPPacket* packet;
    if(!SendBegin(packet, P2P_HELLO))
        return FALSE;

    // Version of this peer.
    CSClient::CopyMoveDes(sendPointer, &comm->cfgData.COMMUNICATOR_VERSION, sizeof(float));
    // Hash code of the current resource.
    CSClient::CopyMoveDes(sendPointer, comm->currRes->GetHashCode().data(), MD5_LEN);
    // Whether this is a passive connection.
    CSClient::CopyMoveDes(sendPointer, &isPassive, sizeof(isPassive));

    // Description of this machine: the address part is copied from the local
    // address, the rest is filled in by the P2P manager.
    PeerInfoWithAddr self;
    memcpy(static_cast<P2PAddress*>(&self), &comm->localAddress, sizeof(comm->localAddress));
    comm->p2pMgr.GetSelfInfo(self);
    CSClient::CopyMoveDes(sendPointer, &self, sizeof(self));

    // A CachePeer additionally receives the SP address list, when there is one.
    if(GetIsCachePeer()) {
        if(NormalAddress* spAddrs = comm->currRes->GetSPList()) {
            UINT8 spCount = comm->currRes->GetSPListSize();
            CSClient::CopyMoveDes(sendPointer, &spCount, sizeof(spCount));
            UINT8 idx = 0;
            while(idx < spCount) {
                CSClient::CopyMoveDes(sendPointer, &spAddrs[idx], sizeof(spAddrs[idx]));
                ++idx;
            }
        }
    }

    comm->logFile.StatusOut("Sending P2P_HELLO to %s.", comm->p2pMgr.FormatIPAddress(remotePeer));
    SendEnd(packet);
    return TRUE;
}
/// Builds and sends the P2P_REPORT message.
///
/// Payload as written below: the supplied peer info, the refresh flag, then
/// either one counted list of all block intervals (refresh) or two counted
/// lists — added intervals followed by removed intervals (diff). Each list is a
/// one-byte count followed by that many BlockInterval records.
///
/// @param peer     Peer info placed at the start of the payload.
/// @param bRefresh true to send the complete interval list, false to send only
///                 the changes. Forced to true when there is no current resource,
///                 in which case an empty list is sent.
/// @return FALSE when the packet cannot be started, TRUE otherwise.
BOOL P2PClient::SendReport(const CorePeerInfo& peer, bool bRefresh) {
    TCPPacket* packet;
    if(!SendBegin(packet, P2P_REPORT))
        return FALSE;

    // Peer info of the local peer.
    CSClient::CopyMoveDes(sendPointer, &peer, sizeof(peer));

    // Without a current resource, send bRefresh=true and intervalNum=0.
    if(!comm->currRes)
        bRefresh = true;
    CSClient::CopyMoveDes(sendPointer, &bRefresh, sizeof(bRefresh));

    // Value intervalNum carries into each Get*Intervals call.
    // NOTE(review): presumably the callee treats it as the maximum number of
    // intervals it may write and overwrites it with the number written — confirm.
    const UINT8 maxIntervals = comm->currRes?0xff:0;
    UINT8 intervalNum = maxIntervals;

    // In every list the records are written one count-byte ahead of sendPointer
    // first; the count is written afterwards, then sendPointer skips the records.
    if(bRefresh) {
        // Full refresh: fetch and send all intervals of the current resource.
        if(comm->currRes)
            comm->currRes->GetAllIntervals(reinterpret_cast<BlockInterval*>(sendPointer+sizeof(intervalNum)), intervalNum);
        CSClient::CopyMoveDes(sendPointer, &intervalNum, sizeof(intervalNum));
        sendPointer += intervalNum*sizeof(BlockInterval);
    }
    else {
        // Diff update: the added intervals first ...
        if(comm->currRes)
            comm->currRes->GetDiffIntervals(reinterpret_cast<BlockInterval*>(sendPointer+sizeof(intervalNum)), intervalNum, false, true);
        CSClient::CopyMoveDes(sendPointer, &intervalNum, sizeof(intervalNum));
        sendPointer += intervalNum*sizeof(BlockInterval);

        // ... then the removed intervals. intervalNum now holds the number of
        // added intervals; restore the initial value so the second list does not
        // inherit the first list's count as its limit.
        intervalNum = maxIntervals;
        if(comm->currRes)
            comm->currRes->GetDiffIntervals(reinterpret_cast<BlockInterval*>(sendPointer+sizeof(intervalNum)), intervalNum, false, false);
        CSClient::CopyMoveDes(sendPointer, &intervalNum, sizeof(intervalNum));
        sendPointer += intervalNum*sizeof(BlockInterval);
    }

    SendEnd(packet);
    //comm->logFile.StatusOut("Sending Report To %s", comm->p2pMgr.FormatIPAddress(remotePeer));
    return TRUE;
}
/// Sends a P2P_SPUPDATE message carrying the given SPUpdate and its checksum byte.
///
/// @param spUpdate  Update record copied into the payload.
/// @param selfLayer Layer of this peer; compared with the remote peer's layer.
/// @param sum       Checksum byte appended after the record.
/// @return FALSE only when the packet cannot be started; TRUE when the message
///         was sent or when sending was skipped.
BOOL P2PClient::SendSPUpdate(const SPUpdate& spUpdate, UINT8 selfLayer, BYTE sum) {
    // Nothing to send when the remote peer is closer to the CP than this one
    // (its layer is smaller), or when it is not on the same resource.
    const bool remoteIsCloser = selfLayer > remotePeer.layer;
    if(remoteIsCloser || !isSameRes)
        return TRUE;

    TCPPacket* packet;
    if(!SendBegin(packet, P2P_SPUPDATE))
        return FALSE;

    // Payload: the update record followed by its checksum.
    CSClient::CopyMoveDes(sendPointer, &spUpdate, sizeof(spUpdate));
    CSClient::CopyMoveDes(sendPointer, &sum, sizeof(sum));

    SendEnd(packet);
    return TRUE;
}
BOOL P2PClient::SendNearPeers() {
// 对方资源与本机相同时,才能发送有用的PeerInfo给它
if(!isSameRes)
return TRUE;
TCPPacket* packet;
if(!SendBegin(packet, P2P_NEAR_PEERS))
return FALSE;
list<PeerInfoWithAddr> peerList;
comm->p2pMgr.GetPeersForNeighbour(peerList, 20, remotePeer);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -