⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 servent.cpp

📁 P2P应用 : Peercast的源代码
💻 CPP
📖 第 1 页 / 共 4 页
字号:
			s->connect();

			AtomStream atom(*s);

			atom.writeInt(PCP_CONNECT,1);
			atom.writeParent(PCP_HELO,1);
				atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);

			GnuID sid;
			sid.clear();

			int numc,numd;
			ID4 id = atom.read(numc,numd);
			if (id == PCP_OLEH)
			{
				for(int i=0; i<numc; i++)
				{
					int c,d;
					ID4 pid = atom.read(c,d);
					if (pid == PCP_SESSIONID)
						atom.readBytes(sid.id,16,d);
					else
						atom.skip(c,d);
				}
			}else
			{
				LOG_DEBUG("Ping response: %s",id.getString().str());
				throw StreamException("Bad ping response");
			}

			if (!sid.isSame(rsid))
				throw StreamException("SIDs don`t match");

			hostOK = true;
			LOG_DEBUG("Ping host %s: OK",ipstr);
			atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT);


		}
	}catch(StreamException &e)
	{
		LOG_DEBUG("Ping host %s: %s",ipstr,e.msg);
	}
	if (s)
	{
		s->close();
		delete s;
	}

	if (!hostOK)
		rhost.port = 0;

	return true;
}


// -----------------------------------bool Servent::handshakeStream(ChanInfo &chanInfo){

	HTTP http(*sock);

	bool gotPCP=false;
	unsigned int reqPos=0;

	nsSwitchNum=0;

	while (http.nextHeader())	{		char *arg = http.getArgStr();		if (!arg)			continue;		if (http.isHeader(PCX_HS_PCP))
			gotPCP = atoi(arg)!=0;
		else if (http.isHeader(PCX_HS_POS))
			reqPos = atoi(arg);		else if (http.isHeader("icy-metadata"))			addMetadata = atoi(arg) > 0;
		else if (http.isHeader(HTTP_HS_AGENT))
			agent = arg;
		else if (http.isHeader("Pragma"))
		{
			char *ssc = stristr(arg,"stream-switch-count=");
			char *so = stristr(arg,"stream-offset");

			if (ssc || so)
			{
				nsSwitchNum=1;
				//nsSwitchNum = atoi(ssc+20);
			}
		}
		LOG_DEBUG("Stream: %s",http.cmdLine);	}


	if ((!gotPCP) && (outputProtocol == ChanInfo::SP_PCP))
		outputProtocol = ChanInfo::SP_PEERCAST;

	if (outputProtocol == ChanInfo::SP_HTTP)
	{
		if  ( (chanInfo.srcProtocol == ChanInfo::SP_MMS)
			  || (chanInfo.contentType == ChanInfo::T_WMA)
			  || (chanInfo.contentType == ChanInfo::T_WMV)
			  || (chanInfo.contentType == ChanInfo::T_ASX)
			)
		outputProtocol = ChanInfo::SP_MMS;
	}


	bool chanFound=false;
	bool chanReady=false;

	Channel *ch = chanMgr->findChannelByID(chanInfo.id);
	if (ch)
	{
		sendHeader = true;
		if (reqPos)
		{
			streamPos = ch->rawData.findOldestPos(reqPos);
		}else
		{
			streamPos = ch->rawData.getLatestPos();
		}

		chanReady = canStream(ch);
	}

	ChanHitList *chl = chanMgr->findHitList(chanInfo);
	if (chl)
	{
		chanFound = true;
	}


	bool result = false;
	char idStr[64];
	chanInfo.id.toStr(idStr);

	char sidStr[64];
	servMgr->sessionID.toStr(sidStr);

	Host rhost = sock->host;




	AtomStream atom(*sock);



	if (!chanFound)
	{
		sock->writeLine(HTTP_SC_NOTFOUND);
	    sock->writeLine("");
		LOG_DEBUG("Sending channel not found");
		return false;
	}


	if (!chanReady)
	{
		if (outputProtocol == ChanInfo::SP_PCP)
		{

			sock->writeLine(HTTP_SC_UNAVAILABLE);
			sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPCP);
			sock->writeLine("");

			handshakeIncomingPCP(atom,rhost,remoteID,agent);

			char ripStr[64];
			rhost.toStr(ripStr);

			LOG_DEBUG("Sending channel unavailable");

			ChanHitSearch chs;

			int error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;

			if (chl)
			{
				ChanHit best;
				
				// search for up to 8 other hits
				int cnt=0;
				for(int i=0; i<8; i++)
				{
					best.init();


					// find best hit this network if local IP
					if (!rhost.globalIP())
					{
						chs.init();
						chs.matchHost = servMgr->serverHost;
						chs.waitDelay = 2;
						chs.excludeID = remoteID;
						if (chl->pickHits(chs))
							best = chs.best[0];
					}

					// find best hit on same network
					if (!best.host.ip)
					{
						chs.init();
						chs.matchHost = rhost;
						chs.waitDelay = 2;
						chs.excludeID = remoteID;
						if (chl->pickHits(chs))
							best = chs.best[0];

					}

					// find best hit on other networks
					if (!best.host.ip)
					{
						chs.init();
						chs.waitDelay = 2;
						chs.excludeID = remoteID;
						if (chl->pickHits(chs))
							best = chs.best[0];

					}
					
					if (!best.host.ip)
						break;

					best.writeAtoms(atom,chanInfo.id);				
					cnt++;
				}

				if (cnt)
				{
					LOG_DEBUG("Sent %d channel hit(s) to %s",cnt,ripStr);

				}
				else if (rhost.port)
				{
					// find firewalled host
					chs.init();
					chs.waitDelay = 30;
					chs.useFirewalled = true;
					chs.excludeID = remoteID;
					if (chl->pickHits(chs))
					{
						best = chs.best[0];
						int cnt = servMgr->broadcastPushRequest(best,rhost,chl->info.id,Servent::T_RELAY);
						LOG_DEBUG("Broadcasted channel push request to %d clients for %s",cnt,ripStr);
					}
				} 

				// if all else fails, use tracker
				if (!best.host.ip)
				{
					// find best tracker on this network if local IP
					if (!rhost.globalIP())
					{
						chs.init();
						chs.matchHost = servMgr->serverHost;
						chs.trackersOnly = true;
						chs.excludeID = remoteID;
						if (chl->pickHits(chs))
							best = chs.best[0];

					}

					// find local tracker
					if (!best.host.ip)
					{
						chs.init();
						chs.matchHost = rhost;
						chs.trackersOnly = true;
						chs.excludeID = remoteID;
						if (chl->pickHits(chs))
							best = chs.best[0];
					}

					// find global tracker
					if (!best.host.ip)
					{
						chs.init();
						chs.trackersOnly = true;
						chs.excludeID = remoteID;
						if (chl->pickHits(chs))
							best = chs.best[0];
					}

					if (best.host.ip)
					{
						best.writeAtoms(atom,chanInfo.id);				
						LOG_DEBUG("Sent 1 tracker hit to %s",ripStr);
					}else if (rhost.port)
					{
						// find firewalled tracker
						chs.init();
						chs.useFirewalled = true;
						chs.trackersOnly = true;
						chs.excludeID = remoteID;
						chs.waitDelay = 30;
						if (chl->pickHits(chs))
						{
							best = chs.best[0];
							int cnt = servMgr->broadcastPushRequest(best,rhost,chl->info.id,Servent::T_CIN);
							LOG_DEBUG("Broadcasted tracker push request to %d clients for %s",cnt,ripStr);
						}
					}

				}


			}
			// return not available yet code
			atom.writeInt(PCP_QUIT,error);
			result = false;

		}else
		{
			LOG_DEBUG("Sending channel unavailable");
			sock->writeLine(HTTP_SC_UNAVAILABLE);
			sock->writeLine("");
			result = false;
		}

	} else {

		if (chanInfo.contentType != ChanInfo::T_MP3)
			addMetadata=false;
		if (addMetadata && (outputProtocol == ChanInfo::SP_HTTP))		// winamp mp3 metadata check		{
			sock->writeLine(ICY_OK);			sock->writeLineF("%s %s",HTTP_HS_SERVER,PCX_AGENT);			sock->writeLineF("icy-name:%s",chanInfo.name.cstr());			sock->writeLineF("icy-br:%d",chanInfo.bitrate);			sock->writeLineF("icy-genre:%s",chanInfo.genre.cstr());			sock->writeLineF("icy-url:%s",chanInfo.url.cstr());			sock->writeLineF("icy-metaint:%d",chanMgr->icyMetaInterval);			sock->writeLineF("%s %s",PCX_HS_CHANNELID,idStr);			sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MP3);		}else		{			sock->writeLine(HTTP_SC_OK);			if ((chanInfo.contentType != ChanInfo::T_ASX) && (chanInfo.contentType != ChanInfo::T_WMV) && (chanInfo.contentType != ChanInfo::T_WMA))			{				sock->writeLineF("%s %s",HTTP_HS_SERVER,PCX_AGENT);				sock->writeLine("Accept-Ranges: none");				sock->writeLineF("x-audiocast-name: %s",chanInfo.name.cstr());				sock->writeLineF("x-audiocast-bitrate: %d",chanInfo.bitrate);				sock->writeLineF("x-audiocast-genre: %s",chanInfo.genre.cstr());				sock->writeLineF("x-audiocast-description: %s",chanInfo.desc.cstr());				sock->writeLineF("x-audiocast-url: %s",chanInfo.url.cstr());				sock->writeLineF("%s %s",PCX_HS_CHANNELID,idStr);			}
			if (outputProtocol == ChanInfo::SP_HTTP)			{				switch (chanInfo.contentType)				{					case ChanInfo::T_OGG:						sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XOGG);						break;					case ChanInfo::T_MP3:						sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MP3);						break;					case ChanInfo::T_MOV:						sock->writeLine("Connection: close");						sock->writeLine("Content-Length: 10000000");						sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MOV);						break;					case ChanInfo::T_MPG:						sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MPG);						break;					case ChanInfo::T_NSV:						sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_NSV);						break;					case ChanInfo::T_ASX:						sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_ASX);
						break;
					case ChanInfo::T_WMA:
						sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_WMA);
						break;
					case ChanInfo::T_WMV:						sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_WMV);						break;				}			} else if (outputProtocol == ChanInfo::SP_MMS)
			{
				sock->writeLine("Server: Rex/9.0.0.2980");
				sock->writeLine("Cache-Control: no-cache");
				sock->writeLine("Pragma: no-cache");
				sock->writeLine("Pragma: client-id=3587303426");
				sock->writeLine("Pragma: features=\"broadcast,playlist\"");

				if (nsSwitchNum)
				{
					sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MMS);
				}else
				{
					sock->writeLine("Content-Type: application/vnd.ms.wms-hdr.asfv1");
					if (ch)
						sock->writeLineF("Content-Length: %d",ch->headPack.len);
					sock->writeLine("Connection: Keep-Alive");
				}
			
			} else if (outputProtocol == ChanInfo::SP_PCP)
			{
				sock->writeLineF("%s %d",PCX_HS_POS,streamPos);
				sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPCP);
			}else if (outputProtocol == ChanInfo::SP_PEERCAST)			{				sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPEERCAST);			}		}
		sock->writeLine("");
		result = true;

		if (gotPCP)
		{
			handshakeIncomingPCP(atom,rhost,remoteID,agent);
			atom.writeInt(PCP_OK,0);
		}

	}


	return result;}// -----------------------------------void Servent::handshakeGiv(GnuID &id){	if (id.isSet())
	{		char idstr[64];		id.toStr(idstr);		sock->writeLineF("GIV /%s",idstr);
	}else		sock->writeLine("GIV");

	sock->writeLine("");
}// -----------------------------------void Servent::processGnutella(){
	type = T_PGNU;

	//if (servMgr->isRoot && !servMgr->needConnections())
	if (servMgr->isRoot)
	{
		processRoot();
		return;
	}

	gnuStream.init(sock);	setStatus(S_CONNECTED);	if (!servMgr->isRoot)
	{
		chanMgr->broadcastRelays(this, 1, 1);
		GnuPacket *p;

		if ((p=outPacketsNorm.curr())) 	
			gnuStream.sendPacket(*p);
		return;
	}	gnuStream.ping(2);//	if (type != T_LOOKUP)//		chanMgr->broadcastRelays(this,chanMgr->minBroadcastTTL,2);	lastPacket = lastPing = sys->getTime();	bool doneBigPing=false;	const unsigned int	abortTimeoutSecs = 60;		// abort connection after 60 secs of no activitiy	const unsigned int	packetTimeoutSecs = 30;		// ping connection after 30 secs of no activity	unsigned int currBytes=0;	unsigned int lastWait=0;	unsigned int lastTotalIn=0,lastTotalOut=0;	while (thread.active && sock->active())	{		if (sock->readReady())		{			lastPacket = sys->getTime();			if (gnuStream.readPacket(pack))			{				char ipstr[64];				sock->host.toStr(ipstr);				GnuID routeID;				GnuStream::R_TYPE ret = GnuStream::R_PROCESS;				if (pack.func != GNU_FUNC_PONG)					if (servMgr->seenPacket(pack))						ret = GnuStream::R_DUPLICATE;				seenIDs.add(pack.id);				if (ret == GnuStream::R_PROCESS)				{					GnuID routeID;					ret = gnuStream.processPacket(pack,this,routeID);					if (flowControl && (ret == GnuStream::R_BROADCAST))						ret = GnuStream::R_DROP;				}				switch(ret)				{					case GnuStream::R_BROADCAST:						if (servMgr->broadcast(pack,this))							stats.add(Stats::NUMBROADCASTED);						else							stats.add(Stats::NUMDROPPED);						break;					case GnuStream::R_ROUTE:						if (servMgr->route(pack,routeID,NULL))							stats.add(Stats::NUMROUTED);						else							stats.add(Stats::NUMDROPPED);						break;					case GnuStream::R_ACCEPTED:						stats.add(Stats::NUMACCEPTED);						break;					case GnuStream::R_DUPLICATE:						stats.add(Stats::NUMDUP);						break;					case GnuStream::R_DEAD:						stats.add(Stats::NUMDEAD);						break;					case GnuStream::R_DISCARD:						stats.add(Stats::NUMDISCARDED);						break;					case GnuStream::R_BADVERSION:						stats.add(Stats::NUMOLD);						break;					case GnuStream::R_DROP:						stats.add(Stats::NUMDROPPED);						break;				}				LOG_NETWORK("packet in: %s-%s, %d bytes, %d hops, %d ttl, from %s",GNU_FUNC_STR(pack.func),GnuStream::getRouteStr(ret),pack.len,pack.hops,pack.ttl,ipstr);			}else{				LOG_ERROR("Bad packet");			}		}		GnuPacket *p;		if ((p=outPacketsPri.curr()))				// priority packet		{			gnuStream.sendPacket(*p);			seenIDs.add(p->id);			outPacketsPri.next();		} else if ((p=outPacketsNorm.curr())) 		// or.. normal packet		{			gnuStream.sendPacket(*p);			seenIDs.add(p->id);			outPacketsNorm.next();		}		int lpt =  sys->getTime()-lastPacket;		if (!doneBigPing)		{			if ((sys->getTime()-lastPing) > 15)			{				gnuStream.ping(7);				lastPing = sys->getTime();				doneBigPing = true;			}		}else{			if (lpt > packetTimeoutSecs)			{								if ((sys->getTime()-lastPing) > packetTimeoutSecs)				{					gnuStream.ping(1);					lastPing = sys->getTime();				}			}		}		if (lpt > abortTimeoutSecs)			throw TimeoutException();		unsigned int totIn = sock->totalBytesIn-lastTotalIn;		unsigned int totOut = sock->totalBytesOut-lastTotalOut;		unsigned int bytes = totIn+totOut;		lastTotalIn = sock->totalBytesIn;		lastTotalOut = sock->totalBytesOut;		const int serventBandwidth = 1000;
		int delay = sys->idleSleepTime;		if ((bytes) && (serventBandwidth >= 8))			delay = (bytes*1000)/(serventBandwidth/8);	// set delay relative packetsize		if (delay < (int)sys->idleSleepTime)			delay = sys->idleSleepTime;		//LOG("delay %d, in %d, out %d",delay,totIn,totOut);		sys->sleep(delay);	}}


// -----------------------------------void Servent::processRoot(){	try 	{			gnuStream.init(sock);		setStatus(S_CONNECTED);		gnuStream.ping(2);

		unsigned int lastConnect = sys->getTime();
		while (thread.active && sock->active())		{			if (gnuStream.readPacket(pack))			{				char ipstr[64];				sock->host.toStr(ipstr);								LOG_NETWORK("packet in: %d from %s",pack.func,ipstr);				if (pack.func == GNU_FUNC_PING)		// if ping then pong back some hosts and close				{										Host hl[32];					int cnt = servMgr->getNewestServents(hl,32,sock->host);						if (cnt)					{						int start = sys->rnd() % cnt;
						int max = cnt>8?8:cnt;
						for(int i=0; i<max; i++)						{							GnuPacket pong;							pack.hops = 1;							pong.initPong(hl[start],false,pack);							gnuStream.sendPacket(pong);							char ipstr[64];							hl[start].toStr(ipstr);							//LOG_NETWORK("Pong %d: %s",start+1,ipstr);							start = (start+1) % cnt;						}						char str[64];						sock->host.toStr(str);						LOG_NETWORK("Sent %d pong(s) to %s",max,str);					}else					{						LOG_NETWORK("No Pongs to send");						//return;					}				}else if (pack.func == GNU_FUNC_PONG)		// pong?				{					MemoryStream pong(pack.data,pack.len);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -