⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 servent.cpp

📁 P2P应用 : Peercast的源代码
💻 CPP
📖 第 1 页 / 共 4 页
字号:
	return 0;}// -----------------------------------void Servent::processServent(){	setStatus(S_HANDSHAKE);	handshakeIn();	if (!sock)		throw StreamException("Servent has no socket");
	processGnutella();}// -----------------------------------void Servent::processStream(bool doneHandshake,ChanInfo &chanInfo){	
	if (!doneHandshake)
	{
		setStatus(S_HANDSHAKE);

		if (!handshakeStream(chanInfo))
			return;
	}

	if (chanInfo.id.isSet())
	{

		chanID = chanInfo.id;

		LOG_CHANNEL("Sending channel: %s ",ChanInfo::getProtocolStr(outputProtocol));

		if (!waitForChannelHeader(chanInfo))
			throw StreamException("Channel not ready");
		servMgr->totalStreams++;		Host host = sock->host;		host.port = 0;	// force to 0 so we ignore the incoming port

		Channel *ch = chanMgr->findChannelByID(chanID);
		if (!ch)
			throw StreamException("Channel not found");

		if (outputProtocol == ChanInfo::SP_HTTP)		{			if ((addMetadata) && (chanMgr->icyMetaInterval))				sendRawMetaChannel(chanMgr->icyMetaInterval);			else 
				sendRawChannel(true,true);

		}else if (outputProtocol == ChanInfo::SP_MMS)
		{
			if (nsSwitchNum)
			{
				sendRawChannel(true,true);
			}else
			{
				sendRawChannel(true,false);
			}

		}else if (outputProtocol  == ChanInfo::SP_PCP)		{			sendPCPChannel();

		} else if (outputProtocol  == ChanInfo::SP_PEERCAST)
		{
			sendPeercastChannel();
		}
	}
	setStatus(S_CLOSING);}// -----------------------------------------#if 0// debug		FileStream file;		file.openReadOnly("c://test.mp3");		LOG_DEBUG("raw file read");		char buf[4000];		int cnt=0;		while (!file.eof())		{			LOG_DEBUG("send %d",cnt++);			file.read(buf,sizeof(buf));			sock->write(buf,sizeof(buf));		}		file.close();		LOG_DEBUG("raw file sent");	return;// debug#endif// -----------------------------------bool Servent::waitForChannelHeader(ChanInfo &info){	for(int i=0; i<30*10; i++)	{
		Channel *ch = chanMgr->findChannelByID(info.id);
		if (!ch)
			return false;
		if (ch->isPlaying() && (ch->rawData.writePos>0))			return true;		if (!thread.active || !sock->active())			break;		sys->sleep(100);	}	return false;}// -----------------------------------void Servent::sendRawChannel(bool sendHead, bool sendData){
	try	{
		sock->setWriteTimeout(DIRECT_WRITE_TIMEOUT*1000);

		Channel *ch = chanMgr->findChannelByID(chanID);
		if (!ch)
			throw StreamException("Channel not found");
		setStatus(S_CONNECTED);		LOG_DEBUG("Starting Raw stream of %s at %d",ch->info.name.cstr(),streamPos);

		if (sendHead)
		{			ch->headPack.writeRaw(*sock);
			streamPos = ch->headPack.pos + ch->headPack.len;
			LOG_DEBUG("Sent %d bytes header ",ch->headPack.len);
		}

		if (sendData)
		{
			unsigned int streamIndex = ch->streamIndex;			unsigned int connectTime = sys->getTime();
			unsigned int lastWriteTime = connectTime;

			while ((thread.active) && sock->active())			{
				ch = chanMgr->findChannelByID(chanID);

				if (ch)
				{

					if (streamIndex != ch->streamIndex)
					{
						streamIndex = ch->streamIndex;
						streamPos = ch->headPack.pos;
						LOG_DEBUG("sendRaw got new stream index");
					}

					ChanPacket rawPack;
					if (ch->rawData.findPacket(streamPos,rawPack))
					{
						if (syncPos != rawPack.sync)
							LOG_ERROR("Send skip: %d",rawPack.sync-syncPos);
						syncPos = rawPack.sync+1;

						if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
						{
							rawPack.writeRaw(*sock);
							lastWriteTime = sys->getTime();
						}

						if (rawPack.pos < streamPos)
							LOG_DEBUG("raw: skip back %d",rawPack.pos-streamPos);
						streamPos = rawPack.pos+rawPack.len;
					}
				}

				if ((sys->getTime()-lastWriteTime) > DIRECT_WRITE_TIMEOUT)
					throw TimeoutException();
				
				sys->sleepIdle();			}
		}	}catch(StreamException &e)	{		LOG_ERROR("Stream channel: %s",e.msg);	}}
#if 0
// -----------------------------------
void Servent::sendRawMultiChannel(bool sendHead, bool sendData)
{
	try
	{
		unsigned int chanStreamIndex[ChanMgr::MAX_CHANNELS];
		unsigned int chanStreamPos[ChanMgr::MAX_CHANNELS];
		GnuID chanIDs[ChanMgr::MAX_CHANNELS];
		int numChanIDs=0;
		for(int i=0; i<ChanMgr::MAX_CHANNELS; i++)
		{
			Channel *ch = &chanMgr->channels[i];
			if (ch->isPlaying())
				chanIDs[numChanIDs++]=ch->info.id;
		}



		setStatus(S_CONNECTED);


		if (sendHead)
		{
			for(int i=0; i<numChanIDs; i++)
			{
				Channel *ch = chanMgr->findChannelByID(chanIDs[i]);
				if (ch)
				{
					LOG_DEBUG("Starting RawMulti stream: %s",ch->info.name.cstr());
					ch->headPack.writeRaw(*sock);
					chanStreamPos[i] = ch->headPack.pos + ch->headPack.len;
					chanStreamIndex[i] = ch->streamIndex;
					LOG_DEBUG("Sent %d bytes header",ch->headPack.len);

				}
			}
		}

		if (sendData)
		{

			unsigned int connectTime=sys->getTime();

			while ((thread.active) && sock->active())
			{

				for(int i=1; i<numChanIDs; i++)
				{
					Channel *ch = chanMgr->findChannelByID(chanIDs[i]);
					if (ch)
					{
						if (chanStreamIndex[i] != ch->streamIndex)
						{
							chanStreamIndex[i] = ch->streamIndex;
							chanStreamPos[i] = ch->headPack.pos;
							LOG_DEBUG("sendRawMulti got new stream index for chan %d",i);
						}

						ChanPacket rawPack;
						if (ch->rawData.findPacket(chanStreamPos[i],rawPack))
						{
							if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
								rawPack.writeRaw(*sock);


							if (rawPack.pos < chanStreamPos[i])
								LOG_DEBUG("raw: skip back %d",rawPack.pos-chanStreamPos[i]);
							chanStreamPos[i] = rawPack.pos+rawPack.len;


							//LOG("raw at %d: %d %d",streamPos,ch->rawData.getStreamPos(ch->rawData.firstPos),ch->rawData.getStreamPos(ch->rawData.lastPos));
						}						
					}
					break;
				}
				

				sys->sleepIdle();
			}
		}
	}catch(StreamException &e)
	{
		LOG_ERROR("Stream channel: %s",e.msg);
	}
}
#endif
// -----------------------------------void Servent::sendRawMetaChannel(int interval){
	try	{
		Channel *ch = chanMgr->findChannelByID(chanID);
		if (!ch)
			throw StreamException("Channel not found");

		sock->setWriteTimeout(DIRECT_WRITE_TIMEOUT*1000);
		setStatus(S_CONNECTED);		LOG_DEBUG("Starting Raw Meta stream of %s (metaint: %d) at %d",ch->info.name.cstr(),interval,streamPos);		String lastTitle,lastURL;		int		lastMsgTime=sys->getTime();		bool	showMsg=true;		char buf[16384];		int bufPos=0;		if ((interval > sizeof(buf)) || (interval < 1))			throw StreamException("Bad ICY Meta Interval value");
		unsigned int connectTime = sys->getTime();		unsigned int lastWriteTime = connectTime;

		streamPos = 0;		// raw meta channel has no header (its MP3)
		while ((thread.active) && sock->active())		{
			ch = chanMgr->findChannelByID(chanID);

			if (ch)
			{

				ChanPacket rawPack;
				if (ch->rawData.findPacket(streamPos,rawPack))
				{

					if (syncPos != rawPack.sync)
						LOG_ERROR("Send skip: %d",rawPack.sync-syncPos);
					syncPos = rawPack.sync+1;

					MemoryStream mem(rawPack.data,rawPack.len);
					if (rawPack.type == ChanPacket::T_DATA)					{
						int len = rawPack.len;						char *p = rawPack.data;						while (len)						{							int rl = len;							if ((bufPos+rl) > interval)								rl = interval-bufPos;							memcpy(&buf[bufPos],p,rl);							bufPos+=rl;							p+=rl;							len-=rl;							if (bufPos >= interval)							{								bufPos = 0;									sock->write(buf,interval);								lastWriteTime = sys->getTime();
								if (chanMgr->broadcastMsgInterval)									if ((sys->getTime()-lastMsgTime) >= chanMgr->broadcastMsgInterval)									{										showMsg ^= true;										lastMsgTime = sys->getTime();									}								String *metaTitle = &ch->info.track.title;								if (!ch->info.comment.isEmpty() && (showMsg))									metaTitle = &ch->info.comment;								if (!metaTitle->isSame(lastTitle) || !ch->info.url.isSame(lastURL))								{									char tmp[1024];									String title,url;									title = *metaTitle;									url = ch->info.url;									title.convertTo(String::T_META);									url.convertTo(String::T_META);									sprintf(tmp,"StreamTitle='%s';StreamUrl='%s';\0",title.cstr(),url.cstr());									int len = ((strlen(tmp) + 15+1) / 16);									sock->writeChar(len);									sock->write(tmp,len*16);									lastTitle = *metaTitle;									lastURL = ch->info.url;									LOG_DEBUG("StreamTitle: %s, StreamURL: %s",lastTitle.cstr(),lastURL.cstr());								}else								{									sock->writeChar(0);													}							}
						}					}
					streamPos = rawPack.pos + rawPack.len;				}
			}
			if ((sys->getTime()-lastWriteTime) > DIRECT_WRITE_TIMEOUT)
				throw TimeoutException();

			sys->sleepIdle();
		}	}catch(StreamException &e)	{		LOG_ERROR("Stream channel: %s",e.msg);	}}// -----------------------------------void Servent::sendPeercastChannel(){
	try	{		setStatus(S_CONNECTED);
		Channel *ch = chanMgr->findChannelByID(chanID);
		if (!ch)
			throw StreamException("Channel not found");
		LOG_DEBUG("Starting PeerCast stream: %s",ch->info.name.cstr());		sock->writeTag("PCST");		ChanPacket pack;

		ch->headPack.writePeercast(*sock);
		pack.init(ChanPacket::T_META,ch->insertMeta.data,ch->insertMeta.len,ch->streamPos);		pack.writePeercast(*sock);			streamPos = 0;
		unsigned int syncPos=0;		while ((thread.active) && sock->active())		{			ch = chanMgr->findChannelByID(chanID);
			if (ch)
			{

				ChanPacket rawPack;
				if (ch->rawData.findPacket(streamPos,rawPack))
				{
					if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
					{
						sock->writeTag("SYNC");
						sock->writeShort(4);
						sock->writeShort(0);
						sock->write(&syncPos,4);
						syncPos++;

						rawPack.writePeercast(*sock);
					}
					streamPos = rawPack.pos + rawPack.len;
				}
			}
			sys->sleepIdle();		}	}catch(StreamException &e)	{		LOG_ERROR("Stream channel: %s",e.msg);	}}
// -----------------------------------
void Servent::sendPCPChannel()
{
	Channel *ch = chanMgr->findChannelByID(chanID);
	if (!ch)
		throw StreamException("Channel not found");

	AtomStream atom(*sock);

	pcpStream = new PCPStream(remoteID);
	int error=0;

	try
	{

		LOG_DEBUG("Starting PCP stream of channel at %d",streamPos);


		setStatus(S_CONNECTED);

		atom.writeParent(PCP_CHAN,3 + ((sendHeader)?1:0));
			atom.writeBytes(PCP_CHAN_ID,chanID.id,16);
			ch->info.writeInfoAtoms(atom);
			ch->info.writeTrackAtoms(atom);
			if (sendHeader)
			{
				atom.writeParent(PCP_CHAN_PKT,3);
					atom.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
					atom.writeInt(PCP_CHAN_PKT_POS,ch->headPack.pos);
					atom.writeBytes(PCP_CHAN_PKT_DATA,ch->headPack.data,ch->headPack.len);

				streamPos = ch->headPack.pos+ch->headPack.len;
				LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
			}

		unsigned int streamIndex = ch->streamIndex;

		while (thread.active)
		{

			Channel *ch = chanMgr->findChannelByID(chanID);

			if (ch)
			{

				if (streamIndex != ch->streamIndex)
				{
					streamIndex = ch->streamIndex;
					streamPos = ch->headPack.pos;
					LOG_DEBUG("sendPCPStream got new stream index");						
				}

				ChanPacket rawPack;

				if (ch->rawData.findPacket(streamPos,rawPack))
				{

					if (rawPack.type == ChanPacket::T_HEAD)
					{
						atom.writeParent(PCP_CHAN,2);
							atom.writeBytes(PCP_CHAN_ID,chanID.id,16);
							atom.writeParent(PCP_CHAN_PKT,3);
								atom.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
								atom.writeInt(PCP_CHAN_PKT_POS,rawPack.pos);
								atom.writeBytes(PCP_CHAN_PKT_DATA,rawPack.data,rawPack.len);


					}else if (rawPack.type == ChanPacket::T_DATA)
					{
						atom.writeParent(PCP_CHAN,2);
							atom.writeBytes(PCP_CHAN_ID,chanID.id,16);
							atom.writeParent(PCP_CHAN_PKT,3);
								atom.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_DATA);
								atom.writeInt(PCP_CHAN_PKT_POS,rawPack.pos);
								atom.writeBytes(PCP_CHAN_PKT_DATA,rawPack.data,rawPack.len);

					}

					if (rawPack.pos < streamPos)
						LOG_DEBUG("pcp: skip back %d",rawPack.pos-streamPos);

					//LOG_DEBUG("Sending %d-%d (%d,%d,%d)",rawPack.pos,rawPack.pos+rawPack.len,ch->streamPos,ch->rawData.getLatestPos(),ch->rawData.getOldestPos());

					streamPos = rawPack.pos+rawPack.len;
				}

			}
			BroadcastState bcs;
			error = pcpStream->readPacket(*sock,bcs);
			if (error)
				throw StreamException("PCP exception");

			sys->sleepIdle();

		}

		LOG_DEBUG("PCP channel stream closed normally.");

	}catch(StreamException &e)
	{
		LOG_ERROR("Stream channel: %s",e.msg);
	}

	try
	{
		atom.writeInt(PCP_QUIT,error);
	}catch(StreamException &) {}

}
// -----------------------------------int Servent::serverProc(ThreadInfo *thread){//	thread->lock();
	Servent *sv = (Servent*)thread->data;	try 	{		if (!sv->sock)			throw StreamException("Server has no socket");		sv->setStatus(S_LISTENING);		char servIP[64];		sv->sock->host.toStr(servIP);		if (servMgr->isRoot)			LOG_DEBUG("Root Server started: %s",servIP);		else			LOG_DEBUG("Server started: %s",servIP);				while ((thread->active) && (sv->sock->active()))		{
			if (servMgr->numActiveOnPort(sv->sock->host.port) < servMgr->maxServIn)
			{				ClientSocket *cs = sv->sock->accept();				if (cs)				{	
					LOG_DEBUG("accepted incoming");					Servent *ns = servMgr->allocServent();					if (ns)					{
						servMgr->lastIncoming = sys->getTime();
						ns->servPort = sv->sock->host.port;						ns->networkID = servMgr->networkID;						ns->initIncoming(cs,sv->allow);					}else						LOG_ERROR("Out of servents");				}
			}
			sys->sleep(100);
		}	}catch(StreamException &e)	{		LOG_ERROR("Server Error: %s:%d",e.msg,e.err);	}		LOG_DEBUG("Server stopped");
	sv->kill();
	sys->endThread(thread);	return 0;} // -----------------------------------
bool	Servent::writeVariable(Stream &s, const String &var)
{
	char buf[1024];

	if (var == "type")
		strcpy(buf,getTypeStr());
	else if (var == "status")
		strcpy(buf,getStatusStr());
	else if (var == "address")
		getHost().toStr(buf);
	else if (var == "agent")
		strcpy(buf,agent.cstr());
	else if (var == "bitrate")
	{
		if (sock)
		{
			unsigned int tot = sock->bytesInPerSec+sock->bytesOutPerSec;
			sprintf(buf,"%.1f",BYTES_TO_KBPS(tot));
		}else
			strcpy(buf,"0");
	}else if (var == "uptime")
	{
		String uptime;
		if (lastConnect)
			uptime.setFromStopwatch(sys->getTime()-lastConnect);
		else
			uptime.set("-");
		strcpy(buf,uptime.cstr());
	}else if (var.startsWith("gnet."))
	{

		float ctime = (float)(sys->getTime()-lastConnect);
		if (var == "gnet.packetsIn")
			sprintf(buf,"%d",gnuStream.packetsIn);
		else if (var == "gnet.packetsInPerSec")
			sprintf(buf,"%.1f",ctime>0?((float)gnuStream.packetsIn)/ctime:0);
		else if (var == "gnet.packetsOut")
			sprintf(buf,"%d",gnuStream.packetsOut);
		else if (var == "gnet.packetsOutPerSec")
			sprintf(buf,"%.1f",ctime>0?((float)gnuStream.packetsOut)/ctime:0);
		else if (var == "gnet.normQueue")
			sprintf(buf,"%d",outPacketsNorm.numPending());
		else if (var == "gnet.priQueue")
			sprintf(buf,"%d",outPacketsPri.numPending());
		else if (var == "gnet.flowControl")
			sprintf(buf,"%d",flowControl?1:0);
		else if (var == "gnet.routeTime")
		{
			int nr = seenIDs.numUsed();
			unsigned int tim = sys->getTime()-seenIDs.getOldest();
		
			String tstr;
			tstr.setFromStopwatch(tim);

			if (nr)
				strcpy(buf,tstr.cstr());
			else
				strcpy(buf,"-");
		}
		else
			return false;

	}else
		return false;

	s.writeString(buf);

	return true;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -