⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 servent.cpp.svn-base

📁 这是和p2p相关的一份源码
💻 SVN-BASE
📖 第 1 页 / 共 4 页
字号:
		chanMgr->trackerHitList.deadHit(best);
		try
		{
			if (sv->sock)
			{
				sv->sock->close();
				delete sv->sock;
				sv->sock = NULL;
			}

		}catch(StreamException &) {}

		sys->sleepIdle();
	}	sv->kill();	return 0;}// -----------------------------------int Servent::incomingProc(ThreadInfo *thread){	thread->lock();	Servent *sv = (Servent*)thread->data;	
	char ipStr[64];
	sv->sock->host.toStr(ipStr);
	try 	{		sv->handshakeIncoming();	}catch(HTTPException &e)	{		try		{			sv->sock->writeLine(e.msg);			if (e.code == 401)				sv->sock->writeLine("WWW-Authenticate: Basic realm=\"PeerCast\"");			sv->sock->writeLine("");		}catch(StreamException &){}		LOG_ERROR("Incoming from %s: %s",ipStr,e.msg);	}catch(StreamException &e)	{		LOG_ERROR("Incoming from %s: %s",ipStr,e.msg);	}

	sv->kill();	return 0;}// -----------------------------------void Servent::processServent(){	setStatus(S_HANDSHAKE);	handshakeIn();	if (!sock)		throw StreamException("Servent has no socket");
	processGnutella();}// -----------------------------------void Servent::processStream(bool doneHandshake,ChanInfo &chanInfo){	
	if (!doneHandshake)
	{
		setStatus(S_HANDSHAKE);

		if (!handshakeStream(chanInfo))
			return;
	}

	if (chanInfo.id.isSet())
	{

		chanID = chanInfo.id;

		LOG_CHANNEL("Sending channel: %s ",ChanInfo::getProtocolStr(outputProtocol));

		if (!waitForChannelHeader(chanInfo))
			throw StreamException("Channel not ready");
		servMgr->totalStreams++;		Host host = sock->host;		host.port = 0;	// force to 0 so we ignore the incoming port

		Channel *ch = chanMgr->findChannelByID(chanID);
		if (!ch)
			throw StreamException("Channel not found");

		if (outputProtocol == ChanInfo::SP_HTTP)		{			// we have agent string now, so check again.			if (!canPreview())				throw StreamException("Preview disallowed");			if ((addMetadata) && (chanMgr->icyMetaInterval))				sendRawMetaChannel(chanMgr->icyMetaInterval);			else 
				sendRawChannel(true,true);

		}else if (outputProtocol == ChanInfo::SP_MMS)
		{
			if (nsSwitchNum)
			{
				sendRawChannel(true,true);
			}else
			{
				sendRawChannel(true,false);
			}

		}else if (outputProtocol  == ChanInfo::SP_PCP)		{			sendPCPChannel();

		} else if (outputProtocol  == ChanInfo::SP_PEERCAST)
		{
			sendChannel(ch);
		}
	}
	setStatus(S_CLOSING);}// -----------------------------------------#if 0// debug		FileStream file;		file.openReadOnly("c://test.mp3");		LOG_DEBUG("raw file read");		char buf[4000];		int cnt=0;		while (!file.eof())		{			LOG_DEBUG("send %d",cnt++);			file.read(buf,sizeof(buf));			sock->write(buf,sizeof(buf));		}		file.close();		LOG_DEBUG("raw file sent");	return;// debug#endif// -----------------------------------bool Servent::waitForChannelHeader(ChanInfo &info){	for(int i=0; i<30*10; i++)	{
		Channel *ch = chanMgr->findChannelByID(info.id);
		if (!ch)
			return false;
		if (ch->isPlaying() && (ch->rawData.writePos>0))			return true;		if (!thread.active || !sock->active())			break;		sys->sleep(100);	}	return false;}// -----------------------------------bool Servent::checkPreview(unsigned int connectTime){	Host h = sock->host;	h.port = 0;			// ignore incoming port number	if (!servMgr->isFiltered(ServFilter::F_DIRECT,h))		return false;	if (isPrivate())	// always allow private clients		return true;	return false;}// -----------------------------------bool Servent::canPreview(){		// probably not needed, but just in case we bind to the actual IP address and not localhost	if (sock->host.ip == servMgr->serverHost.ip)		return true;	Host h = getHost();	h.port = 0;	if (!servMgr->isFiltered(ServFilter::F_DIRECT,h))		return false;	if (isPrivate())	// always allow private clients		return true;	if (agent.contains("PeerCast"))		// allow connections from peercast clients (direct relays etc..)		return true;	return true;}// -----------------------------------void Servent::sendRawChannel(bool sendHead, bool sendData){
	try	{
		Channel *ch = chanMgr->findChannelByID(chanID);
		if (!ch)
			throw StreamException("Channel not found");
		setStatus(S_CONNECTED);		LOG_DEBUG("Starting Raw stream: %s",ch->info.name.cstr());

		if (sendHead)
		{			ch->headPack.writeRaw(*sock);
			streamPos = ch->headPack.pos + ch->headPack.len;
			LOG_DEBUG("Sent %d bytes header ",ch->headPack.len);
		}

		if (sendData)
		{
			unsigned int connectTime=sys->getTime();
			unsigned int streamIndex = ch->streamIndex;
			while ((thread.active) && sock->active())			{				if (!checkPreview(connectTime))					throw StreamException("Preview time limit reached");
				ch = chanMgr->findChannelByID(chanID);

				if (ch)
				{

					if (streamIndex != ch->streamIndex)
					{
						streamIndex = ch->streamIndex;
						streamPos = ch->headPack.pos;
						LOG_DEBUG("sendRaw got new stream index");
					}

					ChanPacket rawPack;
					if (ch->rawData.findPacket(streamPos,rawPack))
					{
						if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))							rawPack.writeRaw(*sock);


						if (rawPack.pos < streamPos)
							LOG_DEBUG("raw: skip back %d",rawPack.pos-streamPos);
						streamPos = rawPack.pos+rawPack.len;

						//LOG("raw at %d: %d %d",streamPos,ch->rawData.getStreamPos(ch->rawData.firstPos),ch->rawData.getStreamPos(ch->rawData.lastPos));
					}
				}
				
				sys->sleepIdle();			}
		}	}catch(StreamException &e)	{		LOG_ERROR("Stream channel: %s",e.msg);	}}// -----------------------------------void Servent::sendRawMetaChannel(int interval){
	try	{
		Channel *ch = chanMgr->findChannelByID(chanID);
		if (!ch)
			throw StreamException("Channel not found");
		setStatus(S_CONNECTED);		LOG_DEBUG("Starting Raw Meta stream: %s (metaint: %d)",ch->info.name.cstr(),interval);		String lastTitle,lastURL;		int		lastMsgTime=sys->getTime();		bool	showMsg=true;		char buf[16384];		int bufPos=0;		if ((interval > sizeof(buf)) || (interval < 1))			throw StreamException("Bad ICY Meta Interval value");
		unsigned int connectTime = sys->getTime();
		streamPos = 0;		// raw meta channel has no header (its MP3)
		while ((thread.active) && sock->active())		{			if (!checkPreview(connectTime))				throw StreamException("Preview time limit reached");
			ch = chanMgr->findChannelByID(chanID);

			if (ch)
			{

				ChanPacket rawPack;
				if (ch->rawData.findPacket(streamPos,rawPack))
				{
					MemoryStream mem(rawPack.data,rawPack.len);
					if (rawPack.type == ChanPacket::T_DATA)					{
						int len = rawPack.len;						char *p = rawPack.data;						while (len)						{							int rl = len;							if ((bufPos+rl) > interval)								rl = interval-bufPos;							memcpy(&buf[bufPos],p,rl);							bufPos+=rl;							p+=rl;							len-=rl;							if (bufPos >= interval)							{								bufPos = 0;									sock->write(buf,interval);								if (chanMgr->broadcastMsgInterval)									if ((sys->getTime()-lastMsgTime) >= chanMgr->broadcastMsgInterval)									{										showMsg ^= true;										lastMsgTime = sys->getTime();									}								String *metaTitle = &ch->info.track.title;								if (!ch->info.comment.isEmpty() && (showMsg))									metaTitle = &ch->info.comment;								if (!metaTitle->isSame(lastTitle) || !ch->info.url.isSame(lastURL))								{									char tmp[1024];									String title,url;									title = *metaTitle;									url = ch->info.url;									title.convertTo(String::T_META);									url.convertTo(String::T_META);									sprintf(tmp,"StreamTitle='%s';StreamUrl='%s';\0",title.cstr(),url.cstr());									int len = ((strlen(tmp) + 15+1) / 16);									sock->writeChar(len);									sock->write(tmp,len*16);									lastTitle = *metaTitle;									lastURL = ch->info.url;									LOG_DEBUG("StreamTitle: %s, StreamURL: %s",lastTitle.cstr(),lastURL.cstr());								}else								{									sock->writeChar(0);													}							}
						}					}
					streamPos = rawPack.pos + rawPack.len;				}
			}
			sys->sleepIdle();
		}	}catch(StreamException &e)	{		LOG_ERROR("Stream channel: %s",e.msg);	}}// -----------------------------------void Servent::sendChannel(Channel *ch){
#if 0
	downData.accept = ChanPacket::T_HEAD|ChanPacket::T_DATA|ChanPacket::T_META;
	ch->numRelays++;
	
	try	{		sock->timeout = 10000;		setStatus(S_CONNECTED);		LOG_DEBUG("Starting PeerCast stream: %s",ch->info.name.cstr());		sock->writeTag("PCST");		ChanPacket pack;
		ch->headPack.write(*sock);
		pack.init(ChanPacket::T_META,ch->insertMeta.data,ch->insertMeta.len,ch->syncPos,ch->streamPos);		pack.write(*sock);				while ((thread.active) && sock->active())		{			if (!ch->isActive())				throw StreamException("Channel closed");			unsigned int np = downData.readPacket(syncPos,pack);			if ((np-syncPos) > 1)				LOG_DEBUG("sendChannel skip: %d",np-syncPos);			syncPos = np;			pack.write(*sock);		}	}catch(StreamException &e)	{		LOG_ERROR("Stream channel: %s",e.msg);	}	if (ch->numRelays)		ch->numRelays--;#endif
}
// -----------------------------------
void Servent::sendPCPChannel()
{
	Channel *ch = chanMgr->findChannelByID(chanID);
	if (!ch)
		throw StreamException("Channel not found");

	AtomStream atom(*sock);

	try
	{

		LOG_DEBUG("Starting PCP stream of channel at %d",streamPos);


		setStatus(S_CONNECTED);

		atom.writeParent(PCP_CHAN,3 + ((streamPos==0)?1:0));
			atom.writeBytes(PCP_CHAN_ID,chanID.id,16);
			ch->info.writeInfoAtoms(atom);
			ch->info.writeTrackAtoms(atom);
			if (streamPos == 0)
			{
				atom.writeParent(PCP_CHAN_PKT,3);
					atom.writeInt(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
					atom.writeInt(PCP_CHAN_PKT_POS,ch->headPack.pos);
					atom.writeBytes(PCP_CHAN_PKT_DATA,ch->headPack.data,ch->headPack.len);

				streamPos = ch->headPack.pos+ch->headPack.len;
				LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
			}

		unsigned int streamIndex = ch->streamIndex;

		while (thread.active)
		{

			Channel *ch = chanMgr->findChannelByID(chanID);

			if (ch)
			{

				if (streamIndex != ch->streamIndex)
				{
					streamIndex = ch->streamIndex;
					streamPos = ch->headPack.pos;
					LOG_DEBUG("sendPCPStream got new stream index");
				}

				ChanPacket rawPack;

				if (ch->rawData.findPacket(streamPos,rawPack))
				{
					bool hasKey=rawPack.key.isSet();

					if (rawPack.type == ChanPacket::T_HEAD)
					{
						atom.writeParent(PCP_CHAN,2);
							atom.writeBytes(PCP_CHAN_ID,chanID.id,16);
							atom.writeParent(PCP_CHAN_PKT,3 + (hasKey?1:0));
								atom.writeInt(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
								atom.writeInt(PCP_CHAN_PKT_POS,rawPack.pos);
								atom.writeBytes(PCP_CHAN_PKT_DATA,rawPack.data,rawPack.len);
								if (hasKey)
									atom.writeBytes(PCP_CHAN_PKT_KEY,rawPack.key.id,16);


					}else if (rawPack.type == ChanPacket::T_DATA)
					{
						atom.writeParent(PCP_CHAN,2);
							atom.writeBytes(PCP_CHAN_ID,chanID.id,16);
							atom.writeParent(PCP_CHAN_PKT,3 + (hasKey?1:0));
								atom.writeInt(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_DATA);
								atom.writeInt(PCP_CHAN_PKT_POS,rawPack.pos);
								atom.writeBytes(PCP_CHAN_PKT_DATA,rawPack.data,rawPack.len);
								if (hasKey)
									atom.writeBytes(PCP_CHAN_PKT_KEY,rawPack.key.id,16);

					}

					if (rawPack.pos < streamPos)
						LOG_DEBUG("pcp: skip back %d",rawPack.pos-streamPos);

					streamPos = rawPack.pos+rawPack.len;
				}

			}
			BroadcastState bcs;
			bcs.srcID = remoteID;
			pcpStream.readPacket(*sock,bcs);

			sys->sleepIdle();

		}

		LOG_DEBUG("PCP stream of channel closed.");

	}catch(StreamException &e)
	{
		LOG_ERROR("Stream channel: %s",e.msg);
	}

	try
	{
		atom.writeInt(PCP_QUIT,pcpStream.error);
	}catch(StreamException &) {}

}
// -----------------------------------int Servent::serverProc(ThreadInfo *thread){	thread->lock();	Servent *sv = (Servent*)thread->data;	try 	{		if (!sv->sock)			throw StreamException("Server has no socket");		sv->setStatus(S_LISTENING);		//LOG4("Listening on port %d",sv->sock->host.port);		char servIP[64];		sv->sock->host.toStr(servIP);		if (servMgr->isRoot)			LOG_DEBUG("Root Server started: %s",servIP);		else			LOG_DEBUG("Server started: %s",servIP);				while ((thread->active) && (sv->sock->active()))		{			ClientSocket *cs = sv->sock->accept();			if (cs)			{						Servent *ns = servMgr->allocServent();				if (ns)				{					ns->networkID = servMgr->networkID;					ns->initIncoming(cs,sv->allow);				}else					LOG_ERROR("Out of servents");			}		}	}catch(StreamException &e)	{		LOG_ERROR("Server Error: %s:%d",e.msg,e.err);	}		LOG_DEBUG("Server stopped");	sv->kill();	return 0;} 

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -