📄 p2pclient.cpp
字号:
/*
* Openmysee
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
*/
// P2PClient.cpp: implementation of the P2PClient class.
//
//////////////////////////////////////////////////////////////////////
#include "stdafx.h"
#include "P2PClient.h"
#include "Communicator.h"
namespace NPLayer1 {
//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////
P2PClient::P2PClient()
: recvPointer(0)
, sendPointer(0)
, isSameLan(false)
{
valid = FALSE;
errStr[0] = 0;
}
P2PClient::~P2PClient() {
if(valid)
SetInvalid();
}
BOOL P2PClient::SetValid(Communicator* c, const ConnectingPeer& peer) {
comm = c;
recvOff = 0;
valid = TRUE;
m_Socket = peer.sock;
isIncoming= peer.isIncoming;
isForFree = peer.isForFree;
isPassive = peer.isPassive;
isSameRes = false;
isSameLan = peer.isSameLan;
// 到这里为止,我们完全不知道对方有哪些块
remoteInterval.Clear();
remotePush.Clear();
localPush.Clear();
sentMediaArray.Clear();
connectionBeginTime = lastRecvDataTime = lastSendDataTime = lastRecvDataHdTime = timeGetTime();
bGotFirstMsg = FALSE;
memcpy((PeerInfoWithAddr*)&remotePeer, (PeerInfoWithAddr*)&peer, sizeof(PeerInfoWithAddr));
comm->logFile.StatusOut("Connected on %s.", comm->p2pMgr.FormatIPAddress(remotePeer));
if(GetIsCachePeer()) {
remoteVersion = comm->cfgData.COMMUNICATOR_VERSION;
remotePeer.layer = 0;
}
else {
remoteVersion = 0.0f;
remotePeer.layer = 0xff;
}
// clear transfer calculator members
ClearTransferInfo();
lastRequestStartTime = 0;
lastRequestBytes = 0;
m_reqStartTime = 0;
m_transUsedTime = 0;
if(!SendHello())
return FALSE;
CorePeerInfo thisPeer;
comm->p2pMgr.GetSelfInfo(thisPeer);
if(!SendReport(thisPeer, true))
return FALSE;
if(GetIsCachePeer()) {
// 请求重新分配所有push list
comm->p2pMgr.RedistributeAllBlocks(this);
}
return TRUE;
}
void P2PClient::SetInvalid() {
if(valid) {
valid = FALSE;
remotePush.Clear();
comm->logFile.StatusOut("Clear TCPPacket Send list...");
while (!m_sendList.empty()) {
comm->p2pMgr.ReleasePacket(m_sendList.front());
m_sendList.pop_front();
}
comm->logFile.StatusOut("Disconnected from %s.", comm->p2pMgr.FormatIPAddress(remotePeer));
comm->p2pMgr.SafeCloseSocket(m_Socket); // no force disconnect
// 此处没有排除低版本客户端的情况,所以统计会有少许偏差
comm->p2pMgr.ConnectionClosed(isIncoming, timeGetTime()-connectionBeginTime);
}
}
BOOL P2PClient::BaseRecv() {
int ret = recv(m_Socket, recvBuf+recvOff, P2P_BUF_SIZE-recvOff, 0);
if(ret < 0) {
DWORD lastError = ::WSAGetLastError();
if (WSAEWOULDBLOCK != lastError) {
comm->logFile.StatusErr("Receiving data on TCP", lastError);
return FALSE;
}
else
return TRUE;
}
else if(0 == ret) {
comm->logFile.StatusOut("Connection has been disconnected gracefully.");
return FALSE;
}
recvOff += ret;
AddIncomingBytes(ret);
comm->p2pMgr.AddIncomingBytes(ret);
BOOL retVal = FALSE;
for(;;) {
// because multiple msgs can be received at once.
// keep call parseMsg() till !MSG_COMPLETE
MSG_STATE ms = ParseMsg();
bool bBadAddr = false;
switch(ms) {
case MSG_COMPLETE:
continue;
case MSG_UNCOMPLETE:
retVal = TRUE;
break;
case MSG_ERR_SIZE:
sprintf(errStr, "错误的消息大小!");
break;
case MSG_ERR_TYPE:
sprintf(errStr, "错误的消息类型!");
break;
case MSG_DIFF_CHNL:
sprintf(errStr, "属于不同的频道!");
bBadAddr = true;
break;
case MSG_NOMORE_CONS:
sprintf(errStr, "已经存在一个连接!");
break;
case MSG_ERR_LIST_SIZE:
sprintf(errStr, "错误的列表大小!");
break;
case MSG_SEND_ERR:
sprintf(errStr, "发送消息错误!");
break;
case MSG_SAVEDATA_ERR:
sprintf(errStr, "无法保存数据!");
break;
case MSG_UNMATCH_BLOCKID:
sprintf(errStr, "返回了并未请求的块!");
break;
case MSG_NOSUCH_RES_HERE:
sprintf(errStr, "本机没有这个资源!");
bBadAddr = true;
break;
case MSG_REMOTE_ERR:
// 错误信息已经在errStr中了
break;
case MSG_CHNL_CLOSED:
sprintf(errStr, "频道关闭!!");
// 发送没有频道关闭的消息给外界
comm->PostErrMessage(PNT_CHNL_CLOSED, 0, true);
break;
case MSG_NOSUCH_RES_SP:
sprintf(errStr, "SP没有这个资源!!");
// 发送没有资源的消息给外界
comm->PostErrMessage(PNT_NO_SUCH_RES, 0, true);
break;
case MSG_CHNL_END:
sprintf(errStr, "频道结束了!");
// 发送频道结束的消息给外界
comm->PostErrMessage(PNT_CHNL_ENDED, 0, true);
break;
case MSG_LOW_VERSION:
sprintf(errStr, "对方客户端版本过低!");
bBadAddr = true;
comm->p2pMgr.AddLowVersionConCount();
break;
default:
sprintf(errStr, "未知错误类型!");
bBadAddr = true;
break;
}
if(strlen(errStr) > 0) {
comm->logFile.StatusOut("来自%s的错误: %s", comm->p2pMgr.FormatIPAddress(remotePeer), errStr);
errStr[0] = 0;
// reject this client
comm->p2pMgr.AddBadAddr(remotePeer);
}
break;
}
return retVal;
}
MSG_STATE P2PClient::ParseMsg() {
// 如果过小,则不是正常的包
if(recvOff < sizeof(int)+sizeof(BYTE))
return MSG_UNCOMPLETE;
// 把移动指针放到数据的起始地址
recvPointer = recvBuf;
// 读取消息大小
CSClient::CopyMoveSrc(&msgSize, recvPointer, sizeof(msgSize));
// 读取消息类型
UINT8 msgType;
CSClient::CopyMoveSrc(&msgType, recvPointer, sizeof(msgType));
// msgSize是否正常
if(msgSize > P2P_BUF_SIZE || msgSize < sizeof(int)+sizeof(BYTE))
return MSG_ERR_SIZE;
// 因为P2P_RESPONSE包含数据,可能传输比较慢
// 在这里至少我们知道发送的请求的到回应了,要根据这个判断请求是否超时
if(msgType == P2P_RESPONSE) {
lastRecvDataHdTime = timeGetTime();
}
// 是否包含完成的消息
if(recvOff < msgSize)
return MSG_UNCOMPLETE;
MSG_STATE ret = MSG_COMPLETE;
switch(msgType) {
case P2P_HELLO:
ret = OnHello();
break;
case P2P_SPUPDATE:
ret = OnSPUpdate();
break;
case P2P_REPORT:
ret = OnReport();
break;
case P2P_NEAR_PEERS:
ret = OnNearPeers();
break;
case P2P_PUSHLIST:
ret = OnPushList();
break;
case P2P_RESPONSE:
ret = OnResponse();
break;
case P2P_MSG:
ret = OnMsg();
break;
case P2P_REQMEDIA:
ret = OnReqMedia();
break;
case P2P_MEDIATYPE:
ret = OnMediaType();
break;
default:
ret = MSG_ERR_TYPE;
break;
}
// copy left data to start of recvBuf
if(recvOff >= msgSize) {
memcpy(recvBuf, recvBuf+msgSize, recvOff-msgSize);
recvOff -= msgSize;
}
return ret;
}
MSG_STATE P2PClient::OnHello() {
assert(!GetIsCachePeer());
// NP version
CSClient::CopyMoveSrc(&remoteVersion, recvPointer, sizeof(remoteVersion));
if(remoteVersion < comm->cfgData.ACCEPT_VERSION) {
comm->logFile.StatusOut("Reject low version client %s %.5f.", comm->p2pMgr.FormatIPAddress(remotePeer), remoteVersion);
return MSG_LOW_VERSION;
}
// 对方需要的资源Hash码
char resHashCode[MD5_LEN+1];
resHashCode[MD5_LEN] = 0;
CSClient::CopyMoveSrc(resHashCode, recvPointer, MD5_LEN);
// 是否被动连接,因为对方可能是因为收到TS2NP_CONNECTO,被动连接过来的
// 那本连接就算做outgoing,对方会将本连接算做incoming
bool passiveConnect = false;
CSClient::CopyMoveSrc(&passiveConnect, recvPointer, sizeof(passiveConnect));
if(passiveConnect)
isIncoming = false;
// 对方Peer信息
CSClient::CopyMoveSrc(&remotePeer, recvPointer, sizeof(remotePeer));
comm->logFile.StatusOut("Got P2P_HELLO from to %s.", comm->p2pMgr.FormatIPAddress(remotePeer));
// 如果对方资源和本机当前资源相同,则是非Free的连接
if(comm->currRes && memcmp(resHashCode, comm->currRes->GetHashCode().data(), MD5_LEN) == 0) {
isForFree = false;
isSameRes = true;
}
else {
return MSG_NOSUCH_RES_HERE;
}
if(isIncoming || passiveConnect) {
// 此时isIncoming为true,说明是accept进来的连接,要在这里判断是否重复连接
// 前面的passiveConnect可能改变了isIncoming的值,
// 而passiveConnect必定是incoming的连接,所以一并在这里判断
if(!comm->p2pMgr.CheckN4One(remotePeer, isForFree, true, MAX_CONNECTION_PER_NP))
return MSG_NOMORE_CONS;
}
// 如果对方不是CachePeer, 则第一个收到的消息应该在这里
bGotFirstMsg = TRUE;
if (remotePeer.outerIP.sin_addr.s_addr == comm->localAddress.outerIP.sin_addr.s_addr) {
isSameLan = true;
}
comm->p2pMgr.ConnectionEstablished(isIncoming);
return MSG_COMPLETE;
}
MSG_STATE P2PClient::OnSPUpdate() {
if(!comm->currRes) {
assert(0);
return MSG_ERR_TYPE;
}
MSG_STATE state = MSG_COMPLETE;
UINT oldMaxBlockID = UINT_MAX;
// 记录旧的最大块ID
oldMaxBlockID = comm->currRes->GetSPUpdate().maxBlockID;
// 收到的SPUpdate
SPUpdate tmpUpdate;
CSClient::CopyMoveSrc(&tmpUpdate, recvPointer, sizeof(tmpUpdate));
comm->logFile.StatusOut("Recv SPUpdate %d->%d from %s", tmpUpdate.minBlockID, tmpUpdate.maxBlockID,
comm->p2pMgr.FormatIPAddress(remotePeer));
// 计算收到的SPUpdate的校验码
PBYTE temp = reinterpret_cast<PBYTE>(recvPointer-sizeof(tmpUpdate));
BYTE calsum = 0;
for(int i = 0; i < sizeof(tmpUpdate); ++i) {
calsum += *temp;
temp++;
}
// 如果是不带校验的SPUpdate,则不接受
if(msgSize != 5+sizeof(SPUpdate)+1) {
comm->logFile.StatusOut("Old SPUpdate!");
return MSG_COMPLETE;
}
// 读取对方发送的SPUpdate校验码
BYTE sum = 0;
CSClient::CopyMoveSrc(&sum, recvPointer, sizeof(sum));
// 比较两个SPUpdate的校验码,必须符合
if(calsum != sum) {
comm->logFile.StatusOut("Bad SPUpdate, err sum!");
return MSG_COMPLETE;
}
if(tmpUpdate.minBlockID == UINT_MAX && tmpUpdate.maxBlockID == UINT_MAX) {
if(tmpUpdate.minKeySample == 0xffffffffffffffff && tmpUpdate.maxKeySample == 0xffffffffffffffff)
state = MSG_NOSUCH_RES_SP; // SP 上没有这个资源
else if(tmpUpdate.minKeySample == 0 && tmpUpdate.maxKeySample == 0)
state = MSG_CHNL_CLOSED; // 这个频道已经关闭
else
assert(0); // 错误的消息
}
if(tmpUpdate.minBlockID == 0 && tmpUpdate.maxBlockID == 0 &&
tmpUpdate.minKeySample == 0 && tmpUpdate.maxKeySample == 0)
state = MSG_CHNL_END;
if(state == MSG_COMPLETE) {
if(GetIsCachePeer()) {
remoteInterval.Clear();
remoteInterval.AddInterval(tmpUpdate.minBlockID, tmpUpdate.maxBlockID-tmpUpdate.minBlockID);
}
// 如果收到SPUpdate的maxBlockID比本机SPUpdate的maxBlockID更大
// 则更新本机SPUpdate,并向比本机层数更高的连接广播
if(tmpUpdate.maxBlockID > oldMaxBlockID) {
comm->currRes->SetSPUpdate(tmpUpdate, sum);
// 检查PushList状况,如果只剩0块了,就要求重新分配所有连接的块
// 直播系统中连上CP的连接,不会接收到P2P_REPORT,所以只能根据spUpdate决定是否开始请求数据
if(remotePush.GetValidSize() == 0 && GetIsCachePeer())
comm->p2pMgr.RedistributeAllBlocks(this);
// 广播最新的SPUpdate
comm->p2pMgr.BroadCastSPUpdate(tmpUpdate, sum);
}
}
else {
// 广播频道的非正常状态
comm->p2pMgr.BroadCastSPUpdate(tmpUpdate, sum);
}
return state;
}
MSG_STATE P2PClient::OnReport() {
// 复制对方的信息
CSClient::CopyMoveSrc((CorePeerInfo*)&remotePeer, recvPointer, sizeof(CorePeerInfo));
// 是否更新全部区间列表
bool bRefresh;
CSClient::CopyMoveSrc(&bRefresh, recvPointer, sizeof(bRefresh));
if(bRefresh)
remoteInterval.Clear();
// 如果REFRESH,那么只有一组区间;如果不是,那么有两组区间,先后是增加和删除
for(UINT8 j = 0; j < (bRefresh?1:2); ++j) {
// read interval count
UINT8 intervalNum = 0;
CSClient::CopyMoveSrc(&intervalNum, recvPointer, sizeof(intervalNum));
assert(recvPointer - recvBuf < 1000);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -