⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 servent.cpp

📁 P2P应用 : Peercast的源代码
💻 CPP
📖 第 1 页 / 共 4 页
字号:
					int ip,port;					port = pong.readShort();					ip = pong.readLong();					ip = SWAP4(ip);					Host h(ip,port);
					if ((ip) && (port) && (h.globalIP()))					{

						LOG_NETWORK("added pong: %d.%d.%d.%d:%d",ip>>24&0xff,ip>>16&0xff,ip>>8&0xff,ip&0xff,port);						servMgr->addHost(h,ServHost::T_SERVENT,sys->getTime());					}					//return;				} else if (pack.func == GNU_FUNC_HIT)
				{
					MemoryStream data(pack.data,pack.len);
					ChanHit hit;
					gnuStream.readHit(data,hit,pack.hops,pack.id);
				}				//if (gnuStream.packetsIn > 5)	// die if we get too many packets				//	return;			}

			if((sys->getTime()-lastConnect > 60))
				break;		}	}catch(StreamException &e)	{		LOG_ERROR("Relay: %s",e.msg);	}	}	// -----------------------------------int Servent::givProc(ThreadInfo *thread){//	thread->lock();	Servent *sv = (Servent*)thread->data;	try 	{
		sv->handshakeGiv(sv->givID);
		sv->handshakeIncoming();
	}catch(StreamException &e)	{		LOG_ERROR("GIV: %s",e.msg);	}	sv->kill();	sys->endThread(thread);
	return 0;}

// -----------------------------------
// Performs the initiating side of the PCP handshake on `atom`.
//
// Writes a PCP_HELO parent (agent, version and session ID, plus optional
// port / ping / broadcast-ID children), then reads the reply, which must be
// PCP_OLEH, and processes its children.
//
//   atom      - stream the handshake atoms are written to and read from
//   rhost     - not referenced by this function
//   rid       - cleared, then filled from the reply's PCP_HELO_SESSIONID
//   agent     - set from the reply's PCP_HELO_AGENT, if present
//   isTrusted - when true, the broadcast ID is sent (if broadcasting) and the
//               reply is allowed to update servMgr's IP, firewall state and
//               disabled flag
//
// Throws StreamException when the reply is not PCP_OLEH, when the remote
// session ID is the same as ours, or when `rid` is still unset after the
// reply has been read.
void Servent::handshakeOutgoingPCP(AtomStream &atom, Host &rhost, GnuID &rid, String &agent, bool isTrusted)
{
	const bool announcePort = (servMgr->getFirewall() != ServMgr::FW_ON);
	const bool requestPing = (servMgr->getFirewall() == ServMgr::FW_UNKNOWN);
	const bool sendBCID = isTrusted && chanMgr->isBroadcasting();

	// Agent, version and session ID are always written; each optional
	// child below adds one to the announced child count.
	int heloChildren = 3;
	if (requestPing)
		heloChildren++;
	if (announcePort)
		heloChildren++;
	if (sendBCID)
		heloChildren++;

	atom.writeParent(PCP_HELO,heloChildren);
	atom.writeString(PCP_HELO_AGENT,PCX_AGENT);
	atom.writeInt(PCP_HELO_VERSION,PCP_CLIENT_VERSION);
	atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
	if (announcePort)
		atom.writeShort(PCP_HELO_PORT,servMgr->serverHost.port);
	if (requestPing)
		atom.writeShort(PCP_HELO_PING,servMgr->serverHost.port);
	if (sendBCID)
		atom.writeBytes(PCP_HELO_BCID,chanMgr->broadcastID.id,16);

	LOG_DEBUG("PCP outgoing waiting for OLEH..");

	int replyChildren,replyLen;
	ID4 replyID = atom.read(replyChildren,replyLen);
	if (replyID != PCP_OLEH)
	{
		LOG_DEBUG("PCP outgoing reply: %s",replyID.getString().str());
		atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADRESPONSE);
		throw StreamException("Got unexpected PCP response");
	}

	char agentBuf[64];

	GnuID clientID;		// cleared but never read in this function
	clientID.clear();
	rid.clear();
	int version = 0;	// consumed from the stream; not used afterwards
	int disable = 0;

	Host reportedHost;	// our address as reported back by the remote side

	// Walk the children of OLEH; unknown atoms are skipped.
	for (int remaining = replyChildren; remaining > 0; remaining--)
	{
		int childCount,childLen;
		ID4 childID = atom.read(childCount,childLen);

		if (childID == PCP_HELO_AGENT)
		{
			atom.readString(agentBuf,sizeof(agentBuf),childLen);
			agent.set(agentBuf);
			continue;
		}
		if (childID == PCP_HELO_REMOTEIP)
		{
			reportedHost.ip = atom.readInt();
			continue;
		}
		if (childID == PCP_HELO_PORT)
		{
			reportedHost.port = atom.readShort();
			continue;
		}
		if (childID == PCP_HELO_VERSION)
		{
			version = atom.readInt();
			continue;
		}
		if (childID == PCP_HELO_DISABLE)
		{
			disable = atom.readInt();
			continue;
		}
		if (childID == PCP_HELO_SESSIONID)
		{
			atom.readBytes(rid.id,16);
			if (rid.isSame(servMgr->sessionID))
				throw StreamException("Servent loopback");
			continue;
		}

		LOG_DEBUG("PCP handshake skip: %s",childID.getString().str());
		atom.skip(childCount,childLen);
	}

	// Only a trusted peer may change our view of our own address,
	// firewall state and disabled flag.
	if (isTrusted)
	{
		if (reportedHost.isValid())
		{
			const bool ipChanged = (servMgr->serverHost.ip != reportedHost.ip);
			if (ipChanged && (servMgr->forceIP.isEmpty()))
			{
				char ipstr[64];
				reportedHost.toStr(ipstr);
				LOG_DEBUG("Got new ip: %s",ipstr);
				servMgr->serverHost.ip = reportedHost.ip;
			}

			if (servMgr->getFirewall() == ServMgr::FW_UNKNOWN)
			{
				// A non-zero port together with a global IP in the reply
				// resolves the unknown state to FW_OFF, anything else to FW_ON.
				if (reportedHost.port && reportedHost.globalIP())
					servMgr->setFirewall(ServMgr::FW_OFF);
				else
					servMgr->setFirewall(ServMgr::FW_ON);
			}
		}

		const bool disabledByPeer = (disable == 1);
		if (disabledByPeer)
			LOG_ERROR("client disabled: %d",disable);
		servMgr->isDisabled = disabledByPeer;
	}

	if (!rid.isSet())
	{
		atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_NOTIDENTIFIED);
		throw StreamException("Remote host not identified");
	}

	LOG_DEBUG("PCP Outgoing handshake complete.");
}
// -----------------------------------
// Performs the accepting side of the PCP handshake on `atom`.
//
// Reads a PCP_HELO parent and its children, optionally pings the remote host
// back on the port it asked to have tested, and answers with a PCP_OLEH
// parent carrying our agent, session ID, version and the remote's IP/port as
// we see them. When servMgr->isRoot is set, root atoms are written afterwards.
//
//   atom  - stream the handshake atoms are read from and written to
//   rhost - remote host; its port is reset to 0 and then set from the HELO
//           (or from a successful ping test); its IP is replaced by our own
//           server IP when it is not global but ours is
//   rid   - filled from PCP_HELO_SESSIONID when that atom is present
//   agent - set from PCP_HELO_AGENT when that atom is present
//
// Throws StreamException when the first atom is not PCP_HELO, when the remote
// session ID is the same as ours, when a root rejects a private broadcast ID,
// when the remote version is below PCP_CLIENT_MINVERSION, or when `rid` is
// not set after the HELO has been read.
void Servent::handshakeIncomingPCP(AtomStream &atom, Host &rhost, GnuID &rid, String &agent)
{
	int heloChildren,heloLen;
	ID4 heloID = atom.read(heloChildren,heloLen);

	if (heloID != PCP_HELO)
	{
		LOG_DEBUG("PCP incoming reply: %s",heloID.getString().str());
		atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADRESPONSE);
		throw StreamException("Got unexpected PCP response");
	}

	char agentBuf[64];

	ID4 osType;			// stored but not used afterwards
	int version = 0;
	int pingPort = 0;	// non-zero when the remote asks for a firewall test

	GnuID bcID;
	GnuID clientID;		// cleared but never read in this function
	bcID.clear();
	clientID.clear();

	rhost.port = 0;

	// Walk the children of HELO; unknown atoms are skipped.
	for (int remaining = heloChildren; remaining > 0; remaining--)
	{
		int childCount,childLen;
		ID4 childID = atom.read(childCount,childLen);

		if (childID == PCP_HELO_AGENT)
		{
			atom.readString(agentBuf,sizeof(agentBuf),childLen);
			agent.set(agentBuf);
			continue;
		}
		if (childID == PCP_HELO_VERSION)
		{
			version = atom.readInt();
			continue;
		}
		if (childID == PCP_HELO_SESSIONID)
		{
			atom.readBytes(rid.id,16);
			if (rid.isSame(servMgr->sessionID))
				throw StreamException("Servent loopback");
			continue;
		}
		if (childID == PCP_HELO_BCID)
		{
			atom.readBytes(bcID.id,16);
			continue;
		}
		if (childID == PCP_HELO_OSTYPE)
		{
			osType = atom.readInt();
			continue;
		}
		if (childID == PCP_HELO_PORT)
		{
			rhost.port = atom.readShort();
			continue;
		}
		if (childID == PCP_HELO_PING)
		{
			pingPort = atom.readShort();
			continue;
		}

		LOG_DEBUG("PCP handshake skip: %s",childID.getString().str());
		atom.skip(childCount,childLen);
	}

	if (version)
		LOG_DEBUG("Incoming PCP is %s : v%d", agent.cstr(),version);

	// Substitute our own server IP when the remote's address is not global
	// but ours is.
	if (!rhost.globalIP() && servMgr->serverHost.globalIP())
		rhost.ip = servMgr->serverHost.ip;

	if (pingPort)
	{
		char ripStr[64];
		rhost.toStr(ripStr);
		LOG_DEBUG("Incoming firewalled test request: %s ", ripStr);

		// Report the port back only if the host is global and the ping succeeds.
		rhost.port = pingPort;
		if (!(rhost.globalIP() && pingHost(rhost,rid)))
			rhost.port = 0;
	}

	// On a root, a broadcast ID with flag bit 0 set (private) must be known
	// and valid; otherwise the client is told it is disabled.
	if (servMgr->isRoot && bcID.isSet() && (bcID.getFlags() & 1))
	{
		BCID *knownBCID = servMgr->findValidBCID(bcID);
		if (!knownBCID || !knownBCID->valid)
		{
			atom.writeParent(PCP_OLEH,1);
			atom.writeInt(PCP_HELO_DISABLE,1);
			throw StreamException("Client is banned");
		}
	}

	atom.writeParent(PCP_OLEH,5);
	atom.writeString(PCP_HELO_AGENT,PCX_AGENT);
	atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
	atom.writeInt(PCP_HELO_VERSION,PCP_CLIENT_VERSION);
	atom.writeInt(PCP_HELO_REMOTEIP,rhost.ip);
	atom.writeShort(PCP_HELO_PORT,rhost.port);

	// These checks run after OLEH has been written, so the QUIT atom
	// follows the OLEH reply on the stream.
	if (version && (version < PCP_CLIENT_MINVERSION))
	{
		atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADAGENT);
		throw StreamException("Agent is not valid");
	}

	if (!rid.isSet())
	{
		atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_NOTIDENTIFIED);
		throw StreamException("Remote host not identified");
	}

	if (servMgr->isRoot)
		servMgr->writeRootAtoms(atom,false);

	LOG_DEBUG("PCP Incoming handshake complete.");
}

// -----------------------------------
void Servent::processIncomingPCP(bool suggestOthers)
{
	PCPStream::readVersion(*sock);


	AtomStream atom(*sock);
	Host rhost = sock->host;

	handshakeIncomingPCP(atom,rhost,remoteID,agent);


	bool alreadyConnected = (servMgr->findConnection(Servent::T_COUT,remoteID)!=NULL)
							|| (servMgr->findConnection(Servent::T_CIN,remoteID)!=NULL);
	bool unavailable = servMgr->controlInFull();
	bool offair = !servMgr->isRoot && !chanMgr->isBroadcasting();

	char rstr[64];
	rhost.toStr(rstr);

	if (unavailable || alreadyConnected || offair)
	{
		int error;

		if (alreadyConnected)
			error = PCP_ERROR_QUIT+PCP_ERROR_ALREADYCONNECTED;
		else if (unavailable)
			error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;
		else if (offair)
			error = PCP_ERROR_QUIT+PCP_ERROR_OFFAIR;
		else 
			error = PCP_ERROR_QUIT;


		if (suggestOthers)
		{

			ChanHit best;
			ChanHitSearch chs;

			int cnt=0;
			for(int i=0; i<8; i++)
			{
				best.init();

				// find best hit on this network			
				if (!rhost.globalIP())
				{
					chs.init();
					chs.matchHost = servMgr->serverHost;
					chs.waitDelay = 2;
					chs.excludeID = remoteID;
					chs.trackersOnly = true;
					chs.useBusyControls = false;
					if (chanMgr->pickHits(chs))
						best = chs.best[0];
				}

				// find best hit on same network			
				if (!best.host.ip)
				{
					chs.init();
					chs.matchHost = rhost;
					chs.waitDelay = 2;
					chs.excludeID = remoteID;
					chs.trackersOnly = true;
					chs.useBusyControls = false;
					if (chanMgr->pickHits(chs))
						best = chs.best[0];
				}

				// else find best hit on other networks
				if (!best.host.ip)
				{
					chs.init();
					chs.waitDelay = 2;
					chs.excludeID = remoteID;
					chs.trackersOnly = true;
					chs.useBusyControls = false;
					if (chanMgr->pickHits(chs))
						best = chs.best[0];
				}

				if (!best.host.ip)
					break;
				
				GnuID noID;
				noID.clear();
				best.writeAtoms(atom,noID);
				cnt++;
			}
			if (cnt)
			{
				LOG_DEBUG("Sent %d tracker(s) to %s",cnt,rstr);
			}
			else if (rhost.port)
			{
				// send push request to best firewalled tracker on other network
				chs.init();
				chs.waitDelay = 30;
				chs.excludeID = remoteID;
				chs.trackersOnly = true;
				chs.useFirewalled = true;
				chs.useBusyControls = false;
				if (chanMgr->pickHits(chs))
				{
					best = chs.best[0];
					GnuID noID;
					noID.clear();
					int cnt = servMgr->broadcastPushRequest(best,rhost,noID,Servent::T_CIN);
					LOG_DEBUG("Broadcasted tracker push request to %d clients for %s",cnt,rstr);
				}
			}else
			{
				LOG_DEBUG("No available trackers");
			}
		}


		LOG_ERROR("Sending QUIT to incoming: %d",error);

		atom.writeInt(PCP_QUIT,error);
		return;		
	}
	

	type = T_CIN;
	setStatus(S_CONNECTED);

	atom.writeInt(PCP_OK,0);

	// ask for update
	atom.writeParent(PCP_ROOT,1);
		atom.writeParent(PCP_ROOT_UPDATE,0);

	pcpStream = new PCPStream(remoteID);

	int error = 0;
	BroadcastState bcs;
	while (!error && thread.active && !sock->eof())
	{
		error = pcpStream->readPacket(*sock,bcs);
		sys->sleepIdle();

		if (!servMgr->isRoot && !chanMgr->isBroadcasting())
			error = PCP_ERROR_OFFAIR;
		if (peercastInst->isQuitting)
			error = PCP_ERROR_SHUTDOWN;
	}

	pcpStream->flush(*sock);

	error += PCP_ERROR_QUIT;
	atom.writeInt(PCP_QUIT,error);

	LOG_DEBUG("PCP Incoming to %s closed: %d",rstr,error);

}

// -----------------------------------int Servent::outgoingProc(ThreadInfo *thread){//	thread->lock();	LOG_DEBUG("COUT started");
	Servent *sv = (Servent*)thread->data;		
	GnuID noID;
	noID.clear();
	sv->pcpStream = new PCPStream(noID);

	while (sv->thread.active)
	{
		sv->setStatus(S_WAIT);

		if (chanMgr->isBroadcasting() && servMgr->autoServe)
		{
			ChanHit bestHit;
			ChanHitSearch chs;
			char ipStr[64];

			do
			{
				bestHit.init();

				if (servMgr->rootHost.isEmpty())
					break;

				if (sv->pushSock)
				{
					sv->sock = sv->pushSock;
					sv->pushSock = NULL;
					bestHit.host = sv->sock->host;
					break;
				}

				GnuID noID;
				noID.clear();
				ChanHitList *chl = chanMgr->findHitListByID(noID);
				if (chl)
				{
					// find local tracker
					chs.init();
					chs.matchHost = servMgr->serverHost;
					chs.waitDelay = MIN_TRACKER_RETRY;
					chs.excludeID = servMgr->sessionID;
					chs.trackersOnly = true;
					if (!chl->pickHits(chs))
					{
						// else find global tracker
						chs.init();
						chs.waitDelay = MIN_TRACKER_RETRY;
						chs.excludeID = servMgr->sessionID;
						chs.trackersOnly = true;
						chl->pickHits(chs);
					}

					if (chs.numResults)
					{
						bestHit = chs.best[0];
					}
				}


				unsigned int ctime = sys->getTime();

				if ((!bestHit.host.ip) && ((ctime-chanMgr->lastYPConnect) > MIN_YP_RETRY))
				{
					bestHit.host.fromStrName(servMgr->rootHost.cstr(),DEFAULT_PORT);
					bestHit.yp = true;
					chanMgr->lastYPConnect = ctime;
				}
				sys->sleepIdle();

			}while (!bestHit.host.ip && (sv->thread.active));


			if (!bestHit.host.ip)		// give up
			{
				LOG_ERROR("COUT giving up");
				break;
			}


			bestHit.host.toStr(ipStr);

			int error=0;
			try 			{

				LOG_DEBUG("COUT to %s: Connecting..",ipStr);

				if (!sv->sock)
				{
					sv->setStatus(S_CONNECTING);
					sv->sock = sys->createSocket();
					if (!sv->sock)
						throw StreamException("Unable to create socket");
					sv->sock->open(bestHit.host);
					sv->sock->connect();

				}

				sv->sock->setReadTimeout(30000);
				AtomStream atom(*sv->sock);

				sv->setStatus(S_HANDSHAKE);
				Host rhost = sv->sock->host;
				atom.writeInt(PCP_CONNECT,1);
				handshakeOutgoingPCP(atom,rhost,sv->remoteID,sv->agent,bestHit.yp);

				sv->setStatus(S_CONNECTED);

				LOG_DEBUG("COUT to %s: OK",ipStr);

				sv->pcpStream->init(sv->remoteID);

				BroadcastState bcs;
				error = 0;
				while (!error && sv->thread.active && !sv->sock->eof() && servMgr->autoServe)
				{
					error = sv->pcpStream->readPacket(*sv->sock,bcs);

					sys->sleepIdle();

					if (!chanMgr->isBroadcasting())
						error = PCP_ERROR_OFFAIR;
					if (peercastInst->isQuitting)
						error = PCP_ERROR_SHUTDOWN;

					if (sv->pcpStream->nextRootPacket)
						if (sys->getTime() > (sv->pcpStream->nextRootPacket+30))
							error = PCP_ERROR_NOROOT;
				}
				sv->setStatus(S_CLOSING);

				sv->pcpStream->flush(*sv->sock);

				error += PCP_ERROR_QUIT;
				atom.writeInt(PCP_QUIT,error);

				LOG_ERROR("COUT to %s closed: %d",ipStr,error);
			}catch(TimeoutException &e)			{				LOG_ERROR("COUT to %s: timeout (%s)",ipStr,e.msg);				sv->setStatus(S_TIMEOUT);
			}catch(StreamException &e)			{				LOG_ERROR("COUT to %s: %s",ipStr,e.msg);				sv->setStatus(S_ERROR);			}

			try
			{
				if (sv->sock)
				{
					sv->sock->close();
					delete sv->sock;
					sv->sock = NULL;
				}

			}catch(StreamException &) {}

			// don`t discard this hit if we caused the disconnect (stopped broadcasting)
			if (error != (PCP_ERROR_QUIT+PCP_ERROR_OFFAIR))
				chanMgr->deadHit(bestHit);

		}

		sys->sleepIdle();
	}	sv->kill();	sys->endThread(thread);
	LOG_DEBUG("COUT ended");
	return 0;}// -----------------------------------int Servent::incomingProc(ThreadInfo *thread){//	thread->lock();	Servent *sv = (Servent*)thread->data;	
	char ipStr[64];
	sv->sock->host.toStr(ipStr);
	try 	{		sv->handshakeIncoming();	}catch(HTTPException &e)	{		try		{			sv->sock->writeLine(e.msg);			if (e.code == 401)				sv->sock->writeLine("WWW-Authenticate: Basic realm=\"PeerCast\"");			sv->sock->writeLine("");		}catch(StreamException &){}		LOG_ERROR("Incoming from %s: %s",ipStr,e.msg);	}catch(StreamException &e)	{		LOG_ERROR("Incoming from %s: %s",ipStr,e.msg);	}

	sv->kill();
	sys->endThread(thread);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -