⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 p2pmgr.cpp

📁 mysee网络直播源代码Mysee Lite是Mysee独立研发的网络视频流媒体播放系统。在应有了P2P技术和一系列先进流媒体技术之后
💻 CPP
📖 第 1 页 / 共 3 页
字号:
/*
 *  Openmysee
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 *
 */
#include "StdAfx.h"
#include "P2PMgr.h"
#include "Communicator.h"

namespace NPLayer1 {

P2PMgr::P2PMgr(Communicator* c) : comm(c), m_freeList(8, 1) {
	m_Socket = INVALID_SOCKET;
	isRunning = FALSE;
	mgrThread = NULL;
	m_totalBlockDataUp = m_totalBlockDataDown = 0;
	m_totalBytesFromCP = 0;
	m_connectFailCount = m_lowversionCount = 0;
	m_incomingCount = m_outgoingCount = 0;
	m_incomingElapsedTime = m_outgoingElapsedTime = 0;
	m_bBlockIntervalHasChanged = true;
}

P2PMgr::~P2PMgr() {
	Uninit();

	comm->logFile.StatusOut("P2PMgr: exited...");
}

P2P_RETURN_TYPE P2PMgr::Init() {
	if(!isRunning) {
		isRunning = TRUE;

		// 初始化足够的P2PClient
		m_clients.resize(comm->cfgData.GetSum());

		P2P_RETURN_TYPE ret = Binding();
		if(ret < PRT_OK)
			return ret;

		comm->logFile.StatusOut("P2PMgr Thread %d Begin, this=%p, comm=%p, comm->p2pMgr=%p", 
			::GetCurrentThreadId(), this, comm, &comm->p2pMgr);
		DWORD threadID;
		mgrThread = CreateThread(
			NULL, 0, (LPTHREAD_START_ROUTINE)(P2PMgr::RunManager), this, 0, &threadID);
		if(mgrThread == NULL)
			return PRT_SYS;
		// 设置较低的优先级
		SetThreadPriority(mgrThread, THREAD_PRIORITY_BELOW_NORMAL);
	}
	return PRT_OK;
}

void P2PMgr::Uninit() {
	if(isRunning) {
		isRunning = FALSE;
		SafeCloseSocket(m_Socket);

		if(mgrThread) {
			// 如果一段时间内不能正常停止,就强行终止线程
			comm->logFile.StatusOut("P2PMgr: Wait for Thread exit...");
			DWORD ret = WaitForSingleObject(mgrThread, 5000);
			if(ret == WAIT_TIMEOUT) {
				comm->logFile.StatusOut("P2PMgr: Wait for Thread exit...Failed! Terminate it!");
				TerminateThread(mgrThread, 0);
			}
			CloseHandle(mgrThread);
			mgrThread = NULL;
		}

		comm->logFile.StatusOut("P2PMgr: Clear P2PClient list...");
		for(ClientIt it = m_clients.begin(); it != m_clients.end(); ++it) {
			(*it).SetInvalid();
		}
		m_clients.clear();

		comm->logFile.StatusOut("P2PMgr: Clear other lists...");
		// 清空正在连接的列表
		m_connecting.clear();
		// 清空已知Peer的列表
		m_peers.clear();
		// 清空无法连接的地址列表
		m_badAddr.clear();
		// 清空需要连接的列表
		m_connectto.clear();
	}
}

P2P_RETURN_TYPE P2PMgr::Binding() {
	// Microsoft Knowledge Base: WSA_FLAG_OVERLAPPED Is Needed for Non-Blocking Sockets
	// http://support.microsoft.com/default.aspx?scid=kb;EN-US;179942
	m_Socket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
	if(m_Socket == INVALID_SOCKET) {
		comm->logFile.StatusErr("Creating listening socket", WSAGetLastError());
		return PRT_NET;
	}

	// SO_EXCLUSIVEADDRUSE is only supported in 2000/XP/2003 and higher OS.
	if(comm->osvi.dwMajorVersion > 5) {
		//独占端口
		BOOL bExAddrUse = TRUE;
		if(setsockopt(m_Socket, SOL_SOCKET, ((int)(~SO_REUSEADDR)), (const char*)&bExAddrUse, sizeof(BOOL)) == SOCKET_ERROR) {
			comm->logFile.StatusErr("Setting TCP socket as SO_EXCLUSIVEADDRUSE", WSAGetLastError());
			return PRT_NET;
		}
	}

	// 不使用Nagle算法
	BOOL bNoDelay = TRUE;
	if(setsockopt(m_Socket, SOL_SOCKET, TCP_NODELAY, (const char*)&bNoDelay, sizeof(bNoDelay)) == SOCKET_ERROR) {
		comm->logFile.StatusErr("Setting TCP socket as TCP_NODELAY", WSAGetLastError());
		return PRT_NET;
	}

	// Set this socket as a Non-blocking socket.
	ULONG flag = 1;
	if (ioctlsocket(m_Socket, FIONBIO, &flag) == -1) {
		comm->logFile.StatusErr("Setting listening socket as non-blocking", WSAGetLastError());
		return PRT_NET;
	}

	// Associate the local address with m_Socket.
	sockaddr_in addr;
	memset(&addr, 0, sizeof(sockaddr_in));
	addr.sin_family = AF_INET;
	addr.sin_addr.s_addr = htonl(INADDR_ANY);

	USHORT port = 0;
	for(int i = 0; i < 1000; ++i) {
		switch(i) {
		case 0:		port = 80;	break;
		case 1:		port = 21;	break;
		case 2:		port = 22;	break;
		case 3:		port = 23;	break;
		case 4:		port = 20;	break;
		default:	port = static_cast<USHORT>(rand(&comm->ctx))%10000 + 45000;	break;
		}
		addr.sin_port = htons(port);
		if(bind(m_Socket, (sockaddr*)&addr, sizeof(addr)) != SOCKET_ERROR)
			break;
		comm->logFile.StatusOut("Bind TCP socket at port %d. Failed", port);
	}
	if(i == 1000)
		return PRT_INIT_BIND;

	// Establish a socket to listen for incoming connections.
	if(listen(m_Socket, 3) == SOCKET_ERROR) {
		comm->logFile.StatusErr("Listening socket", WSAGetLastError());
		return PRT_NET;
	}

	comm->localAddress.subnetIP.sin_port = addr.sin_port;
	comm->logFile.StatusOut("Binding TCP socket at port %d.", port);

	return PRT_OK;
}

void __stdcall P2PMgr::RunManager(P2PMgr* mgr) {
	timeval timeout; 
	timeout.tv_sec = 0;
	timeout.tv_usec = 50000;

	fd_set read_set, write_set;

	sockaddr_in tmpAddr;
	memset(&tmpAddr, 0, sizeof(tmpAddr));
	int addrlen = sizeof(tmpAddr);

	bool nullLastTime = true;
	bool bFirstTime = true;

	ConnectingPeer incomingPeer;

	DWORD lastManageTime=0, lastCSReportTime=0, 
		lastP2PReportTime=0, lastP2PNearPeerTime=0, 
		lastTryConnectTime=0, lastClearBadAddress=0,
		lastCheckPushListTime=0, lastCSReport2Time=0,
		lastBroadcastInfoTime=0, currTime=0;

	// 每隔一段比较长的时间,比如20次report之后,就发送一次更新全部区间列表的消息
	UINT csReportCount=0, p2pReportCount=0;

	// 设置这个时间意味着,第一次While循环时,csClient.SendLogin不会被执行
	// 为什么要这样做,因为SendLogin已经在Communicator初始化之际被执行了;
	// 不必在这么短的时间内连续发送两个Login,徒然增加TS负担而已
	lastCSReportTime = timeGetTime();

	mgr->comm->logFile.StatusOut("P2PMgr Thread %d Started, mgr=%p, mgr->comm=%p, mgr->comm->p2pMgr=%p", 
		::GetCurrentThreadId(), mgr, mgr->comm, &mgr->comm->p2pMgr);

	while(mgr->isRunning) {
		CriticalSection::Owner lock(mgr->comm->mainThreadLock);

		FD_ZERO(&read_set);
		FD_SET(mgr->m_Socket, &read_set);
		FD_SET(mgr->comm->csClient.GetSocket(), &read_set);
		if (mgr->comm->localPeerFinder.IsRunning()) {
			FD_SET(mgr->comm->localPeerFinder.GetSocket(), &read_set);
		}
		for(ClientIt it = mgr->m_clients.begin(); it != mgr->m_clients.end(); ++it) {
			if(!it->IsValid())
				continue;
			FD_SET(it->GetSocket(), &read_set);
		}

		FD_ZERO(&write_set);
		for(CPeerIt itt = mgr->m_connecting.begin(); itt != mgr->m_connecting.end(); ++itt)
			FD_SET(itt->sock, &write_set);

		int s = select(0, &read_set, &write_set, NULL, &timeout);
		if(!mgr->isRunning)
			break;
		if (s > 0) {
			// 检查正在连接的Socket,有没有连接成功的
			for(itt = mgr->m_connecting.begin(); itt != mgr->m_connecting.end(); ++itt) {
				if((FD_ISSET(itt->sock, &write_set))) {
					mgr->AddP2PClient(*itt);
					mgr->m_connecting.erase(itt);

					// 回到开始,再找一遍,直到都是尚未连接成功的
					itt = mgr->m_connecting.begin();
					continue;
				}
			}
			// 检查所有连接有没有收到数据
			for(it = mgr->m_clients.begin(); it != mgr->m_clients.end(); ++it) {
				if(!it->IsValid())
					continue;
				if((FD_ISSET(it->GetSocket(), &read_set))) {
					if(!it->BaseRecv()) {
						// 关闭该连接,并清空该连接要发送的数据包
						it->SetInvalid();
					}
					break;
				}
			}
			// 检查有没有新的连入连接
			if(FD_ISSET(mgr->m_Socket, &read_set)) {
				SOCKET sock = accept(
					mgr->m_Socket, (struct sockaddr *)&tmpAddr, &addrlen);

				if(sock == INVALID_SOCKET) {
					mgr->comm->logFile.StatusErr("accepting socket", WSAGetLastError());
					mgr->SafeCloseSocket(sock); // no force disconnect
				}
				else {
					// Set this socket as a Non-blocking socket.
					ULONG flag = 1;
					if (ioctlsocket(sock, FIONBIO, &flag) == -1) {
						mgr->comm->logFile.StatusErr("Setting accepted socket as non-blocking", WSAGetLastError());
					}
					else {
						incomingPeer.sock = sock;
						incomingPeer.isForFree = false;
						incomingPeer.isCachePeer = false;
						incomingPeer.isIncoming = true;
						incomingPeer.isPassive = false;
						incomingPeer.outerIP.sin_addr.s_addr = tmpAddr.sin_addr.s_addr;
						incomingPeer.isSameLan = false;
						mgr->AddP2PClient(incomingPeer);
					}
				}
			}
			// 检查CSClient有没有收到UDP包
			if(FD_ISSET(mgr->comm->csClient.GetSocket(), &read_set)) {
				if(!mgr->comm->csClient.ParseMsg(tmpAddr, addrlen)) {
					;
				}
			}

			// 检查LocalPeerFinder有没有收到广播
			if (mgr->comm->localPeerFinder.IsRunning() && 
				FD_ISSET(mgr->comm->localPeerFinder.GetSocket(), &read_set)) {
				mgr->comm->localPeerFinder.ReadBroadcast();
			}
		}
		else if(s == SOCKET_ERROR) {
			mgr->comm->logFile.StatusErr("selecting", WSAGetLastError());
			Sleep(1); // prevent dead loop
		}

		// 从所有连接的发送队列中提取数据发送
		mgr->SendPackets();

		// 获取当前时间
		currTime = timeGetTime();

		// 如果出现多个超时的socket,也只删除第一个,因为这里的循环很快,不用担心删不掉
		for(itt = mgr->m_connecting.begin(); itt != mgr->m_connecting.end(); ++itt) {
			if(currTime - itt->connectTime >= P2P_CONNECT) {
				mgr->m_connectFailCount++;
				mgr->comm->logFile.StatusOut("Connecting socket %s is Timeout!", mgr->FormatIPAddress(*itt));
				// 超时!关闭Socket, 从连接列表中删除,并且添加到badAddr列表
				mgr->SafeCloseSocket(itt->sock); // no force disconnect
				mgr->AddBadAddr(*itt);

				mgr->m_connecting.erase(itt);
				break;
			}
		}

		// 每隔一段时间尝试连接其他Peer
		if(currTime-lastTryConnectTime >= TRY_CONNECT) {
			lastTryConnectTime = currTime;
			mgr->comm->tryClient.Try();
		}

		if(currTime-lastManageTime >= P2P_MANAGE) {
			lastManageTime = currTime;
			// 删除超时的连接
			//mgr->comm->logFile.StatusOut("processing kill idle connections....");
			for(it = mgr->m_clients.begin(); it != mgr->m_clients.end(); ++it) {
				if(it->IsValid() && it->IsIdleTooLong()) {
					mgr->AddBadAddr(it->GetAddress());
					it->SetInvalid();
				}
			}

			// 生成传输信息并打印
			mgr->GenerateTransferInfo(TRUE);
			TransferInfo tiTemp;
			mgr->GetTransferInfo(tiTemp);
			mgr->comm->logFile.StatusOut("Cur: (%.2f/%.2f)KB/s. Avg: (%.2f/%.2f)KB/s. Total: %.2f/%.2fMB. Layer: %d.\n\tConnect: low version %d, Fail %d, In %d, Out %d. AvgTime: In %d, Out %d.", 
				tiTemp.currDownSpeed/1024, tiTemp.currUpSpeed/1024, 
				tiTemp.avgDownSpeed/1024, tiTemp.avgUpSpeed/1024, 
				tiTemp.totalDownBytes/1024.f/1024.f, tiTemp.totalUpBytes/1024.f/1024.f, 
				mgr->GetSelfLayer(), mgr->GetLowVersionCount(), mgr->GetConnectFailCount(), 
				mgr->GetTotalIncomingCount(), mgr->GetTotalOutgoingCount(), 
				mgr->GetAvgIncomingTime(), mgr->GetAvgOutgoingTime());
			if(mgr->comm->currRes) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -