⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 servent.cpp.svn-base

📁 这是和p2p相关的一份源码
💻 SVN-BASE
📖 第 1 页 / 共 4 页
字号:
	int osType=0;	HTTP http(*sock);	bool versionValid = false;	bool diffRootVer = false;
	GnuID clientID;	clientID.clear();    while (http.nextHeader())    {		LOG_DEBUG(http.cmdLine);		char *arg = http.getArgStr();		if (!arg)			continue;		if (http.isHeader(HTTP_HS_AGENT))		{			agent.set(arg);			if (strnicmp(arg,"PeerCast/",9)==0)			{				versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0);				diffRootVer = stricmp(arg+9,MIN_ROOTVER)<0;			}		}else if (http.isHeader(PCX_HS_NETWORKID))		{			clientID.fromStr(arg);		}else if (http.isHeader(PCX_HS_PRIORITY))		{			priorityConnect = atoi(arg)!=0;
		}else if (http.isHeader(PCX_HS_ID))		{			GnuID id;			id.fromStr(arg);			if (id.isSame(servMgr->sessionID))				throw StreamException("Servent loopback");		}else if (http.isHeader(PCX_HS_OS))		{			if (stricmp(arg,PCX_OS_LINUX)==0)				osType = 1;			else if (stricmp(arg,PCX_OS_WIN32)==0)				osType = 2;			else if (stricmp(arg,PCX_OS_MACOSX)==0)				osType = 3;			else if (stricmp(arg,PCX_OS_WINAMP2)==0)				osType = 4;		}    }

	if (!clientID.isSame(networkID))		throw HTTPException(HTTP_SC_UNAVAILABLE,503);	// if this is a priority connection and all incoming connections 	// are full then kill an old connection to make room. Otherwise reject connection.	if (!priorityConnect)	{		if (!isPrivate())			if (servMgr->pubInFull())				throw HTTPException(HTTP_SC_UNAVAILABLE,503);	}	if (!versionValid)		throw HTTPException(HTTP_SC_FORBIDDEN,403);    sock->writeLine(GNU_OK);    sock->writeLine("%s %s",HTTP_HS_AGENT,PCX_OLDAGENT);	if (networkID.isSet())	{		char idStr[64];		networkID.toStr(idStr);		sock->writeLine("%s %s",PCX_HS_NETWORKID,idStr);	}	if (servMgr->isRoot)	{		sock->writeLine("%s %d",PCX_HS_FLOWCTL,servMgr->useFlowControl?1:0);		sock->writeLine("%s %d",PCX_HS_MINBCTTL,chanMgr->minBroadcastTTL);		sock->writeLine("%s %d",PCX_HS_MAXBCTTL,chanMgr->maxBroadcastTTL);		sock->writeLine("%s %d",PCX_HS_RELAYBC,servMgr->relayBroadcast);		//sock->writeLine("%s %d",PCX_HS_FULLHIT,2);		if (diffRootVer)		{			sock->writeString(PCX_HS_DL);			switch(osType)			{				case 1:					sock->writeLine(PCX_DL_LINUX);					break;				case 2:					sock->writeLine(PCX_DL_WIN32);					break;				case 3:					sock->writeLine(PCX_DL_MACOSX);					break;				case 4:					sock->writeLine(PCX_DL_WINAMP2);					break;				default:					sock->writeLine(PCX_DL_URL);					break;			}		}		sock->writeLine("%s %s",PCX_HS_MSG,servMgr->rootMsg.cstr());	}	char hostIP[64];	Host h = sock->host;	h.IPtoStr(hostIP);    sock->writeLine("%s %s",PCX_HS_REMOTEIP,hostIP);    sock->writeLine("");	while (http.nextHeader());
}
// -----------------------------------
bool	Servent::pingHost(Host &rhost,GnuID &rsid)
{
	char ipstr[64];
	rhost.toStr(ipstr);
	LOG_DEBUG("Ping host %s: trying..",ipstr);
	ClientSocket *s=NULL;
	try
	{
		s = sys->createSocket();
		if (!s)
			return false;
		else
		{

			s->open(rhost);
			s->connect();

			AtomStream atom(*s);

			atom.writeInt(PCP_CONNECT,1);
			atom.writeParent(PCP_HELO,1);
				atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
			atom.writeInt(PCP_QUIT,0);

			GnuID sid;
			sid.clear();

			int numc,numd;
			ID4 id = atom.read(numc,numd);
			if (id == PCP_OLEH)
			{
				for(int i=0; i<numc; i++)
				{
					int c,d;
					ID4 pid = atom.read(c,d);
					if (pid == PCP_SESSIONID)
						atom.readBytes(sid.id,16,d);
					else
						atom.skip(c,d);
				}
			}else
			{
				LOG_DEBUG("Ping response: %s",id.getString().str());
				throw StreamException("Bad ping response");
			}

			if (!sid.isSame(rsid))
				throw StreamException("SIDs don`t match");

			LOG_DEBUG("Ping host %s: OK",ipstr);

		}
	}catch(StreamException &e)
	{
		rhost.port = 0;
		LOG_DEBUG("Ping host %s: %s",ipstr,e.msg);
	}
	if (s)
	{
		s->close();
		delete s;
	}
	return true;
}


// -----------------------------------bool Servent::handshakeStream(ChanInfo &chanInfo){

	HTTP http(*sock);

	bool gotPCP=false;
	unsigned int reqPos=0;

	nsSwitchNum=0;

	while (http.nextHeader())	{		char *arg = http.getArgStr();		if (!arg)			continue;		if (http.isHeader(PCX_HS_PCP))
			gotPCP = atoi(arg)!=0;
		else if (http.isHeader(PCX_HS_POS))
			reqPos = atoi(arg);		else if (http.isHeader("icy-metadata"))			addMetadata = atoi(arg) > 0;
		else if (http.isHeader(HTTP_HS_AGENT))
			agent = arg;
		else if (http.isHeader("Pragma"))
		{
			char *ssc = stristr(arg,"stream-switch-count=");
			char *so = stristr(arg,"stream-offset");

			if (ssc || so)
			{
				nsSwitchNum=1;
				//nsSwitchNum = atoi(ssc+20);
			}
		}
		LOG_DEBUG("Stream: %s",http.cmdLine);	}


	if ((!gotPCP) && (outputProtocol == ChanInfo::SP_PCP))
		outputProtocol = ChanInfo::SP_PEERCAST;

	if (outputProtocol == ChanInfo::SP_HTTP)
	{
		if  ( (chanInfo.srcProtocol == ChanInfo::SP_MMS)
			  || (chanInfo.contentType == ChanInfo::T_WMA)
			  || (chanInfo.contentType == ChanInfo::T_WMV)
			  || (chanInfo.contentType == ChanInfo::T_ASX)
			)
		outputProtocol = ChanInfo::SP_MMS;
	}


	bool chanFound=false;
	bool chanReady=false;

	Channel *ch = chanMgr->findChannelByID(chanInfo.id);
	if (ch)
	{
		if (reqPos)
			streamPos = ch->rawData.findOldestPos(reqPos);
		else
			streamPos = 0;

		chanReady = canStream(ch);
	}

	ChanHitList *chl = chanMgr->findHitList(chanInfo);
	if (chl)
	{
		chanFound = true;
	}


	bool result = false;
	char idStr[64];
	chanInfo.id.toStr(idStr);

	char sidStr[64];
	servMgr->sessionID.toStr(sidStr);

	Host rhost = sock->host;

	AtomStream atom(*sock);

	if (!chanFound)
	{
		sock->writeLine(HTTP_SC_NOTFOUND);
	    sock->writeLine("");
		LOG_DEBUG("Sending channel not found");
		return false;
	}


	if (!chanReady)
	{
		if (outputProtocol == ChanInfo::SP_PCP)
		{
			LOG_DEBUG("Sending channel unavailable (with hits)");

			sock->writeLine(HTTP_SC_UNAVAILABLE);
			sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_XPCP);
			sock->writeLine("");

			handshakeIncomingPCP(atom,rhost,remoteID,agent);

			if (chl)
			{
				ChanHit best;
				
				if (!rhost.globalIP())
					best = chl->getHit(&servMgr->serverHost,0,false,false);		// find best hit this network

				if (!best.host.ip)
					best = chl->getHit(&rhost,0,false,false);		// find best hit on same network

				if (!best.host.ip)
					best = chl->getHit(NULL,0,false,false);		// else find best hit on other networks

				// force push
				//best.host.ip = 0;
				// 

				if (best.host.ip)
				{
					best.writeAtoms(atom,true,chanInfo.id);				
					LOG_DEBUG("Sent 1 hit");

				}
				else if (rhost.port)
				{
					// send push request to best firewalled host on other network
					best = chl->getHit(NULL,0,true,false);	
					if (best.host.ip)
						servMgr->broadcastPushRequest(best,rhost,chl->info.id,Servent::T_STREAM);
				} 

				// if all else fails, contact tracker
				if (!best.host.ip)
				{
					// find best tracker on this network
					if (!rhost.globalIP())
						best = chl->getHit(&servMgr->serverHost,0,false,true,true);		

					// find local tracker
					if (!best.host.ip)
						best = chl->getHit(&rhost,0,false,true,true);	

					// find global tracker
					if (!best.host.ip)
						best = chl->getHit(NULL,0,false,true,true);	

					if (best.host.ip)
					{
						best.writeAtoms(atom,true,chanInfo.id);				
						LOG_DEBUG("Sent 1 tracker hit");
					}else
					{
						// else send push request to tracker
						best = chl->getHit(NULL,0,true,true,true);	
						if (best.host.ip)
							servMgr->broadcastPushRequest(best,rhost,chl->info.id,Servent::T_CIN);
					}

				}


				if (!best.host.ip)
					LOG_DEBUG("Failed to find any hits");


			}
			// return not available yet code
			atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE);
			result = false;

		}else
		{
			LOG_DEBUG("Sending channel unavailable");
			sock->writeLine(HTTP_SC_UNAVAILABLE);
			sock->writeLine("");
			result = false;
		}

	} else {

		if (chanInfo.contentType != ChanInfo::T_MP3)
			addMetadata=false;
		if (addMetadata && (outputProtocol == ChanInfo::SP_HTTP))		// winamp mp3 metadata check		{
			sock->writeLine(ICY_OK);			sock->writeLine("%s %s",HTTP_HS_SERVER,PCX_AGENT);			sock->writeLine("icy-name:%s",chanInfo.name.cstr());			sock->writeLine("icy-br:%d",chanInfo.bitrate);			sock->writeLine("icy-genre:%s",chanInfo.genre.cstr());			sock->writeLine("icy-url:%s",chanInfo.url.cstr());			sock->writeLine("icy-metaint:%d",chanMgr->icyMetaInterval);			sock->writeLine("%s %s",PCX_HS_CHANNELID,idStr);			sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_MP3);		}else		{			sock->writeLine(HTTP_SC_OK);			if ((chanInfo.contentType != ChanInfo::T_ASX) && (chanInfo.contentType != ChanInfo::T_WMV) && (chanInfo.contentType != ChanInfo::T_WMA))			{				sock->writeLine("%s %s",HTTP_HS_SERVER,PCX_AGENT);				sock->writeLine("Accept-Ranges: none");				sock->writeLine("x-audiocast-name: %s",chanInfo.name.cstr());				sock->writeLine("x-audiocast-bitrate: %d",chanInfo.bitrate);				sock->writeLine("x-audiocast-genre: %s",chanInfo.genre.cstr());				sock->writeLine("x-audiocast-description: %s",chanInfo.desc.cstr());				sock->writeLine("x-audiocast-url: %s",chanInfo.url.cstr());				sock->writeLine("%s %s",PCX_HS_CHANNELID,idStr);			}
			if (outputProtocol == ChanInfo::SP_HTTP)			{				switch (chanInfo.contentType)				{					case ChanInfo::T_OGG:						sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_XOGG);						break;					case ChanInfo::T_MP3:						sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_MP3);						break;					case ChanInfo::T_MOV:						sock->writeLine("Connection: close");						sock->writeLine("Content-Length: 10000000");						sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_MOV);						break;					case ChanInfo::T_MPG:						sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_MPG);						break;					case ChanInfo::T_NSV:						sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_NSV);						break;					case ChanInfo::T_ASX:						sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_ASX);
						break;
					case ChanInfo::T_WMA:
						sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_WMA);
						break;
					case ChanInfo::T_WMV:						sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_WMV);						break;				}			} else if (outputProtocol == ChanInfo::SP_MMS)
			{
				sock->writeLine("Server: Rex/9.0.0.2980");
				sock->writeLine("Cache-Control: no-cache");
				sock->writeLine("Pragma: no-cache");
				sock->writeLine("Pragma: client-id=3587303426");
				sock->writeLine("Pragma: features=\"broadcast,playlist\"");

				if (nsSwitchNum)
				{
					sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_MMS);
				}else
				{
					sock->writeLine("Content-Type: application/vnd.ms.wms-hdr.asfv1");
					if (ch)
						sock->writeLine("Content-Length: %d",ch->headPack.len);
					sock->writeLine("Connection: Keep-Alive");
				}
			
			} else if (outputProtocol == ChanInfo::SP_PCP)
			{
				sock->writeLine("%s %d",PCX_HS_POS,streamPos);
				sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_XPCP);
			}else if (outputProtocol == ChanInfo::SP_PEERCAST)			{				sock->writeLine("%s %s",HTTP_HS_CONTENT,MIME_XPEERCAST);			}		}
		sock->writeLine("");
		result = true;

		if (gotPCP)
		{
			handshakeIncomingPCP(atom,rhost,remoteID,agent);
			atom.writeInt(PCP_OK,0);
		}

	}


	return result;}// -----------------------------------void Servent::handshakeGiv(GnuID &id){	if (id.isSet())
	{		char idstr[64];		id.toStr(idstr);		sock->writeLine("GIV /%s",idstr);
	}else		sock->writeLine("GIV");

	sock->writeLine("");
}// -----------------------------------void Servent::processGnutella(){
	type = T_PGNU;

	if (servMgr->isRoot && !servMgr->needConnections())
	{
		processRoot();
		return;
	}

	gnuStream.init(sock);	setStatus(S_CONNECTED);	gnuStream.ping(2);//	if (type != T_LOOKUP)//		chanMgr->broadcastRelays(this,chanMgr->minBroadcastTTL,2);	lastPacket = lastPing = sys->getTime();	bool doneBigPing=false;	const unsigned int	abortTimeoutSecs = 60;		// abort connection after 60 secs of no activitiy	const unsigned int	packetTimeoutSecs = 30;		// ping connection after 30 secs of no activity	unsigned int currBytes=0;	unsigned int lastWait=0;	unsigned int lastTotalIn=0,lastTotalOut=0;	while (thread.active && sock->active())	{		if (sock->readReady())		{			lastPacket = sys->getTime();			if (gnuStream.readPacket(pack))			{				unsigned int ver = pack.id.getVersion();				char ipstr[64];				sock->host.toStr(ipstr);				GnuID routeID;				GnuStream::R_TYPE ret = GnuStream::R_PROCESS;				if (ver < MIN_PACKETVER)					ret = GnuStream::R_BADVERSION;				if (pack.func != GNU_FUNC_PONG)					if (servMgr->seenPacket(pack))						ret = GnuStream::R_DUPLICATE;				seenIDs.addGnuID(pack.id);				servMgr->addVersion(ver);				if (ret == GnuStream::R_PROCESS)				{					GnuID routeID;					ret = gnuStream.processPacket(pack,this,routeID);					if (flowControl && (ret == GnuStream::R_BROADCAST))						ret = GnuStream::R_DROP;				}				switch(ret)				{					case GnuStream::R_BROADCAST:						if (servMgr->broadcast(pack,this))							stats.add(Stats::NUMBROADCASTED);						else							stats.add(Stats::NUMDROPPED);						break;					case GnuStream::R_ROUTE:						if (servMgr->route(pack,routeID,NULL))							stats.add(Stats::NUMROUTED);						else							stats.add(Stats::NUMDROPPED);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -