⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 channel.cpp

📁 P2P应用 : Peercast的源代码
💻 CPP
📖 第 1 页 / 共 5 页
字号:
				ChannelStream *cs = ch->sourceStream;
				ch->sourceStream = NULL;
				cs->kill();
				delete cs;
			}

			if (ch->sock)			{				ch->sock->close();				delete ch->sock;				ch->sock = NULL;			}

			if (error == 404)
			{
				LOG_ERROR("Channel not found");
				return;
			}


		}
		ch->lastIdleTime = sys->getTime();
		ch->setStatus(Channel::S_IDLE);
		while ((ch->checkIdle()) && (ch->thread.active))
		{
			sys->sleepIdle();
		}

		sys->sleepIdle();	}}// -----------------------------------void	Channel::startICY(ClientSocket *cs, SRC_TYPE st){	srcType = st;	type = T_BROADCAST;	cs->setReadTimeout(0);	// stay connected even when theres no data coming through	sock = cs;	info.srcProtocol = ChanInfo::SP_HTTP;

	streamIndex = ++chanMgr->icyIndex;	sourceData = new ICYSource();	startStream();}// -----------------------------------static char *nextMetaPart(char *str,char delim){	while (*str)	{		if (*str == delim)		{			*str++ = 0;			return str;		}		str++;	}	return NULL;}// -----------------------------------static void copyStr(char *to,char *from,int max){	char c;	while ((c=*from++) && (--max))		if (c != '\'')			*to++ = c;	*to = 0;}// -----------------------------------void Channel::processMp3Metadata(char *str){	ChanInfo newInfo = info;
	
	char *cmd=str;	while (cmd)	{		char *arg = nextMetaPart(cmd,'=');		if (!arg)			break;		char *next = nextMetaPart(arg,';');		if (strcmp(cmd,"StreamTitle")==0)
		{			newInfo.track.title.setUnquote(arg,String::T_ASCII);
			newInfo.track.title.convertTo(String::T_UNICODE);
		}else if (strcmp(cmd,"StreamUrl")==0)
		{			newInfo.track.contact.setUnquote(arg,String::T_ASCII);
			newInfo.track.contact.convertTo(String::T_UNICODE);
		}		cmd = next;	}	updateInfo(newInfo);}// -----------------------------------XML::Node *ChanHit::createXML(){	// IP	char ipStr[64];	host.toStr(ipStr);		return new XML::Node("host ip=\"%s\" hops=\"%d\" listeners=\"%d\" relays=\"%d\" uptime=\"%d\" push=\"%d\" relay=\"%d\" direct=\"%d\" cin=\"%d\" stable=\"%d\" version=\"%d\" update=\"%d\" tracker=\"%d\"",		ipStr,		numHops,		numListeners,		numRelays,
		upTime,		firewalled?1:0,		relay?1:0,		direct?1:0,
		cin?1:0,
		stable?1:0,		version,		sys->getTime()-time,
		tracker
		);}// -----------------------------------XML::Node *ChanHitList::createXML(bool addHits){	XML::Node *hn = new XML::Node("hits hosts=\"%d\" listeners=\"%d\" relays=\"%d\" firewalled=\"%d\" closest=\"%d\" furthest=\"%d\" newest=\"%d\"",		numHits(),
		numListeners(),		numRelays(),		numFirewalled(),		closestHit(),		furthestHit(),		sys->getTime()-newestHit()		);			if (addHits)
	{
		ChanHit *h = hit;
		while (h)
		{			if (h->host.ip)				hn->add(h->createXML());
			h = h->next;
		}
	}	return hn;}// -----------------------------------XML::Node *Channel::createRelayXML(bool showStat){	const char *ststr;	ststr = getStatusStr();	if (!showStat)		if ((status == S_RECEIVING) || (status == S_BROADCASTING))			ststr = "OK";	ChanHitList *chl = chanMgr->findHitList(info);	return new XML::Node("relay listeners=\"%d\" relays=\"%d\" hosts=\"%d\" status=\"%s\"",		localListeners(),		localRelays(),		(chl!=NULL)?chl->numHits():0,		ststr		);	}// -----------------------------------void ChanMeta::fromXML(XML &xml){	MemoryStream tout(data,MAX_DATALEN);	xml.write(tout);	len = tout.pos;}// -----------------------------------void ChanMeta::fromMem(void *p, int l){	len = l;	memcpy(data,p,len);}// -----------------------------------void ChanMeta::addMem(void *p, int l){	if ((len+l) <= MAX_DATALEN)	{		memcpy(data+len,p,l);		len += l;	}}
// -----------------------------------
void Channel::broadcastTrackerUpdate(GnuID &svID, bool force)
{
	unsigned int ctime = sys->getTime();

	if (((ctime-lastTrackerUpdate) > 30) || (force))
	{
		ChanPacket pack;

		MemoryStream mem(pack.data,sizeof(pack));

		AtomStream atom(mem);
			
		ChanHit hit;

		ChanHitList *chl = chanMgr->findHitListByID(info.id);
		if (!chl)
			throw StreamException("Broadcast channel has no hitlist");

		int numListeners = totalListeners();
		int numRelays = totalRelays();

		unsigned int oldp = rawData.getOldestPos();
		unsigned int newp = rawData.getLatestPos();

		hit.initLocal(numListeners,numRelays,info.numSkips,info.getUptime(),isPlaying(),oldp,newp);
		hit.tracker = true;

		atom.writeParent(PCP_BCST,7);
			atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_ROOT);
			atom.writeChar(PCP_BCST_HOPS,0);
			atom.writeChar(PCP_BCST_TTL,7);
			atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
			atom.writeInt(PCP_BCST_VERSION,PCP_CLIENT_VERSION);
			atom.writeParent(PCP_CHAN,4);
				atom.writeBytes(PCP_CHAN_ID,info.id.id,16);
				atom.writeBytes(PCP_CHAN_BCID,chanMgr->broadcastID.id,16);
				info.writeInfoAtoms(atom);
				info.writeTrackAtoms(atom);
			hit.writeAtoms(atom,info.id);


		pack.len = mem.pos;
		pack.type = ChanPacket::T_PCP;

		GnuID noID;
		noID.clear();
		int cnt = servMgr->broadcastPacket(pack,noID,servMgr->sessionID,svID,Servent::T_COUT);

		if (cnt)
		{
			LOG_DEBUG("Sent tracker update for %s to %d client(s)",info.name.cstr(),cnt);
			lastTrackerUpdate = ctime;
		}
	}
}

// -----------------------------------
bool	Channel::sendPacketUp(ChanPacket &pack,GnuID &cid,GnuID &sid,GnuID &did)
{
	if ( isActive() 
		&& (!cid.isSet() || info.id.isSame(cid)) 
		&& (!sid.isSet() || !remoteID.isSame(sid))
		&& sourceStream 
	   )
		return sourceStream->sendPacket(pack,did);

	return false;
}

// -----------------------------------void Channel::updateInfo(ChanInfo &newInfo){
	if (info.update(newInfo))
	{
		if (isBroadcasting())
		{
			unsigned int ctime = sys->getTime();
			if ((ctime-lastMetaUpdate) > 30)
			{
				lastMetaUpdate = ctime;

				ChanPacket pack;

				MemoryStream mem(pack.data,sizeof(pack));

				AtomStream atom(mem);

				atom.writeParent(PCP_BCST,7);
					atom.writeChar(PCP_BCST_HOPS,0);
					atom.writeChar(PCP_BCST_TTL,7);
					atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_RELAYS);
					atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
					atom.writeInt(PCP_BCST_VERSION,PCP_CLIENT_VERSION);
					atom.writeBytes(PCP_BCST_CHANID,info.id.id,16);
					atom.writeParent(PCP_CHAN,3);
						atom.writeBytes(PCP_CHAN_ID,info.id.id,16);
						info.writeInfoAtoms(atom);
						info.writeTrackAtoms(atom);

				pack.len = mem.pos;
				pack.type = ChanPacket::T_PCP;
				GnuID noID;
				noID.clear();
				servMgr->broadcastPacket(pack,info.id,servMgr->sessionID,noID,Servent::T_RELAY);

				broadcastTrackerUpdate(noID);
			}
		}

		ChanHitList *chl = chanMgr->findHitList(info);
		if (chl)
			chl->info = info;

		peercastApp->channelUpdate(&info);

	}

}// -----------------------------------ChannelStream *Channel::createSource(){//	if (servMgr->relayBroadcast)//		chanMgr->broadcastRelays(NULL,chanMgr->minBroadcastTTL,chanMgr->maxBroadcastTTL);	ChannelStream *source=NULL;	if (info.srcProtocol == ChanInfo::SP_PEERCAST)	{		LOG_CHANNEL("Channel is Peercast");		source = new PeercastStream();	}	else if (info.srcProtocol == ChanInfo::SP_PCP)
	{
		LOG_CHANNEL("Channel is PCP");
		PCPStream *pcp = new PCPStream(remoteID);
		source = pcp;
	}
	else if (info.srcProtocol == ChanInfo::SP_MMS)	{		LOG_CHANNEL("Channel is MMS");		source = new MMSStream();	}else	{		switch(info.contentType)		{			case ChanInfo::T_MP3:				LOG_CHANNEL("Channel is MP3 - meta: %d",icyMetaInterval);				source = new MP3Stream();				break;			case ChanInfo::T_NSV:				LOG_CHANNEL("Channel is NSV");				source = new NSVStream();				break;			case ChanInfo::T_WMA:			case ChanInfo::T_WMV:				throw StreamException("Channel is WMA/WMV - but not MMS");				break;			case ChanInfo::T_OGG:			case ChanInfo::T_OGM:
				LOG_CHANNEL("Channel is OGG");				source = new OGGStream();				break;			default:				LOG_CHANNEL("Channel is Raw");				source = new RawStream();				break;		}	}	return source;}// ------------------------------------------
void ChannelStream::updateStatus(Channel *ch)
{
	ChanPacket pack;
	if (getStatus(ch,pack))
	{
		if (!ch->isBroadcasting())
		{
			GnuID noID;
			noID.clear();
			int cnt = chanMgr->broadcastPacketUp(pack,ch->info.id,servMgr->sessionID,noID);
			LOG_CHANNEL("Sent channel status update to %d clients",cnt);
		}
	}
}

// ------------------------------------------
bool ChannelStream::getStatus(Channel *ch,ChanPacket &pack)
{
	unsigned int ctime = sys->getTime();

	ChanHitList *chl = chanMgr->findHitListByID(ch->info.id);

	if (!chl)
		return false;

	int newLocalListeners = ch->localListeners();
	int newLocalRelays = ch->localRelays();

	if (
		(
		(numListeners != newLocalListeners) 
		|| (numRelays != newLocalRelays) 
		|| (ch->isPlaying() != isPlaying) 
		|| (servMgr->getFirewall() != fwState)
		|| (((ctime-lastUpdate)>chanMgr->hostUpdateInterval) && chanMgr->hostUpdateInterval)
		)
		&& ((ctime-lastUpdate) > 10)
	   )
	{

		numListeners = newLocalListeners;
		numRelays = newLocalRelays;
		isPlaying = ch->isPlaying();
		fwState = servMgr->getFirewall();
		lastUpdate = ctime;

		ChanHit hit;

		unsigned int oldp = ch->rawData.getOldestPos();
		unsigned int newp = ch->rawData.getLatestPos();

		hit.initLocal(numListeners,numRelays,ch->info.numSkips,ch->info.getUptime(),isPlaying,oldp,newp);
		hit.tracker = ch->isBroadcasting();

		MemoryStream pmem(pack.data,sizeof(pack.data));
		AtomStream atom(pmem);

		GnuID noID;
		noID.clear();

		atom.writeParent(PCP_BCST,7);
			atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_TRACKERS);
			atom.writeChar(PCP_BCST_HOPS,0);
			atom.writeChar(PCP_BCST_TTL,11);
			atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
			atom.writeInt(PCP_BCST_VERSION,PCP_CLIENT_VERSION);
			atom.writeBytes(PCP_BCST_CHANID,ch->info.id.id,16);
			hit.writeAtoms(atom,noID);

		pack.len = pmem.pos;
		pack.type = ChanPacket::T_PCP;
		return true;
	}else
		return false;
}
// -----------------------------------
bool	Channel::checkBump()
{
	if (!isBroadcasting() && (!sourceHost.tracker))
		if (rawData.lastWriteTime && ((sys->getTime() - rawData.lastWriteTime) > 30))
		{
			LOG_ERROR("Channel Auto bumped");
			bump = true;
		}
	
	if (bump)
	{
		bump = false;
		return true;
	}else
		return false;
}


// -----------------------------------int Channel::readStream(Stream &in,ChannelStream *source){
	int error = 0;

	info.numSkips = 0;

	source->readHeader(in,this);
	peercastApp->channelStart(&info);

	rawData.lastWriteTime = 0;

	bool wasBroadcasting=false;

	try
	{
		while (thread.active && !peercastInst->isQuitting)		{			
			if (checkIdle())
			{
				LOG_DEBUG("Channel idle");
				break;
			}

			if (checkBump())
			{
				LOG_DEBUG("Channel bumped");
				error = -1;
				break;
			}

			if (in.eof())
			{
				LOG_DEBUG("Channel eof");
				break;
			}

			if (in.readReady())
			{
				error = source->readPacket(in,this);

				if (error)
					break;

				if (rawData.writePos > 0)
				{
					if (isBroadcasting())
					{					
						if ((sys->getTime()-lastTrackerUpdate) >= chanMgr->hostUpdateInterval)
						{
							GnuID noID;
							noID.clear();
							broadcastTrackerUpdate(noID);
						}
						wasBroadcasting = true;

					}else
					{
						setStatus(Channel::S_RECEIVING);
					}
					source->updateStatus(this);
				}
			}
			
			sys->sleepIdle();		}
	}catch(StreamException &e)
	{
		LOG_ERROR("readStream: %s",e.msg);
		error = -1;
	}
	setStatus(S_CLOSING);

	if (wasBroadcasting)
	{
		GnuID noID;
		noID.clear();
		broadcastTrackerUpdate(noID,true);
	}

	peercastApp->channelStop(&info);
	source->readEnd(in,this);

	return error;
}// -----------------------------------void PeercastStream::readHeader(Stream &in,Channel *ch){	if (in.readTag() != 'PCST')		throw StreamException("Not PeerCast stream");}// -----------------------------------void PeercastStream::readEnd(Stream &,Channel *){}// -----------------------------------int PeercastStream::readPacket(Stream &in,Channel *ch){	ChanPacket pack;	{		pack.readPeercast(in);		MemoryStream mem(pack.data,pack.len);		switch(pack.type)		{			case ChanPacket::T_HEAD:
				// update sync pos
				ch->headPack = pack;
				pack.pos = ch->streamPos;
				ch->newPacket(pack);
				ch->streamPos+=pack.len;				break;			case ChanPacket::T_DATA:
				pack.pos = ch->streamPos;
				ch->newPacket(pack);
				ch->streamPos+=pack.len;
				break;
			case ChanPacket::T_META:				ch->insertMeta.fromMem(pack.data,pack.len);				{					if (pack.len)					{						XML xml;						xml.read(mem);						XML::Node *n = xml.findNode("channel");											if (n)						{
							ChanInfo newInfo = ch->info;							newInfo.updateFromXML(n);							ChanHitList *chl = chanMgr->findHitList(ch->info);							if (chl)								newInfo.updateFromXML(n);							ch->updateInfo(newInfo);
						}					}				}
				break;#if 0
			case ChanPacket::T_SYNC:				{					unsigned int s = mem.readLong();					if ((s-ch->syncPos) != 1)					{						LOG_CHANNEL("Ch.%d SKIP: %d to %d (%d)",ch->index,ch->syncPos,s,ch->info.numSkips);						if (ch->syncPos)						{							ch->info.numSkips++;							if (ch->info.numSkips>50)								throw StreamException("Bumped - Too many skips");						}					}					ch->syncPos = s;				}				break;
#endif
		}
	}
	return 0;}// -----------------------------------void ChannelStream::readRaw(Stream &in, Channel *ch){	ChanPacket pack;	const int readLen = 8192;
	pack.init(ChanPacket::T_DATA,pack.data,readLen,ch->streamPos);	in.read(pack.data,pack.len);	ch->newPacket(pack);
	ch->checkReadDelay(pack.len);

	ch->streamPos+=pack.len;}
// ------------------------------------------

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -