📄 p2pmgr.cpp
字号:
/*
* Openmysee
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
*/
#include "StdAfx.h"
#include "P2PMgr.h"
#include "Communicator.h"
namespace NPLayer1 {
/**
 * Constructs an idle manager bound to its owning Communicator.
 *
 * No socket is created and no thread is started here; that work is done
 * by Init(). Every counter starts at zero.
 *
 * @param c  Communicator stored in `comm` and used by the other methods.
 */
P2PMgr::P2PMgr(Communicator* c) : comm(c), m_freeList(8, 1) {
    // Not listening and not running yet.
    m_Socket  = INVALID_SOCKET;
    mgrThread = NULL;
    isRunning = FALSE;

    // Transfer counters.
    m_totalBlockDataDown = 0;
    m_totalBlockDataUp   = 0;
    m_totalBytesFromCP   = 0;

    // Connection statistics.
    m_lowversionCount     = 0;
    m_connectFailCount    = 0;
    m_outgoingCount       = 0;
    m_incomingCount       = 0;
    m_outgoingElapsedTime = 0;
    m_incomingElapsedTime = 0;

    m_bBlockIntervalHasChanged = true;
}
/**
 * Shuts the manager down and logs that it has exited.
 *
 * Uninit() returns without doing anything when the manager is not running,
 * so destroying an object that was never initialised is fine.
 */
P2PMgr::~P2PMgr() {
    this->Uninit();
    comm->logFile.StatusOut("P2PMgr: exited...");
}
/**
 * Starts the P2P manager: sizes the client table, sets up the listening
 * socket via Binding() and launches the manager thread (RunManager).
 *
 * Calling Init() while the manager is already running does nothing and
 * returns PRT_OK.
 *
 * If any step fails, Uninit() is called before returning so that the
 * running flag is cleared and the listening socket is closed. Without
 * this, a failed Init() left `isRunning` set and every later Init() call
 * reported PRT_OK although no socket or thread existed.
 *
 * @return PRT_OK on success (or when already running);
 *         the code returned by Binding() when it is below PRT_OK;
 *         PRT_SYS when the manager thread could not be created.
 */
P2P_RETURN_TYPE P2PMgr::Init() {
    if(isRunning)
        return PRT_OK;

    // Set before the thread is created: RunManager loops on this flag.
    isRunning = TRUE;

    // Allocate enough P2PClient slots up front.
    m_clients.resize(comm->cfgData.GetSum());

    const P2P_RETURN_TYPE ret = Binding();
    if(ret < PRT_OK) {
        // Roll back so the manager can be initialised again later.
        Uninit();
        return ret;
    }

    comm->logFile.StatusOut("P2PMgr Thread %d Begin, this=%p, comm=%p, comm->p2pMgr=%p",
        ::GetCurrentThreadId(), this, comm, &comm->p2pMgr);

    DWORD threadID;
    mgrThread = CreateThread(
        NULL, 0, (LPTHREAD_START_ROUTINE)(P2PMgr::RunManager), this, 0, &threadID);
    if(mgrThread == NULL) {
        // Roll back: closes the socket that Binding() opened and clears the flag.
        Uninit();
        return PRT_SYS;
    }

    // Run the manager thread at a lower priority.
    SetThreadPriority(mgrThread, THREAD_PRIORITY_BELOW_NORMAL);
    return PRT_OK;
}
void P2PMgr::Uninit() {
if(isRunning) {
isRunning = FALSE;
SafeCloseSocket(m_Socket);
if(mgrThread) {
// 如果一段时间内不能正常停止,就强行终止线程
comm->logFile.StatusOut("P2PMgr: Wait for Thread exit...");
DWORD ret = WaitForSingleObject(mgrThread, 5000);
if(ret == WAIT_TIMEOUT) {
comm->logFile.StatusOut("P2PMgr: Wait for Thread exit...Failed! Terminate it!");
TerminateThread(mgrThread, 0);
}
CloseHandle(mgrThread);
mgrThread = NULL;
}
comm->logFile.StatusOut("P2PMgr: Clear P2PClient list...");
for(ClientIt it = m_clients.begin(); it != m_clients.end(); ++it) {
(*it).SetInvalid();
}
m_clients.clear();
comm->logFile.StatusOut("P2PMgr: Clear other lists...");
// 清空正在连接的列表
m_connecting.clear();
// 清空已知Peer的列表
m_peers.clear();
// 清空无法连接的地址列表
m_badAddr.clear();
// 清空需要连接的列表
m_connectto.clear();
}
}
/**
 * Creates the non-blocking TCP listening socket, binds it to a local port
 * and starts listening.
 *
 * Ports are tried in this order: 80, 21, 22, 23, 20, then random ports in
 * the range 45000-54999, for at most 1000 attempts in total. On success the
 * chosen port (network byte order) is stored in
 * comm->localAddress.subnetIP.sin_port.
 *
 * This function does not close m_Socket on failure; Uninit() closes it.
 *
 * @return PRT_OK on success;
 *         PRT_NET when creating, configuring or listening on the socket fails;
 *         PRT_INIT_BIND when every bind attempt failed.
 */
P2P_RETURN_TYPE P2PMgr::Binding() {
    // Microsoft Knowledge Base: WSA_FLAG_OVERLAPPED Is Needed for Non-Blocking Sockets
    // http://support.microsoft.com/default.aspx?scid=kb;EN-US;179942
    m_Socket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
    if(m_Socket == INVALID_SOCKET) {
        comm->logFile.StatusErr("Creating listening socket", WSAGetLastError());
        return PRT_NET;
    }

    // SO_EXCLUSIVEADDRUSE is only supported in 2000/XP/2003 and higher OS.
    // NOTE(review): "> 5" skips 2000/XP/2003 themselves (major version 5),
    // which contradicts the line above -- confirm whether that is intended.
    if(comm->osvi.dwMajorVersion > 5) {
        // Take exclusive ownership of the port. The option is spelled as
        // (int)(~SO_REUSEADDR); the log message below names it SO_EXCLUSIVEADDRUSE.
        BOOL bExAddrUse = TRUE;
        if(setsockopt(m_Socket, SOL_SOCKET, ((int)(~SO_REUSEADDR)), (const char*)&bExAddrUse, sizeof(BOOL)) == SOCKET_ERROR) {
            comm->logFile.StatusErr("Setting TCP socket as SO_EXCLUSIVEADDRUSE", WSAGetLastError());
            return PRT_NET;
        }
    }

    // Disable the Nagle algorithm. TCP_NODELAY is a TCP-level option and must
    // be set at IPPROTO_TCP; at SOL_SOCKET the same numeric value selects
    // SO_DEBUG, so the previous call never turned Nagle off.
    BOOL bNoDelay = TRUE;
    if(setsockopt(m_Socket, IPPROTO_TCP, TCP_NODELAY, (const char*)&bNoDelay, sizeof(bNoDelay)) == SOCKET_ERROR) {
        comm->logFile.StatusErr("Setting TCP socket as TCP_NODELAY", WSAGetLastError());
        return PRT_NET;
    }

    // Set this socket as a Non-blocking socket.
    ULONG flag = 1;
    if(ioctlsocket(m_Socket, FIONBIO, &flag) == SOCKET_ERROR) {
        comm->logFile.StatusErr("Setting listening socket as non-blocking", WSAGetLastError());
        return PRT_NET;
    }

    // Associate the local address with m_Socket.
    sockaddr_in addr;
    memset(&addr, 0, sizeof(sockaddr_in));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);

    USHORT port = 0;
    // Declared outside the loop because it is inspected afterwards; relying on
    // a for-scoped variable after the loop is not valid standard C++.
    int i = 0;
    for(; i < 1000; ++i) {
        switch(i) {
        case 0: port = 80; break;
        case 1: port = 21; break;
        case 2: port = 22; break;
        case 3: port = 23; break;
        case 4: port = 20; break;
        default: port = static_cast<USHORT>(rand(&comm->ctx))%10000 + 45000; break;
        }
        addr.sin_port = htons(port);
        if(bind(m_Socket, (sockaddr*)&addr, sizeof(addr)) != SOCKET_ERROR)
            break;
        comm->logFile.StatusOut("Bind TCP socket at port %d. Failed", port);
    }
    // The loop ran to completion without a successful bind.
    if(i == 1000)
        return PRT_INIT_BIND;

    // Establish a socket to listen for incoming connections.
    if(listen(m_Socket, 3) == SOCKET_ERROR) {
        comm->logFile.StatusErr("Listening socket", WSAGetLastError());
        return PRT_NET;
    }

    // Record the bound port (already in network byte order).
    comm->localAddress.subnetIP.sin_port = addr.sin_port;
    comm->logFile.StatusOut("Binding TCP socket at port %d.", port);
    return PRT_OK;
}
void __stdcall P2PMgr::RunManager(P2PMgr* mgr) {
timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 50000;
fd_set read_set, write_set;
sockaddr_in tmpAddr;
memset(&tmpAddr, 0, sizeof(tmpAddr));
int addrlen = sizeof(tmpAddr);
bool nullLastTime = true;
bool bFirstTime = true;
ConnectingPeer incomingPeer;
DWORD lastManageTime=0, lastCSReportTime=0,
lastP2PReportTime=0, lastP2PNearPeerTime=0,
lastTryConnectTime=0, lastClearBadAddress=0,
lastCheckPushListTime=0, lastCSReport2Time=0,
lastBroadcastInfoTime=0, currTime=0;
// 每隔一段比较长的时间,比如20次report之后,就发送一次更新全部区间列表的消息
UINT csReportCount=0, p2pReportCount=0;
// 设置这个时间意味着,第一次While循环时,csClient.SendLogin不会被执行
// 为什么要这样做,因为SendLogin已经在Communicator初始化之际被执行了;
// 不必在这么短的时间内连续发送两个Login,徒然增加TS负担而已
lastCSReportTime = timeGetTime();
mgr->comm->logFile.StatusOut("P2PMgr Thread %d Started, mgr=%p, mgr->comm=%p, mgr->comm->p2pMgr=%p",
::GetCurrentThreadId(), mgr, mgr->comm, &mgr->comm->p2pMgr);
while(mgr->isRunning) {
CriticalSection::Owner lock(mgr->comm->mainThreadLock);
FD_ZERO(&read_set);
FD_SET(mgr->m_Socket, &read_set);
FD_SET(mgr->comm->csClient.GetSocket(), &read_set);
if (mgr->comm->localPeerFinder.IsRunning()) {
FD_SET(mgr->comm->localPeerFinder.GetSocket(), &read_set);
}
for(ClientIt it = mgr->m_clients.begin(); it != mgr->m_clients.end(); ++it) {
if(!it->IsValid())
continue;
FD_SET(it->GetSocket(), &read_set);
}
FD_ZERO(&write_set);
for(CPeerIt itt = mgr->m_connecting.begin(); itt != mgr->m_connecting.end(); ++itt)
FD_SET(itt->sock, &write_set);
int s = select(0, &read_set, &write_set, NULL, &timeout);
if(!mgr->isRunning)
break;
if (s > 0) {
// 检查正在连接的Socket,有没有连接成功的
for(itt = mgr->m_connecting.begin(); itt != mgr->m_connecting.end(); ++itt) {
if((FD_ISSET(itt->sock, &write_set))) {
mgr->AddP2PClient(*itt);
mgr->m_connecting.erase(itt);
// 回到开始,再找一遍,直到都是尚未连接成功的
itt = mgr->m_connecting.begin();
continue;
}
}
// 检查所有连接有没有收到数据
for(it = mgr->m_clients.begin(); it != mgr->m_clients.end(); ++it) {
if(!it->IsValid())
continue;
if((FD_ISSET(it->GetSocket(), &read_set))) {
if(!it->BaseRecv()) {
// 关闭该连接,并清空该连接要发送的数据包
it->SetInvalid();
}
break;
}
}
// 检查有没有新的连入连接
if(FD_ISSET(mgr->m_Socket, &read_set)) {
SOCKET sock = accept(
mgr->m_Socket, (struct sockaddr *)&tmpAddr, &addrlen);
if(sock == INVALID_SOCKET) {
mgr->comm->logFile.StatusErr("accepting socket", WSAGetLastError());
mgr->SafeCloseSocket(sock); // no force disconnect
}
else {
// Set this socket as a Non-blocking socket.
ULONG flag = 1;
if (ioctlsocket(sock, FIONBIO, &flag) == -1) {
mgr->comm->logFile.StatusErr("Setting accepted socket as non-blocking", WSAGetLastError());
}
else {
incomingPeer.sock = sock;
incomingPeer.isForFree = false;
incomingPeer.isCachePeer = false;
incomingPeer.isIncoming = true;
incomingPeer.isPassive = false;
incomingPeer.outerIP.sin_addr.s_addr = tmpAddr.sin_addr.s_addr;
incomingPeer.isSameLan = false;
mgr->AddP2PClient(incomingPeer);
}
}
}
// 检查CSClient有没有收到UDP包
if(FD_ISSET(mgr->comm->csClient.GetSocket(), &read_set)) {
if(!mgr->comm->csClient.ParseMsg(tmpAddr, addrlen)) {
;
}
}
// 检查LocalPeerFinder有没有收到广播
if (mgr->comm->localPeerFinder.IsRunning() &&
FD_ISSET(mgr->comm->localPeerFinder.GetSocket(), &read_set)) {
mgr->comm->localPeerFinder.ReadBroadcast();
}
}
else if(s == SOCKET_ERROR) {
mgr->comm->logFile.StatusErr("selecting", WSAGetLastError());
Sleep(1); // prevent dead loop
}
// 从所有连接的发送队列中提取数据发送
mgr->SendPackets();
// 获取当前时间
currTime = timeGetTime();
// 如果出现多个超时的socket,也只删除第一个,因为这里的循环很快,不用担心删不掉
for(itt = mgr->m_connecting.begin(); itt != mgr->m_connecting.end(); ++itt) {
if(currTime - itt->connectTime >= P2P_CONNECT) {
mgr->m_connectFailCount++;
mgr->comm->logFile.StatusOut("Connecting socket %s is Timeout!", mgr->FormatIPAddress(*itt));
// 超时!关闭Socket, 从连接列表中删除,并且添加到badAddr列表
mgr->SafeCloseSocket(itt->sock); // no force disconnect
mgr->AddBadAddr(*itt);
mgr->m_connecting.erase(itt);
break;
}
}
// 每隔一段时间尝试连接其他Peer
if(currTime-lastTryConnectTime >= TRY_CONNECT) {
lastTryConnectTime = currTime;
mgr->comm->tryClient.Try();
}
if(currTime-lastManageTime >= P2P_MANAGE) {
lastManageTime = currTime;
// 删除超时的连接
//mgr->comm->logFile.StatusOut("processing kill idle connections....");
for(it = mgr->m_clients.begin(); it != mgr->m_clients.end(); ++it) {
if(it->IsValid() && it->IsIdleTooLong()) {
mgr->AddBadAddr(it->GetAddress());
it->SetInvalid();
}
}
// 生成传输信息并打印
mgr->GenerateTransferInfo(TRUE);
TransferInfo tiTemp;
mgr->GetTransferInfo(tiTemp);
mgr->comm->logFile.StatusOut("Cur: (%.2f/%.2f)KB/s. Avg: (%.2f/%.2f)KB/s. Total: %.2f/%.2fMB. Layer: %d.\n\tConnect: low version %d, Fail %d, In %d, Out %d. AvgTime: In %d, Out %d.",
tiTemp.currDownSpeed/1024, tiTemp.currUpSpeed/1024,
tiTemp.avgDownSpeed/1024, tiTemp.avgUpSpeed/1024,
tiTemp.totalDownBytes/1024.f/1024.f, tiTemp.totalUpBytes/1024.f/1024.f,
mgr->GetSelfLayer(), mgr->GetLowVersionCount(), mgr->GetConnectFailCount(),
mgr->GetTotalIncomingCount(), mgr->GetTotalOutgoingCount(),
mgr->GetAvgIncomingTime(), mgr->GetAvgOutgoingTime());
if(mgr->comm->currRes) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -