⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 channel.cpp.svn-base

📁 这是和p2p相关的一份源码
💻 SVN-BASE
📖 第 1 页 / 共 5 页
字号:
	// IP	char ipStr[64];	host.toStr(ipStr);		return new XML::Node("host ip=\"%s\" hops=\"%d\" listeners=\"%d\" uptime=\"%d\" skips=\"%d\" push=\"%d\" busy=\"%d\" stable=\"%d\" agent=\"%s\" update=\"%d\" tracker=\"%d\" relays=\"%d\"",		ipStr,		hops,		numListeners,		upTime,		numSkips,		firewalled?1:0,		busy?1:0,		stable?1:0,		agentStr,		sys->getTime()-time,
		tracker,
		numRelays		);}// -----------------------------------XML::Node *ChanHitList::createHitsXML(){	XML::Node *hn = new XML::Node("hits listeners=\"%d\" hosts=\"%d\" busy=\"%d\" stable=\"%d\" firewalled=\"%d\" closest=\"%d\" furthest=\"%d\" newest=\"%d\" ",		numListeners(),		numHits(),		numBusy(),		numStable(),		numFirewalled(),		closestHit(),		furthestHit(),		sys->getTime()-newestHit()		);			for(int i=0; i<MAX_HITS; i++)		if (hits[i].host.ip)			hn->add(hits[i].createXML());	return hn;}// -----------------------------------XML::Node *Channel::createRelayXML(bool showStat){	const char *ststr;	ststr = getStatusStr();	if (!showStat)		if ((status == S_RECEIVING) || (status == S_BROADCASTING))			ststr = "OK";	ChanHitList *chl = chanMgr->findHitList(info);	return new XML::Node("relay listeners=\"%d\" relays=\"%d\" hosts=\"%d\" status=\"%s\"",		numListeners(),		numRelays(),		(chl!=NULL)?chl->numHits():0,		ststr		);	}// -----------------------------------void ChanMeta::fromXML(XML &xml){	MemoryStream tout(data,MAX_DATALEN);	xml.write(tout);	len = tout.pos;}// -----------------------------------void ChanMeta::fromMem(void *p, int l){	len = l;	memcpy(data,p,len);}// -----------------------------------void ChanMeta::addMem(void *p, int l){	if ((len+l) <= MAX_DATALEN)	{		memcpy(data+len,p,l);		len += l;	}}
// -----------------------------------
// Sends a PCP broadcast packet describing this channel (id, broadcast key,
// info/track atoms and a local "tracker" hit) to COUT servents.
//
// svID  - passed through to servMgr->broadcastPacket() as the destination id.
// force - when true the rate limit below is bypassed.
//
// The update is sent at most once every 30 ticks of sys->getTime()
// (presumably seconds - confirm) unless forced. lastTrackerUpdate only
// advances when at least one client actually received the packet, so a
// failed attempt is retried on the next call.
//
// Throws StreamException when the channel has no hit list.
void Channel::broadcastTrackerUpdate(GnuID &svID, bool force)
{
	unsigned int ctime = sys->getTime();

	if (((ctime-lastTrackerUpdate) > 30) || (force))
	{
		ChanPacket pack;

		// The stream must be bounded by the payload buffer, not by the whole
		// packet object, otherwise a long atom tree could write past
		// pack.data.
		MemoryStream mem(pack.data,sizeof(pack.data));

		AtomStream atom(mem);

		ChanHit hit;

		ChanHitList *chl = chanMgr->findHitListByID(info.id);
		if (!chl)
			throw StreamException("Broadcast channel has no hitlist");

		// Totals combine what the hit list reports with this node's own counts.
		int totListeners = chl->getTotalListeners() + numListeners();
		int totRelays = chl->getTotalRelays() + numRelays();

		hit.initLocal(totListeners,totRelays,info.numSkips,info.getUptime(),isPlaying(),servMgr->controlInFull());
		hit.tracker = true;

		// The child counts given to writeParent() must match the number of
		// atoms written beneath each parent (5 under BCST, 4 under CHAN).
		atom.writeParent(PCP_BCST,5);
			atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_ROOT);
			atom.writeChar(PCP_BCST_HOPS,0);
			atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
			atom.writeParent(PCP_CHAN,4);
				atom.writeBytes(PCP_CHAN_ID,info.id.id,16);
				atom.writeBytes(PCP_CHAN_KEY,chanMgr->broadcastID.id,16);
				info.writeInfoAtoms(atom);
				info.writeTrackAtoms(atom);
			hit.writeAtoms(atom,false,info.id);


		pack.len = mem.pos;
		pack.type = ChanPacket::T_PCP;

		GnuID noID;
		noID.clear();
		int cnt = servMgr->broadcastPacket(pack,noID,servMgr->sessionID,svID,Servent::T_COUT);

		if (cnt)
		{
			LOG_DEBUG("Sent tracker update to %d client(s)",cnt);
			lastTrackerUpdate = ctime;
		}
	}
}

// -----------------------------------
// Forwards a packet to this channel's source stream when the channel matches
// the given filters. Returns the result of sourceStream->sendPacket(), or
// false when the packet was not sent.
//
// cid - if set, the channel id must be the same.
// sid - if set, the remote id must NOT be the same (skip the sender).
// did - if set, the remote id must be the same (explicit destination).
bool	Channel::sendPacketUp(ChanPacket &pack,GnuID &cid,GnuID &sid,GnuID &did)
{
	if (!isActive())
		return false;

	// Channel filter.
	if (cid.isSet() && !info.id.isSame(cid))
		return false;

	// Destination filter.
	if (did.isSet() && !remoteID.isSame(did))
		return false;

	// Never send the packet back to where it came from.
	if (sid.isSet() && remoteID.isSame(sid))
		return false;

	if (!sourceStream)
		return false;

	return sourceStream->sendPacket(pack);
}

// -----------------------------------void Channel::updateMeta(){
	if (isBroadcasting())
	{
		unsigned int ctime = sys->getTime();
		if ((ctime-lastMetaUpdate) > 30)
		{
			lastMetaUpdate = ctime;

			ChanPacket pack;

			MemoryStream mem(pack.data,sizeof(pack));

			AtomStream atom(mem);

			atom.writeParent(PCP_BCST,3);
				atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_RELAYS);
				atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
				atom.writeParent(PCP_CHAN,3);
					atom.writeBytes(PCP_CHAN_ID,info.id.id,16);
					info.writeInfoAtoms(atom);
					info.writeTrackAtoms(atom);

			pack.len = mem.pos;
			pack.type = ChanPacket::T_PCP;
			GnuID noID;
			noID.clear();
			servMgr->broadcastPacket(pack,info.id,servMgr->sessionID,noID,Servent::T_STREAM);

			broadcastTrackerUpdate(noID);
		}

	}

	ChanHitList *chl = chanMgr->findHitList(info);
	if (chl)
		chl->info = info;

	peercastApp->channelUpdate(&info);}// -----------------------------------ChannelStream *Channel::createSource(){//	if (servMgr->relayBroadcast)//		chanMgr->broadcastRelays(NULL,chanMgr->minBroadcastTTL,chanMgr->maxBroadcastTTL);	ChannelStream *source=NULL;	if (info.srcProtocol == ChanInfo::SP_PEERCAST)	{		LOG_CHANNEL("Ch.%d is Peercast",index);		source = new PeercastStream();	}	else if (info.srcProtocol == ChanInfo::SP_PCP)
	{
		LOG_CHANNEL("Ch.%d is PCP",index);
		PCPStream *pcp = new PCPStream();
		source = pcp;
	}
	else if (info.srcProtocol == ChanInfo::SP_MMS)	{		LOG_CHANNEL("Ch.%d is MMS",index);		source = new MMSStream();	}else	{		switch(info.contentType)		{			case ChanInfo::T_MP3:				LOG_CHANNEL("Ch.%d is MP3 - meta: %d",index,icyMetaInterval);				source = new MP3Stream();				break;			case ChanInfo::T_NSV:				LOG_CHANNEL("Ch.%d is NSV",index);				source = new RawStream();				break;			case ChanInfo::T_WMA:			case ChanInfo::T_WMV:				throw StreamException("Ch.%d is WMA/WMV - but not MMS",index);				break;			case ChanInfo::T_OGG:				LOG_CHANNEL("Ch.%d is OGG",index);				source = new OGGStream();				break;			default:				LOG_CHANNEL("Ch.%d is Raw",index);				source = new RawStream();				break;		}	}	return source;}// -----------------------------------void ChannelStream::openHTTP(Channel *ch, const char *url){		ClientSocket *sock = sys->createSocket();	if (!sock)		throw StreamException("Cannot create socket");	String nextURL;	nextURL.set(url);	while(!nextURL.isEmpty())	{		char *fileName = nextURL.cstr();		char *dir = strstr(fileName,"/");		if (dir)			*dir++=0;		LOG_CHANNEL("Fetch Host=%s",fileName);		if (dir)			LOG_CHANNEL("Fetch Dir=%s",dir);		Host host;		host.fromStrName(fileName,80);		sock->open(host);		sock->connect();		HTTP http(*sock);		http.writeLine("GET /%s HTTP/1.1",dir?dir:"");		http.writeLine("%s %s",HTTP_HS_HOST,fileName);		http.writeLine("%s %s",HTTP_HS_CONNECTION,"close");		http.writeLine("%s %s",HTTP_HS_ACCEPT,"*/*");		http.writeLine("%s %s",HTTP_HS_AGENT,PCX_AGENT);  		http.writeLine("icy-metadata:1");		http.writeLine("");		int res = http.readResponse();		if ((res!=200) && (res!=302))		{			LOG_ERROR("HTTP response: %s",http.cmdLine);			throw StreamException("Bad HTTP connect");		}				nextURL.clear();		while (http.nextHeader())		{			LOG_CHANNEL("Fetch HTTP: %s",http.cmdLine);			Servent::readICYHeader(http,ch->info,NULL,false);			if (http.isHeader("icy-metaint"))				ch->icyMetaInterval = http.getArgInt();			else if (http.isHeader("Location:"))				
nextURL.set(http.getArgStr());		}	}}
// ------------------------------------------
// Asks getStatus() for a status packet and, if one was produced and the
// channel is not broadcasting, sends it upstream and logs how many clients
// received it.
void ChannelStream::updateStatus(Channel *ch)
{
	ChanPacket statusPack;

	// No packet means nothing changed (or the update is rate limited).
	if (!getStatus(ch,statusPack))
		return;

	// Broadcasting channels do not forward the status upstream.
	if (ch->isBroadcasting())
		return;

	GnuID emptyID;
	emptyID.clear();

	int sent = chanMgr->broadcastPacketUp(statusPack,ch->info.id,servMgr->sessionID,emptyID);
	LOG_CHANNEL("Sent channel status update to %d clients",sent);
}

// ------------------------------------------
// Fills pack with a PCP status broadcast for ch when something worth
// reporting changed. Returns true when pack was filled, false otherwise.
//
// A packet is produced only when the channel has a hit list, more than 10
// ticks of sys->getTime() passed since the last update, and at least one of
// these holds: listener count, relay count, playing state or firewall state
// differs from the cached value, or hostUpdateInterval is non-zero and has
// elapsed. The cached values are refreshed whenever a packet is produced.
bool ChannelStream::getStatus(Channel *ch,ChanPacket &pack)
{
	unsigned int now = sys->getTime();

	ChanHitList *hitList = chanMgr->findHitListByID(ch->info.id);
	if (!hitList)
		return false;

	int curListeners = ch->numListeners();
	int curRelays = ch->numRelays();

	// Same tests, in the same order, as a single short-circuit expression.
	bool somethingChanged =
		   (numListeners != curListeners)
		|| (numRelays != curRelays)
		|| (ch->isPlaying() != isPlaying)
		|| (servMgr->getFirewall() != fwState)
		|| (((now-lastUpdate)>chanMgr->hostUpdateInterval) && chanMgr->hostUpdateInterval);

	if (!somethingChanged)
		return false;

	// Hard rate limit, applied even when something changed.
	if (!((now-lastUpdate) > 10))
		return false;

	// Remember what is being reported so the next call can detect changes.
	numListeners = curListeners;
	numRelays = curRelays;
	isPlaying = ch->isPlaying();
	fwState = servMgr->getFirewall();
	lastUpdate = now;

	ChanHit localHit;
	localHit.initLocal(numListeners,numRelays,ch->info.numSkips,ch->info.getUptime(),isPlaying,servMgr->streamFull());
	localHit.tracker = ch->isBroadcasting();

	MemoryStream payload(pack.data,sizeof(pack.data));
	AtomStream atom(payload);

	GnuID emptyID;
	emptyID.clear();

	// Five children follow the BCST parent.
	atom.writeParent(PCP_BCST,5);
		atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_TRACKERS);
		atom.writeChar(PCP_BCST_HOPS,0);
		atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
		atom.writeBytes(PCP_BCST_CHANID,ch->info.id.id,16);
		localHit.writeAtoms(atom,false,emptyID);

	pack.len = payload.pos;
	pack.type = ChanPacket::T_PCP;
	return true;
}
// -----------------------------------
// Reports whether the channel should be bumped and clears the request.
// A bump is requested automatically when data has been written before
// (lastWriteTime is non-zero) and more than 20 ticks of sys->getTime()
// passed since that last write.
bool	Channel::checkBump()
{
	const bool stalled = rawData.lastWriteTime
		&& ((sys->getTime() - rawData.lastWriteTime) > 20);
	if (stalled)
		bump = true;

	if (!bump)
		return false;

	// Consume the request so it is reported only once.
	bump = false;
	return true;
}


// -----------------------------------void Channel::readStream(Stream &in,ChannelStream *source){
	info.numSkips = 0;

	source->readHeader(in,this);
	peercastApp->channelStart(&info);

	rawData.lastWriteTime = 0;

	bool wasBroadcasting=false;

	try
	{
		while ((!in.eof() && (thread.active) && !peercastInst->isQuitting))		{			
			source->readPacket(in,this);
			if (checkBump())				throw StreamException("Bumped");			if (checkIdle())				break;

			if (rawData.writePos > 0)
			{
				if (isBroadcasting())
				{					
					if ((sys->getTime()-lastTrackerUpdate) >= chanMgr->hostUpdateInterval)
					{
						GnuID noID;
						noID.clear();
						broadcastTrackerUpdate(noID);
					}
					wasBroadcasting = true;

				}else
				{
					setStatus(Channel::S_RECEIVING);
				}
				source->updateStatus(this);
			}

			sys->sleepIdle();		}
	}catch(StreamException &e)
	{
		LOG_ERROR("readStream error: %s",e.msg);
	}
	setStatus(S_CLOSING);

	if (wasBroadcasting)
	{
		GnuID noID;
		noID.clear();
		broadcastTrackerUpdate(noID,true);
	}


	peercastApp->channelStop(&info);
	source->readEnd(in,this);
}// -----------------------------------void PeercastStream::readHeader(Stream &in,Channel *ch){	if (in.readTag() != 'PCST')		throw StreamException("Not PeerCast stream");}// -----------------------------------void PeercastStream::readEnd(Stream &,Channel *){}// -----------------------------------void PeercastStream::readPacket(Stream &in,Channel *ch){	ChanPacket pack;	{		pack.readPeercast(in);		MemoryStream mem(pack.data,pack.len);		switch(pack.type)		{			case ChanPacket::T_HEAD:
				// update sync pos
				ch->headPack = pack;
				pack.pos = ch->streamPos;
				ch->newPacket(pack);
				ch->streamPos+=pack.len;				break;			case ChanPacket::T_DATA:
				pack.pos = ch->streamPos;
				ch->newPacket(pack);
				ch->streamPos+=pack.len;
				break;
			case ChanPacket::T_META:				ch->insertMeta.fromMem(pack.data,pack.len);				{					if (pack.len)					{						XML xml;						xml.read(mem);						XML::Node *n = xml.findNode("channel");											if (n)						{							ch->info.updateFromXML(n);							ChanHitList *chl = chanMgr->findHitList(ch->info);							if (chl)								chl->info.updateFromXML(n);						}					}				}
				ch->updateMeta();				break;#if 0
			case ChanPacket::T_SYNC:				{					unsigned int s = mem.readLong();					if ((s-ch->syncPos) != 1)					{						LOG_CHANNEL("Ch.%d SKIP: %d to %d (%d)",ch->index,ch->syncPos,s,ch->info.numSkips);						if (ch->syncPos)						{							ch->info.numSkips++;							if (ch->info.numSkips>50)								throw StreamException("Bumped - Too many skips");						}					}					ch->syncPos = s;				}				break;
#endif
		}
	}}// -----------------------------------void ChannelStream::readRaw(Stream &in, Channel *ch){	ChanPacket pack;	const int readLen = 8192;
	pack.init(ChanPacket::T_DATA,pack.data,readLen,ch->streamPos);	in.read(pack.data,pack.len);	ch->newPacket(pack);
	ch->checkReadDelay(pack.len);

	ch->streamPos+=pack.len;}
// ------------------------------------------
// A raw stream carries no header, so there is nothing to consume here.
void RawStream::readHeader(Stream & /*in*/, Channel * /*ch*/)
{
	// intentionally empty
}

// ------------------------------------------
// Reads the next chunk of the stream by delegating to readRaw().
void RawStream::readPacket(Stream &in, Channel *ch)
{
	this->readRaw(in, ch);
}

// ------------------------------------------
// A raw stream needs no work when reading ends.
void RawStream::readEnd(Stream & /*in*/, Channel * /*ch*/)
{
	// intentionally empty
}
// -----------------------------------void ChanPacket::init(TYPE t, const void *p, unsigned int l,unsigned int _pos){
	type = t;	if (l > MAX_DATALEN)
		throw StreamException("Packet data too large");	len = l;	memcpy(data,p,len);
	pos = _pos;
}// -----------------------------------
// Writes only the payload bytes to out; no tag or length is emitted.
void ChanPacket::writeRaw(Stream &out)
{
	out.write(this->data, this->len);
}
// -----------------------------------void ChanPacket::writePeercast(Stream &out){	unsigned int tp = 0;
	switch (type)
	{
		case T_HEAD: tp = 'HEAD'; break;
		case T_META: tp = 'META'; break;
		case T_DATA: tp = 'DATA'; break;
	}

	if (type != T_UNKNOWN)
	{
		out.writeTag(tp);		out.writeShort(len);		out.writeShort(0);		out.write(data,len);
	}}// -----------------------------------void ChanPacket::readPeercast(Stream &in){
	unsigned int tp = in.readTag();

	switch (tp)
	{
		case 'HEAD':	type = T_HEAD; break;
		case 'DATA':	type = T_DATA; break;
		case 'META':	type = T_META; break;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -