📄 p2pclient.cpp
字号:
// 如果PeerList中元素个数大于0xff,则删除更多的元素
UINT8 listSize = min(0xff, peerList.size());
peerList.resize(listSize);
if(listSize == 0) {
// 释放这个Packet
comm->p2pMgr.ReleasePacket(packet);
return TRUE;
}
// 列表大小
CSClient::CopyMoveDes(sendPointer, &listSize, sizeof(listSize));
// write peerInfo one by one
for(PeerAddrIt it = peerList.begin(); it != peerList.end(); ++it) {
CSClient::CopyMoveDes(sendPointer, &*it, sizeof(PeerInfoWithAddr));
}
comm->logFile.StatusOut("Sending P2P_NEAR_PEERS to %s.", comm->p2pMgr.FormatIPAddress(remotePeer));
SendEnd(packet);
return TRUE;
}
BOOL P2PClient::SendPushList(UINT blockID, bool bAdd) {
// 是否更新全部
bool bRefresh = (blockID == UINT_MAX);
// 如果大小为零就不必更新
if(bRefresh && remotePush.GetValidSize() == 0)
return TRUE;
TCPPacket* packet;
if(!SendBegin(packet, P2P_PUSHLIST))
return FALSE;
CSClient::CopyMoveDes(sendPointer, &bRefresh, sizeof(bRefresh));
if(bRefresh) {
// 更新整个PushList
UINT8 count = 0xff;
remotePush.CopyPushList(reinterpret_cast<UINT*>(sendPointer+sizeof(count)), count);
CSClient::CopyMoveDes(sendPointer, &count, sizeof(count));
sendPointer += count*sizeof(UINT);
comm->logFile.StatusOut("Sending P2P_PUSHLIST %s block(%d) ... to %s.",
bRefresh?"Refresh":(bAdd?"Add":"Del"), count, comm->p2pMgr.FormatIPAddress(remotePeer));
}
else {
UINT8 count = bAdd?1:0;
CSClient::CopyMoveDes(sendPointer, &count, sizeof(count));
if(bAdd)
CSClient::CopyMoveDes(sendPointer, &blockID, sizeof(blockID));
count = bAdd?0:1;
CSClient::CopyMoveDes(sendPointer, &count, sizeof(count));
if(!bAdd)
CSClient::CopyMoveDes(sendPointer, &blockID, sizeof(blockID));
comm->logFile.StatusOut("Sending P2P_PUSHLIST %s block %d to %s.",
bAdd?"Add":"Del",blockID, comm->p2pMgr.FormatIPAddress(remotePeer));
}
SendEnd(packet);
return TRUE;
}
BOOL P2PClient::SendResponse(UINT blockID, bool tryGetData) {
// 此块所在的区间,其媒体类型是否已经发送过了
if(!sentMediaArray.FindBlock(blockID)) {
// 没有发送过,则发送此块所在区间的媒体类型
if(!SendMediaType(blockID))
return FALSE;
}
TCPPacket* packet;
if(!SendBegin(packet, P2P_RESPONSE))
return FALSE;
// 请求的BlockID
CSClient::CopyMoveDes(sendPointer, &blockID, sizeof(blockID));
// 检查是否存在这个Block,取出他
UINT blockSize = 0;
// 如果需要获取数据,则尝试获取数据;如果只是为了告诉对方此块不存在,则不作尝试;
if(comm->currRes && tryGetData) {
P2P_RETURN_TYPE ret = comm->currRes->GetBlock(blockID, blockSize, sendPointer+sizeof(blockSize));
if(ret < PRT_OK) {
// 一定要释放这个packet
comm->p2pMgr.ReleasePacket(packet);
return ret;
}
}
// write block size (bSize==0 means no data)
CSClient::CopyMoveDes(sendPointer, &blockSize, sizeof(blockSize));
// 将指针移动到数据的末尾
sendPointer += blockSize;
comm->logFile.StatusOut("Sending P2P_RESPONSE block(%d)(%d) to %s.",
blockID, blockSize, comm->p2pMgr.FormatIPAddress(remotePeer));
if(blockSize > 0) {
// 记录此次发送数据的时间
lastSendDataTime = timeGetTime();
}
SendEnd(packet);
if(tryGetData && blockSize == 0) {
// 如果想要发送这个块,结果发现本机没有,则尝试发送下一个块
return SendFirstPushBlock();
}
return TRUE;
}
BOOL P2PClient::SendReqMedia(UINT blockID) {
TCPPacket* packet;
if(!SendBegin(packet, P2P_REQMEDIA))
return FALSE;
CSClient::CopyMoveDes(sendPointer, &blockID, sizeof(blockID));
SendEnd(packet);
return TRUE;
}
BOOL P2PClient::SendMediaType(UINT blockID) {
if(!comm->currRes)
return FALSE;
// 查找媒体类型
MediaInterval mInterval;
if(!comm->currRes->GetMediaInterval(blockID, mInterval))
return TRUE;
TCPPacket* packet;
if(!SendBegin(packet, P2P_MEDIATYPE))
return FALSE;
// 写入区间
CSClient::CopyMoveDes(sendPointer, &mInterval.start, sizeof(mInterval.start));
CSClient::CopyMoveDes(sendPointer, &mInterval.size, sizeof(mInterval.size));
// 写入媒体数据长度
UINT len = sizeof(mInterval.videoType)+mInterval.videoType.cbFormat+sizeof(mInterval.audioType)+mInterval.audioType.cbFormat;
CSClient::CopyMoveDes(sendPointer, &len, sizeof(len));
// 写入媒体数据
CSClient::CopyMoveDes(sendPointer, &mInterval.videoType, sizeof(mInterval.videoType));
CSClient::CopyMoveDes(sendPointer, mInterval.videoData, mInterval.videoType.cbFormat);
CSClient::CopyMoveDes(sendPointer, &mInterval.audioType, sizeof(mInterval.audioType));
CSClient::CopyMoveDes(sendPointer, mInterval.audioData, mInterval.audioType.cbFormat);
// 超过ADD_PROGNAME_VERSION的版本将发送节目的名字
if(remoteVersion > comm->cfgData.ADD_PROGNAME_VERSION) {
// 写入节目的名字
CSClient::CopyMoveDes(sendPointer, &mInterval.pnamesize, sizeof(mInterval.pnamesize));
CSClient::CopyMoveDes(sendPointer, mInterval.pname, mInterval.pnamesize);
// 超过ADD_PROGTIME_VERSION的版本将发送节目的时间和频道名
if(remoteVersion > comm->cfgData.ADD_PROGTIME_VERSION) {
// 写入节目的时间长度
CSClient::CopyMoveDes(sendPointer, &mInterval.progtime, sizeof(mInterval.progtime));
// 写入频道的名字
CSClient::CopyMoveDes(sendPointer, &mInterval.cnamesize, sizeof(mInterval.cnamesize));
CSClient::CopyMoveDes(sendPointer, mInterval.cname, mInterval.cnamesize);
}
}
SendEnd(packet);
sentMediaArray.AddInterval(mInterval);
return TRUE;
}
BOOL P2PClient::SendBegin(TCPPacket*& packet, UINT8 msgType) {
packet= comm->p2pMgr.AllocatePacket();
if(!packet)
return FALSE;
// 先留着消息大小不写,到最后再写
sendPointer = packet->buf+sizeof(UINT);
CSClient::CopyMoveDes(sendPointer, &msgType, sizeof(msgType));
return TRUE;
}
void P2PClient::SendEnd(TCPPacket*& packet) {
// 消息的大小就是移动的指针减去初始的指针
packet->size = sendPointer-packet->buf;
packet->sent = 0;
memcpy(packet->buf, &packet->size, sizeof(packet->size));
m_sendList.push_back(packet);
}
// 添加一个要Push的块
bool P2PClient::AddPushBlock(UINT blockID, bool bRefreshing) {
if(!remotePush.AddBlock(blockID, timeGetTime()))
return false;
if(remotePush.GetValidSize() == 1) {
// pushlist大小从0到1,说明新的下载即将开始
lastRecvDataHdTime = timeGetTime();
// pushlist大小从0到1,记录此时的时间,用于统计实际传输速度
m_reqStartTime = timeGetTime();
//comm->logFile.StatusOut("Request Start Time %d", m_reqStartTime);
}
if(!bRefreshing) {
// 如果不是正在更新整个PushList,就立即发送消息
if(!SendPushList(blockID, true)) {
if(!remotePush.DelBlock(blockID))
assert(0);
return false;
}
}
return true;
}
// 查找一个PushList中的块
bool P2PClient::FindPushBlock(const UINT blockID) const {
return remotePush.FindBlock(blockID);
}
// 删除一个要Push的块
bool P2PClient::DelPushBlock(UINT blockID) {
// 从PushList中查找并删除此块
if(!remotePush.DelBlock(blockID))
return false;
// 发送删除一个块的消息
return SendPushList(blockID, false) == TRUE;
}
// Push List是否已满
bool P2PClient::IsPushListFull() {
return (remotePush.GetValidSize() == remotePush.GetTotalSize());
}
// 取得Pushlist大小
UINT8 P2PClient::GetTotalPushSize() {
return remotePush.GetTotalSize();
}
// 清空PushList
void P2PClient::ClearPush() {
remotePush.Clear();
}
// 发送整个PushList,因为之前重新分配了所有连接的块
void P2PClient::SendPush() {
if(!SendPushList(UINT_MAX, false))
assert(0);
}
// 重新加载PushList,只有在普通的下载时才会被调用
void P2PClient::ReloadPushList() {
if(remotePush.GetValidSize() > 0)
return;
UINT array[P2P_HIGH_PUSH];
std::fill(array, array+P2P_HIGH_PUSH, UINT_MAX);
UINT count = 0;
// 计算此连接一次push的大小
UINT pushNum = P2P_MID_PUSH; // default
// 注意两者单位不同,前者是BPS, 后者是KBPS
if(avgDownSpeed/1024/comm->currRes->GetBitRate() < 0.1667f)
pushNum = P2P_LOW_PUSH;
else if(avgDownSpeed/1024/comm->currRes->GetBitRate() > 0.5f)
pushNum = P2P_HIGH_PUSH;
//else
// the default value
// 1. 获取可以下载的区间列表
IntervalArray downloadable;
comm->currRes->GetDownloadableArray(remoteInterval, downloadable);
if(downloadable.IsEmpty()) {
return;
}
// 2. 遍历此区间列表,根据条件排除不可以下载的块,直到遍历完毕或者找到pushNum个Block
BlockInterval temp;
while(!downloadable.IsEmpty() && count < pushNum) {
downloadable.PopFront(temp);
if(temp.size == 0)
break;
for(UINT i = temp.start; i < temp.start+temp.size && count < pushNum; ++i) {
// 如果此块有别人正在下载,则去除此块
if(comm->p2pMgr.FindInAllPushList(i, this))
continue;
// 如果有其他NP有此块,那么就不要从CP上下载了, 前提是缓冲进度100%而且有此块的NP下载速度大于5.0KBPS
if(GetIsCachePeer() &&
comm->currRes->GetBufferPercent() == 100 &&
comm->p2pMgr.FindInAllRemoteInterval(i, this, false, 5.0f) > 0)
continue;
// 如果是外网节点,且内网节点上有,则不从这下载
if (GetIsSameLan() &&
comm->p2pMgr.FindInAllLanRemoteInterval(i, this, 5.0f) > 0)
continue;
// 记录一个可以下载的块
array[count++] = i;
}
}
if(count) {
// 根据缓冲情况决定是否要随机下载
if(comm->currRes->GetBufferPercent() == 100) {
random_shuffle(array, array+count);
comm->logFile.StatusOut("Do by random %d.", pushNum);
}
else
comm->logFile.StatusOut("Do by order %d.", pushNum);
// 添加到pushlist中,并发送
UINT addNum = 0;
for(UINT i = 0; i < count && addNum < pushNum; ++i) {
if(array[i] != UINT_MAX) {
if(!AddPushBlock(array[i], true))
break;
addNum++;
}
}
if(addNum)
SendPush();
}
}
// 发送push列表中的第一个块
BOOL P2PClient::SendFirstPushBlock() {
// 是否有需要发送的块,如果有,取出第一个(也是最小的一个)
UINT blockID;
localPush.GetFirstBlock(blockID);
if(blockID == UINT_MAX)
return TRUE;
// 检查当前发送列表中是否已经有本连接的块,
// 1. 如果没有,那么发送;
// 2. 如果有,则暂时不发送
if(!CheckSendingBlock())
return SendResponse(blockID, true);
localPush.AddBlock(blockID);
return TRUE;
}
BOOL P2PClient::IsIdleTooLong() {
DWORD tNow = timeGetTime();
// 对于Listener, 是上次发送数据的时间;对于Connector,是上次收到数据的时间
//int transferIdle = isIncoming?tNow-lastSendDataTime:tNow-lastRecvDataTime;
int transferIdle = min(tNow-lastSendDataTime, tNow-lastRecvDataTime);
// 明明push list非空,但是却没有收到数据头部的时间
int responseIdle = tNow-lastRecvDataHdTime;
GenerateTransferInfo(TRUE);
TransferInfo tiTemp;
GetTransferInfo(tiTemp);
comm->logFile.StatusOut("%s: transfer idle %d. response idle %d. \r\n\
\tcurr down/up:%.2f/%.2f, avg down/up:%.2f/%.2f, total down/up:%.2f/%.2fMB. layer: %d. bandwidth: %.4fKBPS %s.",
comm->p2pMgr.FormatIPAddress(remotePeer),
transferIdle, responseIdle,
tiTemp.currDownSpeed/1024, tiTemp.currUpSpeed/1024,
tiTemp.avgDownSpeed/1024, tiTemp.avgUpSpeed/1024,
tiTemp.totalDownBytes/1024.f/1024.f, tiTemp.totalUpBytes/1024.f/1024.f,
GetLayer(), GetBandWidth(), isIncoming?"in":"out");
BOOL kill = FALSE;
if(bGotFirstMsg) {
if(isIncoming)
kill = (transferIdle > MAX_INCOMING_SENDDATA_IDLE);
else
kill = (transferIdle > MAX_OUTGOING_RECVDATA_IDLE);
// TOOD: 如果pushlist非空,但是有一段时间没有数据的头部了,那么就杀掉此连接
// 注意此timeout时间比transfer timeout要短一些
kill = kill || (remotePush.GetValidSize() != 0 && responseIdle > MAX_RESPONSE_IDLE);
}
else {
int startIdle = transferIdle;
if(GetIsCachePeer()) {
kill = (startIdle > MAX_CP_FIRST_MSG_IDLE);
}
else
kill = (startIdle > MAX_NP_FIRST_MSG_IDLE);
}
if(kill) {
comm->logFile.StatusOut("Kill Idle %s.", comm->p2pMgr.FormatIPAddress(remotePeer));
}
return kill;
}
// 检查当前发送列表中是否已经有某连接的块,
// 1. 如果有,返回TRUE
// 2. 如果没有,那么返回FALSE;
BOOL P2PClient::CheckSendingBlock() {
TCPPacket* packet = NULL;
UINT temp;
for(TCPPackIt it = m_sendList.begin(); it != m_sendList.end(); ++it) {
packet = *it;
// 如果消息类型不是P2P_RESPONSE,接着找下一个
if(packet->buf[4] != P2P_RESPONSE)
continue;
// 如果blockSize为0,接着找下一个
memcpy(&temp, packet->GetBlockSize(), sizeof(temp));
if(temp == 0)
continue;
// 已经有准备发送的块
return TRUE;
}
// 没有找到,说明可以发送blockID
return FALSE;
}
// 从发送列表中查找一个块,如果此块准备发送(packet->send == 0),则删除并返回TRUE;
// 如果此块已经开始发送,则直接返回TRUE。如未找到,则返回FALSE
// 如果blockID是UINT_MAX,则删除任意一个准备发送的块
BOOL P2PClient::DeleteSendingBlock(UINT blockID) {
TCPPacket* packet = NULL;
UINT temp;
UINT index = 0;
for(TCPPackIt it = m_sendList.begin(); it != m_sendList.end(); ++it, index++) {
packet = *it;
// 如果消息类型不是P2P_RESPONSE,接着找下一个
if(packet->GetMsgType() != P2P_RESPONSE)
continue;
// 如果不是目标块,则接着找下一个
// 如果blockID是UINT_MAX,则删除所有准备发送的块
memcpy(&temp, packet->buf+5, sizeof(temp));
if(temp != blockID && blockID != UINT_MAX)
continue;
// 如果已经开始发送,就不能打断,返回TRUE
if(packet->sent > 0)
return TRUE;
// 如果是尚未开始发送的块,则删除,返回TRUE
comm->p2pMgr.ReleasePacket(packet);
m_sendList.erase(it);
return TRUE;
}
// 没有找到
return FALSE;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -