⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 channel.cpp.svn-base

📁 这是和p2p相关的一份源码
💻 SVN-BASE
📖 第 1 页 / 共 5 页
字号:
				ch->sock = NULL;			}

			if (error == 404)
			{
				LOG_ERROR("Ch.%d not found",ch->index);
				return;
			}


		}
		ch->lastIdleTime = sys->getTime();
		ch->setStatus(Channel::S_IDLE);
		while ((ch->checkIdle()) && (ch->thread.active))
			sys->sleepIdle();

		sys->sleepIdle();	}}// -----------------------------------void	Channel::startICY(ClientSocket *cs, SRC_TYPE st){	srcType = st;	type = T_BROADCAST;	cs->setReadTimeout(0);	// stay connected even when theres no data coming through	sock = cs;	info.srcProtocol = ChanInfo::SP_HTTP;

	streamIndex = ++chanMgr->icyIndex;	sourceData = new ICYSource();	startStream();}// -----------------------------------static char *nextMetaPart(char *str,char delim){	while (*str)	{		if (*str == delim)		{			*str++ = 0;			return str;		}		str++;	}	return NULL;}// -----------------------------------static void copyStr(char *to,char *from,int max){	char c;	while ((c=*from++) && (--max))		if (c != '\'')			*to++ = c;	*to = 0;}// -----------------------------------void Channel::processMp3Metadata(char *str){	ChanInfo newInfo = info;
	
	char *cmd=str;	while (cmd)	{		char *arg = nextMetaPart(cmd,'=');		if (!arg)			break;		char *next = nextMetaPart(arg,';');		if (strcmp(cmd,"StreamTitle")==0)
		{			newInfo.track.title.setUnquote(arg,String::T_ASCII);
			newInfo.track.title.convertTo(String::T_UNICODE);
		}else if (strcmp(cmd,"StreamUrl")==0)
		{			newInfo.track.contact.setUnquote(arg,String::T_ASCII);
			newInfo.track.contact.convertTo(String::T_UNICODE);
		}		cmd = next;	}	updateInfo(newInfo);}// -----------------------------------XML::Node *ChanHit::createXML(){	// IP	char ipStr[64];	host.toStr(ipStr);		return new XML::Node("host ip=\"%s\" hops=\"%d\" listeners=\"%d\" relays=\"%d\" uptime=\"%d\" push=\"%d\" relay=\"%d\" direct=\"%d\" cin=\"%d\" stable=\"%d\" agent=\"%s\" version=\"%d\" update=\"%d\" tracker=\"%d\"",		ipStr,		numHops,		numListeners,		numRelays,
		upTime,		firewalled?1:0,		relay?1:0,		direct?1:0,
		cin?1:0,
		stable?1:0,		agentStr,
		version,		sys->getTime()-time,
		tracker
		);}// -----------------------------------XML::Node *ChanHitList::createHitsXML(){	XML::Node *hn = new XML::Node("hits listeners=\"%d\" hosts=\"%d\" firewalled=\"%d\" closest=\"%d\" furthest=\"%d\" newest=\"%d\"",		numListeners(),		numHits(),		numFirewalled(),		closestHit(),		furthestHit(),		sys->getTime()-newestHit()		);			for(int i=0; i<MAX_HITS; i++)		if (hits[i].host.ip)			hn->add(hits[i].createXML());	return hn;}// -----------------------------------XML::Node *Channel::createRelayXML(bool showStat){	const char *ststr;	ststr = getStatusStr();	if (!showStat)		if ((status == S_RECEIVING) || (status == S_BROADCASTING))			ststr = "OK";	ChanHitList *chl = chanMgr->findHitList(info);	return new XML::Node("relay listeners=\"%d\" relays=\"%d\" hosts=\"%d\" status=\"%s\"",		localListeners(),		localRelays(),		(chl!=NULL)?chl->numHits():0,		ststr		);	}// -----------------------------------void ChanMeta::fromXML(XML &xml){	MemoryStream tout(data,MAX_DATALEN);	xml.write(tout);	len = tout.pos;}// -----------------------------------void ChanMeta::fromMem(void *p, int l){	len = l;	memcpy(data,p,len);}// -----------------------------------void ChanMeta::addMem(void *p, int l){	if ((len+l) <= MAX_DATALEN)	{		memcpy(data+len,p,l);		len += l;	}}
// -----------------------------------
void Channel::broadcastTrackerUpdate(GnuID &svID, bool force)
{
	unsigned int ctime = sys->getTime();

	if (((ctime-lastTrackerUpdate) > 30) || (force))
	{
		ChanPacket pack;

		MemoryStream mem(pack.data,sizeof(pack));

		AtomStream atom(mem);
			
		ChanHit hit;

		ChanHitList *chl = chanMgr->findHitListByID(info.id);
		if (!chl)
			throw StreamException("Broadcast channel has no hitlist");

		int numListeners = localListeners();
		int numRelays = localRelays();

		hit.initLocal(numListeners,numRelays,info.numSkips,info.getUptime(),isPlaying());
		hit.tracker = true;

		atom.writeParent(PCP_BCST,7);
			atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_ROOT);
			atom.writeChar(PCP_BCST_HOPS,0);
			atom.writeChar(PCP_BCST_TTL,7);
			atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
			atom.writeInt(PCP_BCST_VERSION,PCP_CLIENT_VERSION);
			atom.writeParent(PCP_CHAN,4);
				atom.writeBytes(PCP_CHAN_ID,info.id.id,16);
				atom.writeBytes(PCP_CHAN_KEY,chanMgr->broadcastID.id,16);
				info.writeInfoAtoms(atom);
				info.writeTrackAtoms(atom);
			hit.writeAtoms(atom,false,info.id);


		pack.len = mem.pos;
		pack.type = ChanPacket::T_PCP;

		GnuID noID;
		noID.clear();
		int cnt = servMgr->broadcastPacket(pack,noID,servMgr->sessionID,svID,Servent::T_COUT);

		if (cnt)
		{
			LOG_DEBUG("Sent tracker update for %s to %d client(s)",info.name.cstr(),cnt);
			lastTrackerUpdate = ctime;
		}
	}
}

// -----------------------------------
bool	Channel::sendPacketUp(ChanPacket &pack,GnuID &cid,GnuID &sid,GnuID &did)
{
	if ( isActive() 
		&& (!cid.isSet() || info.id.isSame(cid)) 
		&& (!sid.isSet() || !remoteID.isSame(sid))
		&& sourceStream 
	   )
		return sourceStream->sendPacket(pack,did);

	return false;
}

// -----------------------------------void Channel::updateInfo(ChanInfo &newInfo){
	if (info.update(newInfo))
	{
		if (isBroadcasting())
		{
			unsigned int ctime = sys->getTime();
			if ((ctime-lastMetaUpdate) > 30)
			{
				lastMetaUpdate = ctime;

				ChanPacket pack;

				MemoryStream mem(pack.data,sizeof(pack));

				AtomStream atom(mem);

				atom.writeParent(PCP_BCST,7);
					atom.writeChar(PCP_BCST_HOPS,0);
					atom.writeChar(PCP_BCST_TTL,7);
					atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_RELAYS);
					atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
					atom.writeInt(PCP_BCST_VERSION,PCP_CLIENT_VERSION);
					atom.writeBytes(PCP_BCST_CHANID,info.id.id,16);
					atom.writeParent(PCP_CHAN,3);
						atom.writeBytes(PCP_CHAN_ID,info.id.id,16);
						info.writeInfoAtoms(atom);
						info.writeTrackAtoms(atom);

				pack.len = mem.pos;
				pack.type = ChanPacket::T_PCP;
				GnuID noID;
				noID.clear();
				servMgr->broadcastPacket(pack,info.id,servMgr->sessionID,noID,Servent::T_RELAY);

				broadcastTrackerUpdate(noID);
			}
		}

	}

	ChanHitList *chl = chanMgr->findHitList(info);
	if (chl)
		chl->info = info;

	peercastApp->channelUpdate(&info);}// -----------------------------------ChannelStream *Channel::createSource(){//	if (servMgr->relayBroadcast)//		chanMgr->broadcastRelays(NULL,chanMgr->minBroadcastTTL,chanMgr->maxBroadcastTTL);	ChannelStream *source=NULL;	if (info.srcProtocol == ChanInfo::SP_PEERCAST)	{		LOG_CHANNEL("Ch.%d is Peercast",index);		source = new PeercastStream();	}	else if (info.srcProtocol == ChanInfo::SP_PCP)
	{
		LOG_CHANNEL("Ch.%d is PCP",index);
		PCPStream *pcp = new PCPStream(remoteID);
		source = pcp;
	}
	else if (info.srcProtocol == ChanInfo::SP_MMS)	{		LOG_CHANNEL("Ch.%d is MMS",index);		source = new MMSStream();	}else	{		switch(info.contentType)		{			case ChanInfo::T_MP3:				LOG_CHANNEL("Ch.%d is MP3 - meta: %d",index,icyMetaInterval);				source = new MP3Stream();				break;			case ChanInfo::T_NSV:				LOG_CHANNEL("Ch.%d is NSV",index);				source = new RawStream();				break;			case ChanInfo::T_WMA:			case ChanInfo::T_WMV:				throw StreamException("Ch.%d is WMA/WMV - but not MMS",index);				break;			case ChanInfo::T_OGG:				LOG_CHANNEL("Ch.%d is OGG",index);				source = new OGGStream();				break;			default:				LOG_CHANNEL("Ch.%d is Raw",index);				source = new RawStream();				break;		}	}	return source;}// -----------------------------------void ChannelStream::openHTTP(Channel *ch, const char *url){		ClientSocket *sock = sys->createSocket();	if (!sock)		throw StreamException("Cannot create socket");	String nextURL;	nextURL.set(url);	while(!nextURL.isEmpty())	{		char *fileName = nextURL.cstr();		char *dir = strstr(fileName,"/");		if (dir)			*dir++=0;		LOG_CHANNEL("Fetch Host=%s",fileName);		if (dir)			LOG_CHANNEL("Fetch Dir=%s",dir);		Host host;		host.fromStrName(fileName,80);		sock->open(host);		sock->connect();		HTTP http(*sock);		http.writeLine("GET /%s HTTP/1.1",dir?dir:"");		http.writeLine("%s %s",HTTP_HS_HOST,fileName);		http.writeLine("%s %s",HTTP_HS_CONNECTION,"close");		http.writeLine("%s %s",HTTP_HS_ACCEPT,"*/*");		http.writeLine("%s %s",HTTP_HS_AGENT,PCX_AGENT);  		http.writeLine("icy-metadata:1");		http.writeLine("");		int res = http.readResponse();		if ((res!=200) && (res!=302))		{			LOG_ERROR("HTTP response: %s",http.cmdLine);			throw StreamException("Bad HTTP connect");		}				nextURL.clear();		while (http.nextHeader())		{			LOG_CHANNEL("Fetch HTTP: %s",http.cmdLine);			Servent::readICYHeader(http,ch->info,NULL);			if (http.isHeader("icy-metaint"))				ch->icyMetaInterval = http.getArgInt();			else if (http.isHeader("Location:"))				nextURL.set(http.getArgStr());		}	}}
// ------------------------------------------
void ChannelStream::updateStatus(Channel *ch)
{
	ChanPacket pack;
	if (getStatus(ch,pack))
	{
		if (!ch->isBroadcasting())
		{
			GnuID noID;
			noID.clear();
			int cnt = chanMgr->broadcastPacketUp(pack,ch->info.id,servMgr->sessionID,noID);
			LOG_CHANNEL("Sent channel status update to %d clients",cnt);
		}
	}
}

// ------------------------------------------
bool ChannelStream::getStatus(Channel *ch,ChanPacket &pack)
{
	unsigned int ctime = sys->getTime();

	ChanHitList *chl = chanMgr->findHitListByID(ch->info.id);

	if (!chl)
		return false;

	int newLocalListeners = ch->localListeners();
	int newLocalRelays = ch->localRelays();

	if (
		(
		(numListeners != newLocalListeners) 
		|| (numRelays != newLocalRelays) 
		|| (ch->isPlaying() != isPlaying) 
		|| (servMgr->getFirewall() != fwState)
		|| (((ctime-lastUpdate)>chanMgr->hostUpdateInterval) && chanMgr->hostUpdateInterval)
		)
		&& ((ctime-lastUpdate) > 10)
	   )
	{

		numListeners = newLocalListeners;
		numRelays = newLocalRelays;
		isPlaying = ch->isPlaying();
		fwState = servMgr->getFirewall();
		lastUpdate = ctime;

		ChanHit hit;

		hit.initLocal(numListeners,numRelays,ch->info.numSkips,ch->info.getUptime(),isPlaying);
		hit.tracker = ch->isBroadcasting();

		MemoryStream pmem(pack.data,sizeof(pack.data));
		AtomStream atom(pmem);

		GnuID noID;
		noID.clear();

		atom.writeParent(PCP_BCST,7);
			atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_TRACKERS);
			atom.writeChar(PCP_BCST_HOPS,0);
			atom.writeChar(PCP_BCST_TTL,7);
			atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
			atom.writeInt(PCP_BCST_VERSION,PCP_CLIENT_VERSION);
			atom.writeBytes(PCP_BCST_CHANID,ch->info.id.id,16);
			hit.writeAtoms(atom,false,noID);

		pack.len = pmem.pos;
		pack.type = ChanPacket::T_PCP;
		return true;
	}else
		return false;
}
// -----------------------------------
bool	Channel::checkBump()
{
//	if (rawData.lastWriteTime && ((sys->getTime() - rawData.lastWriteTime) > 20))
//	{
//		LOG_ERROR("Ch.%d Auto bumped",index);
//		bump = true;
//	}
	
	if (bump)
	{
		bump = false;
		return true;
	}else
		return false;
}


// -----------------------------------int Channel::readStream(Stream &in,ChannelStream *source){
	int error = 0;

	info.numSkips = 0;

	source->readHeader(in,this);
	peercastApp->channelStart(&info);

	rawData.lastWriteTime = 0;

	bool wasBroadcasting=false;

	try
	{
		while (thread.active && !peercastInst->isQuitting)		{			
			error = source->readPacket(in,this);

			if (error)
				break;

			if (checkIdle())
			{
				LOG_DEBUG("Ch.%d idle",index);
				break;
			}

			if (checkBump())
			{
				LOG_DEBUG("Ch.%d bumped",index);
				break;
			}

			if (in.eof())
			{
				LOG_DEBUG("Ch.%d eof",index);
				break;
			}

			if (rawData.writePos > 0)
			{
				if (isBroadcasting())
				{					
					if ((sys->getTime()-lastTrackerUpdate) >= chanMgr->hostUpdateInterval)
					{
						GnuID noID;
						noID.clear();
						broadcastTrackerUpdate(noID);
					}
					wasBroadcasting = true;

				}else
				{
					setStatus(Channel::S_RECEIVING);
				}
				source->updateStatus(this);
			}

			sys->sleepIdle();		}
	}catch(StreamException &e)
	{
		LOG_ERROR("readStream: %s",e.msg);
		error = -1;
	}
	setStatus(S_CLOSING);

	if (wasBroadcasting)
	{
		GnuID noID;
		noID.clear();
		broadcastTrackerUpdate(noID,true);
	}

	peercastApp->channelStop(&info);
	source->readEnd(in,this);

	return error;
}// -----------------------------------void PeercastStream::readHeader(Stream &in,Channel *ch){	if (in.readTag() != 'PCST')		throw StreamException("Not PeerCast stream");}// -----------------------------------void PeercastStream::readEnd(Stream &,Channel *){}// -----------------------------------int PeercastStream::readPacket(Stream &in,Channel *ch){	ChanPacket pack;	{		pack.readPeercast(in);		MemoryStream mem(pack.data,pack.len);		switch(pack.type)		{			case ChanPacket::T_HEAD:
				// update sync pos
				ch->headPack = pack;
				pack.pos = ch->streamPos;
				ch->newPacket(pack);
				ch->streamPos+=pack.len;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -