⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 channel.cpp.svn-base

📁 这是和p2p相关的一份源码
💻 SVN-BASE
📖 第 1 页 / 共 5 页
字号:
				break;			case ChanPacket::T_DATA:
				pack.pos = ch->streamPos;
				ch->newPacket(pack);
				ch->streamPos+=pack.len;
				break;
			case ChanPacket::T_META:				ch->insertMeta.fromMem(pack.data,pack.len);				{					if (pack.len)					{						XML xml;						xml.read(mem);						XML::Node *n = xml.findNode("channel");											if (n)						{
							ChanInfo newInfo = ch->info;							newInfo.updateFromXML(n);							ChanHitList *chl = chanMgr->findHitList(ch->info);							if (chl)								newInfo.updateFromXML(n);							ch->updateInfo(newInfo);
						}					}				}
				break;#if 0
			case ChanPacket::T_SYNC:				{					unsigned int s = mem.readLong();					if ((s-ch->syncPos) != 1)					{						LOG_CHANNEL("Ch.%d SKIP: %d to %d (%d)",ch->index,ch->syncPos,s,ch->info.numSkips);						if (ch->syncPos)						{							ch->info.numSkips++;							if (ch->info.numSkips>50)								throw StreamException("Bumped - Too many skips");						}					}					ch->syncPos = s;				}				break;
#endif
		}
	}
	return 0;}// -----------------------------------void ChannelStream::readRaw(Stream &in, Channel *ch){	ChanPacket pack;	const int readLen = 8192;
	pack.init(ChanPacket::T_DATA,pack.data,readLen,ch->streamPos);	in.read(pack.data,pack.len);	ch->newPacket(pack);
	ch->checkReadDelay(pack.len);

	ch->streamPos+=pack.len;}
// ------------------------------------------
void RawStream::readHeader(Stream &, Channel *)
{
	// No-op: nothing is read from the stream and the channel is not touched.
}

// ------------------------------------------
// Pulls one chunk from `in` by delegating to readRaw(); always returns 0.
int RawStream::readPacket(Stream &in, Channel *ch)
{
	readRaw(in, ch);

	return 0;
}

// ------------------------------------------
void RawStream::readEnd(Stream &, Channel *)
{
	// No-op: nothing is read from the stream and the channel is not touched.
}
// -----------------------------------void ChanPacket::init(TYPE t, const void *p, unsigned int l,unsigned int _pos){
	type = t;	if (l > MAX_DATALEN)
		throw StreamException("Packet data too large");	len = l;	memcpy(data,p,len);
	pos = _pos;
}// -----------------------------------
// Writes only the payload (`len` bytes of `data`) to `out`, with no framing.
void ChanPacket::writeRaw(Stream &out)
{
	out.write(data, len);
}
// -----------------------------------void ChanPacket::writePeercast(Stream &out){	unsigned int tp = 0;
	switch (type)
	{
		case T_HEAD: tp = 'HEAD'; break;
		case T_META: tp = 'META'; break;
		case T_DATA: tp = 'DATA'; break;
	}

	if (type != T_UNKNOWN)
	{
		out.writeTag(tp);		out.writeShort(len);		out.writeShort(0);		out.write(data,len);
	}}// -----------------------------------void ChanPacket::readPeercast(Stream &in){
	unsigned int tp = in.readTag();

	switch (tp)
	{
		case 'HEAD':	type = T_HEAD; break;
		case 'DATA':	type = T_DATA; break;
		case 'META':	type = T_META; break;
		default:		type = T_UNKNOWN;
	}
	len = in.readShort();	in.readShort();	if (len > MAX_DATALEN)		throw StreamException("Bad ChanPacket");	in.read(data,len);}// -----------------------------------
// Copies into this buffer every packet of `buf` (from buf.firstPos through
// buf.lastPos) whose type matches the `accept` mask and whose stream
// position is >= reqPos. firstPos, lastPos, safePos and readPos are reset to
// 0 first. Both buffers are locked for the duration (this one first, then
// `buf`).
//
// Returns lastPos - firstPos after the copy.
//
// NOTE(review): writePos is not reset here, so copied packets are appended
// at the current writePos — presumably this is only called on a freshly
// initialised buffer; confirm against callers.
int ChanPacketBuffer::copyFrom(ChanPacketBuffer &buf, unsigned int reqPos)
{
	lock.on();
	buf.lock.on();

	firstPos = 0;
	lastPos = 0;
	safePos = 0;
	readPos = 0;

	for(unsigned int i=buf.firstPos; i<=buf.lastPos; i++)
	{
		ChanPacket *src = &buf.packets[i%MAX_PACKETS];
		if (src->type & accept)
		{
			if (src->pos >= reqPos)
			{
				lastPos = writePos;
				// Wrap the slot index like every other ring access in this
				// class so a non-zero starting writePos cannot index past
				// the end of packets[].
				packets[writePos%MAX_PACKETS] = *src;
				writePos++;
			}
		}

	}


	buf.lock.off();
	lock.off();
	return lastPos-firstPos;
}

// -----------------------------------
bool ChanPacketBuffer::findPacket(unsigned int spos, ChanPacket &pack)
{
	if (writePos == 0)
		return false;

	lock.on();

	unsigned int fpos = getStreamPos(firstPos);
	if (spos < fpos)
		spos = fpos;


	for(unsigned int i=firstPos; i<=lastPos; i++)
	{
		ChanPacket &p = packets[i%MAX_PACKETS];
		if (p.pos >= spos)
		{
			pack = p;
			lock.off();
			return true;
		}
	}

	lock.off();
	return false;
}
// -----------------------------------
// Stream position of the packet at lastPos, or 0 while writePos is still 0.
unsigned int	ChanPacketBuffer::getLatestPos()
{
	return writePos ? getStreamPos(lastPos) : 0;
}
// -----------------------------------
// Stream position of the packet at firstPos, or 0 while writePos is still 0.
unsigned int	ChanPacketBuffer::getOldestPos()
{
	return writePos ? getStreamPos(firstPos) : 0;
}

// -----------------------------------
// Limits spos to the range bounded by the stream position of the packet at
// safePos (lower bound, checked first) and the one at lastPos (upper bound).
unsigned int	ChanPacketBuffer::findOldestPos(unsigned int spos)
{
	const unsigned int lowest = getStreamPos(safePos);
	const unsigned int highest = getStreamPos(lastPos);

	if (spos < lowest)
		return lowest;

	return (spos > highest) ? highest : spos;
}

// -----------------------------------
unsigned int	ChanPacketBuffer::getStreamPos(unsigned int index)
{
	return packets[index%MAX_PACKETS].pos;
}
// -----------------------------------
unsigned int	ChanPacketBuffer::getStreamPosEnd(unsigned int index)
{
	return packets[index%MAX_PACKETS].pos+packets[index%MAX_PACKETS].len;
}
// -----------------------------------bool ChanPacketBuffer::writePacket(ChanPacket &pack, bool updateReadPos){
	if (pack.len)
	{
		if (willSkip())	// too far behind
			return false;
		lock.on();
		packets[writePos%MAX_PACKETS] = pack;		lastPos = writePos;		writePos++;
		if (writePos >= MAX_PACKETS)
			firstPos = writePos-MAX_PACKETS;
		else
			firstPos = 0;

		if (writePos >= NUM_SAFEPACKETS)
			safePos = writePos - NUM_SAFEPACKETS;
		else
			safePos = 0;

		if (updateReadPos)
			readPos = writePos;

		lastWriteTime = sys->getTime();
		lock.off();
		return true;
	}

	return false;}// -----------------------------------
// Copies the packet at readPos into `pack` and advances readPos.
//
// Throws StreamException("Read too far behind") if readPos has already
// fallen below firstPos. While no unread packet is available
// (readPos >= writePos) it idles via sys->sleepIdle() and throws
// TimeoutException once sys->getTime() has advanced by more than 30 since
// entry.
void	ChanPacketBuffer::readPacket(ChanPacket &pack)
{

	unsigned int tim = sys->getTime();

	if (readPos < firstPos)	
		throw StreamException("Read too far behind");

	while (readPos >= writePos)
	{
		sys->sleepIdle();
		if ((sys->getTime() - tim) > 30)
			throw TimeoutException();
	}

	lock.on();
	pack = 	packets[readPos%MAX_PACKETS];
	readPos++;
	lock.off();

	// Yield only after releasing the lock so that writePacket() is not
	// blocked for the duration of the idle sleep.
	sys->sleepIdle();
}
// -----------------------------------
// True when the distance between writePos and readPos has reached
// MAX_PACKETS, i.e. the next write would overwrite a packet not yet read.
bool	ChanPacketBuffer::willSkip()
{
	if ((writePos-readPos) >= MAX_PACKETS)
		return true;

	return false;
}

// -----------------------------------void Channel::getStreamPath(char *str){	char idStr[64];	getIDStr(idStr);	sprintf(str,"/stream/%s%s",idStr,info.getTypeExt(info.contentType));}// -----------------------------------void ChanMgr::startSearch(ChanInfo &info){	searchInfo = info;	clearHitLists();	numFinds = 0;	lastHit = 0;//	lastSearch = 0;	searchActive = true;}// -----------------------------------
void ChanMgr::quit()
{
	closeAll();
}
// -----------------------------------
void ChanMgr::closeAll()
{
	for(int i=0; i<MAX_CHANNELS; i++)
		if (channels[i].isActive())
			channels[i].close();
}// -----------------------------------Channel *ChanMgr::findChannelByNameID(ChanInfo &info){	for(int i=0; i<MAX_CHANNELS; i++)
		if (channels[i].isActive())
			if (channels[i].info.matchNameID(info))
				return &channels[i];
	return NULL;}// -----------------------------------Channel *ChanMgr::findChannelByName(const char *n){	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			if (stricmp(channels[i].info.name,n)==0)				return &channels[i];	return NULL;}// -----------------------------------Channel *ChanMgr::findChannelByIndex(int index){
	int cnt=0;	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())
		{
			if (cnt == index)
				return &channels[i];
			cnt++;
		}	return NULL;}	// -----------------------------------Channel *ChanMgr::findChannelByMount(const char *str){	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			if (strcmp(channels[i].mount,str)==0)				return &channels[i];	return NULL;}	// -----------------------------------Channel *ChanMgr::findChannelByID(GnuID &id){	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			if (channels[i].info.id.isSame(id))				return &channels[i];	return NULL;}	// -----------------------------------int ChanMgr::findChannels(ChanInfo &info, Channel **ch, int max){	int cnt=0;	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			if (channels[i].info.match(info))			{				ch[cnt++] = &channels[i];				if (cnt >= max)					break;			}	return cnt;}// -----------------------------------int ChanMgr::findChannelsByStatus(Channel **ch, int max, Channel::STATUS status){	int cnt=0;	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			if (channels[i].status == status)			{				ch[cnt++] = &channels[i];				if (cnt >= max)					break;			}	return cnt;}// -----------------------------------Channel *ChanMgr::createRelay(ChanInfo &info, bool stayConnected){	Channel *c = chanMgr->createChannel(info,NULL);	if (c)	{		c->stayConnected = stayConnected;
		c->startGet();		return c;	}	return NULL;}// -----------------------------------Channel *ChanMgr::findAndRelay(ChanInfo &info){	char idStr[64];	info.id.toStr(idStr);	LOG_CHANNEL("Searching for: %s (%s)",idStr,info.name.cstr());	peercastApp->notifyMessage(ServMgr::NT_PEERCAST,"Finding channel...");


	Channel *c = NULL;	c = findChannelByNameID(info);	if (!c)	{		c = chanMgr->createChannel(info,NULL);		if (c)
		{			c->setStatus(Channel::S_SEARCHING);			
			c->startGet();
		}	}
	for(int i=0; i<600; i++)	// search for 1 minute.
	{
		c = findChannelByNameID(info);

		if (!c)
		{			peercastApp->notifyMessage(ServMgr::NT_PEERCAST,"Channel not found");
			return NULL;
		}

		
		if (c->isPlaying() && (c->info.contentType!=ChanInfo::T_UNKNOWN))
			break;
		sys->sleep(100);	}	return c;}// -----------------------------------ChanMgr::ChanMgr(){	int i;	for(i=0; i<MAX_CHANNELS; i++)		channels[i].init();	for(i=0; i<MAX_HITLISTS; i++)	{		hitlists[i].index = i;		hitlists[i].init();	}
	currFindAndPlayChannel.clear();
	broadcastMsg.clear();	broadcastMsgInterval=10;	broadcastID.generate();	deadHitAge = 600;
	icyIndex = 0;	icyMetaInterval = 8192;		maxRelaysPerChannel = 0;	searchInfo.init();	minBroadcastTTL = 1;	maxBroadcastTTL = 7;	pushTimeout = 60;	// 1 minute	pushTries = 5;		// 5 times	maxPushHops = 8;	// max 8 hops away	maxUptime = 0;		// 0 = none
	prefetchTime = 10;	// n seconds

	hostUpdateInterval = 180; // 2 minutes

	bufferTime = 5;

	autoQuery = 0;		lastQuery = 0;

	lastYPConnect = 0;

}

// -----------------------------------bool ChanMgr::writeVariable(Stream &out, const String &var, int index){	char buf[1024];	if (var == "numHitLists")		sprintf(buf,"%d",numHitLists());
	
	else if (var == "numChannels")		sprintf(buf,"%d",numChannels());	else if (var == "djMessage")
		strcpy(buf,broadcastMsg.cstr());
	else if (var == "icyMetaInterval")
		sprintf(buf,"%d",icyMetaInterval);
	else if (var == "maxRelaysPerChannel")
		sprintf(buf,"%d",maxRelaysPerChannel);
	else if (var == "hostUpdateInterval")
		sprintf(buf,"%d",hostUpdateInterval);
	else		return false;	out.writeString(buf);	return true;}

// -----------------------------------
bool Channel::writeVariable(Stream &out, const String &var, int index)
{
	char buf[1024];

	buf[0]=0;

	String utf8;

	if (var == "name")
	{
		utf8 = info.name;
		utf8.convertTo(String::T_UNICODESAFE);
		strcpy(buf,utf8.cstr());

	}else if (var == "bitrate")
	{
		sprintf(buf,"%d",info.bitrate);
	}else if (var == "genre")
	{
		utf8 = info.genre;
		utf8.convertTo(String::T_UNICODESAFE);
		strcpy(buf,utf8.cstr());
	}else if (var == "desc")
	{
		utf8 = info.desc;
		utf8.convertTo(String::T_UNICODESAFE);
		strcpy(buf,utf8.cstr());
	}else if (var == "comment")
	{
		utf8 = info.comment;
		utf8.convertTo(String::T_UNICODESAFE);
		strcpy(buf,utf8.cstr());
	}else if (var == "uptime")
	{
		String uptime;
		if (info.lastPlayStart)
			uptime.setFromStopwatch(sys->getTime()-info.lastPlayStart);
		else
			uptime.set("-");
		strcpy(buf,uptime.cstr());
	}
	else if (var == "type")
		sprintf(buf,"%s",ChanInfo::getTypeStr(info.contentType));

	else if (var == "localRelays")
		sprintf(buf,"%d",localRelays());
	else if (var == "localListeners")
		sprintf(buf,"%d",localListeners());

	else if (var == "totalRelays")
		sprintf(buf,"%d",totalRelays());
	else if (var == "totalListeners")
		sprintf(buf,"%d",totalListeners());

	else if (var == "status")
		sprintf(buf,"%s",getStatusStr());
	else if (var == "keep")
		sprintf(buf,"%s",stayConnected?"Yes":"No");
	else if (var == "id")
		info.id.toStr(buf);
	else if (var.startsWith("track."))
	{

		if (var == "track.title")
			utf8 = info.track.title;
		else if (var == "track.artist")
			utf8 = info.track.artist;
		else if (var == "track.album")
			utf8 = info.track.album;
		else if (var == "track.genre")
			utf8 = info.track.genre;
		else if (var == "track.contactURL")
			utf8 = info.track.contact;

		utf8.convertTo(String::T_UNICODESAFE);
		strcpy(buf,utf8.cstr());

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -