⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 channel.cpp

📁 这是和p2p相关的一份源码
💻 CPP
📖 第 1 页 / 共 5 页
字号:
}

// ------------------------------------------
// Forwards to readRaw(in, ch) and then reports 0 to the caller
// unconditionally.
int RawStream::readPacket(Stream &in, Channel *ch)
{
	const int result = 0;

	readRaw(in, ch);

	return result;
}

// ------------------------------------------
// No-op: both arguments are ignored.
void RawStream::readEnd(Stream & /*in*/, Channel * /*ch*/)
{
	// nothing to do
}
// -----------------------------------void ChanPacket::init(TYPE t, const void *p, unsigned int l,unsigned int _pos){
	type = t;	if (l > MAX_DATALEN)
		throw StreamException("Packet data too large");	len = l;	memcpy(data,p,len);
	pos = _pos;
}// -----------------------------------
// Writes `len` bytes of `data` to `out`; no tag or length header is emitted.
void ChanPacket::writeRaw(Stream &out)
{
	out.write(this->data, this->len);
}
// -----------------------------------void ChanPacket::writePeercast(Stream &out){	unsigned int tp = 0;
	switch (type)
	{
		case T_HEAD: tp = 'HEAD'; break;
		case T_META: tp = 'META'; break;
		case T_DATA: tp = 'DATA'; break;
	}

	if (type != T_UNKNOWN)
	{
		out.writeTag(tp);		out.writeShort(len);		out.writeShort(0);		out.write(data,len);
	}}// -----------------------------------void ChanPacket::readPeercast(Stream &in){
	unsigned int tp = in.readTag();

	switch (tp)
	{
		case 'HEAD':	type = T_HEAD; break;
		case 'DATA':	type = T_DATA; break;
		case 'META':	type = T_META; break;
		default:		type = T_UNKNOWN;
	}
	len = in.readShort();	in.readShort();	if (len > MAX_DATALEN)		throw StreamException("Bad ChanPacket");	in.read(data,len);}// -----------------------------------
// Copies packets from `buf` into this buffer.
//   buf    - source buffer; its packets from buf.firstPos to buf.lastPos
//            (inclusive) are examined
//   reqPos - only packets whose `pos` is >= reqPos are copied
// A packet is copied only if its `type` has a bit in common with this
// buffer's `accept` mask. firstPos, lastPos, safePos and readPos are reset
// to 0 first; writePos is NOT reset and copied packets are appended from
// its current value.
// Both this buffer's lock and buf's lock are held for the duration.
// Returns lastPos - firstPos after the copy.
// NOTE(review): writePos is left untouched although the other cursors are
// zeroed - confirm whether callers rely on a fresh (writePos == 0) buffer.
int ChanPacketBuffer::copyFrom(ChanPacketBuffer &buf, unsigned int reqPos)
{
	lock.on();
	buf.lock.on();

	firstPos = 0;
	lastPos = 0;
	safePos = 0;
	readPos = 0;

	for(unsigned int i=buf.firstPos; i<=buf.lastPos; i++)
	{
		ChanPacket *src = &buf.packets[i%MAX_PACKETS];
		if (src->type & accept)
		{
			if (src->pos >= reqPos)
			{
				lastPos = writePos;
				// Wrap the slot index like every other accessor of `packets`
				// does, so a non-zero starting writePos cannot write past
				// the end of the array.
				packets[writePos%MAX_PACKETS] = *src;
				writePos++;
			}
		}

	}


	buf.lock.off();
	lock.off();
	return lastPos-firstPos;
}

// -----------------------------------
bool ChanPacketBuffer::findPacket(unsigned int spos, ChanPacket &pack)
{
	if (writePos == 0)
		return false;

	lock.on();

	unsigned int fpos = getStreamPos(firstPos);
	if (spos < fpos)
		spos = fpos;


	for(unsigned int i=firstPos; i<=lastPos; i++)
	{
		ChanPacket &p = packets[i%MAX_PACKETS];
		if (p.pos >= spos)
		{
			pack = p;
			lock.off();
			return true;
		}
	}

	lock.off();
	return false;
}
// -----------------------------------
// Returns the stream position of the packet at lastPos, or 0 when nothing
// has been written (writePos == 0).
unsigned int	ChanPacketBuffer::getLatestPos()
{
	return writePos ? getStreamPos(lastPos) : 0;
}
// -----------------------------------
// Returns the stream position of the packet at firstPos, or 0 when nothing
// has been written (writePos == 0).
unsigned int	ChanPacketBuffer::getOldestPos()
{
	return writePos ? getStreamPos(firstPos) : 0;
}

// -----------------------------------
// Limits `spos` to the range spanned by the packets at safePos and lastPos.
// Returns the stream position at safePos if `spos` is below it; otherwise
// the stream position at lastPos if `spos` is above that; otherwise `spos`.
// The lower bound is tested first.
unsigned int	ChanPacketBuffer::findOldestPos(unsigned int spos)
{
	const unsigned int lowest = getStreamPos(safePos);
	const unsigned int highest = getStreamPos(lastPos);

	if (spos < lowest)
		return lowest;

	return (spos > highest) ? highest : spos;
}

// -----------------------------------
unsigned int	ChanPacketBuffer::getStreamPos(unsigned int index)
{
	return packets[index%MAX_PACKETS].pos;
}
// -----------------------------------
unsigned int	ChanPacketBuffer::getStreamPosEnd(unsigned int index)
{
	return packets[index%MAX_PACKETS].pos+packets[index%MAX_PACKETS].len;
}
// -----------------------------------bool ChanPacketBuffer::writePacket(ChanPacket &pack, bool updateReadPos){
	if (pack.len)
	{
		if (willSkip())	// too far behind
			return false;
		lock.on();

		pack.sync = writePos;		packets[writePos%MAX_PACKETS] = pack;		lastPos = writePos;		writePos++;
		if (writePos >= MAX_PACKETS)
			firstPos = writePos-MAX_PACKETS;
		else
			firstPos = 0;

		if (writePos >= NUM_SAFEPACKETS)
			safePos = writePos - NUM_SAFEPACKETS;
		else
			safePos = 0;

		if (updateReadPos)
			readPos = writePos;

		lastWriteTime = sys->getTime();
		lock.off();
		return true;
	}

	return false;}// -----------------------------------
// Copies the packet at readPos into `pack` and advances readPos.
// Throws StreamException("Read too far behind") if readPos is already below
// firstPos on entry. While no unread packet exists (readPos >= writePos) it
// polls via sys->sleepIdle() and throws TimeoutException once more than 30
// units of sys->getTime() have passed (presumably seconds - confirm).
// The copy and the readPos increment happen under `lock`; one more
// sleepIdle() is issued after the lock is released.
void	ChanPacketBuffer::readPacket(ChanPacket &pack)
{
	const unsigned int startTime = sys->getTime();

	if (firstPos > readPos)
		throw StreamException("Read too far behind");

	for (;;)
	{
		if (readPos < writePos)
			break;

		sys->sleepIdle();
		if ((sys->getTime() - startTime) > 30)
			throw TimeoutException();
	}

	lock.on();
	pack = packets[readPos % MAX_PACKETS];
	++readPos;
	lock.off();

	sys->sleepIdle();
}
// -----------------------------------
// Returns true when writePos is at least MAX_PACKETS ahead of readPos.
bool	ChanPacketBuffer::willSkip()
{
	if ((writePos - readPos) < MAX_PACKETS)
		return false;

	return true;
}

// -----------------------------------void Channel::getStreamPath(char *str){	char idStr[64];	getIDStr(idStr);	sprintf(str,"/stream/%s%s",idStr,info.getTypeExt(info.contentType));}
// -----------------------------------void ChanMgr::startSearch(ChanInfo &info){	searchInfo = info;	clearHitLists();	numFinds = 0;	lastHit = 0;//	lastSearch = 0;	searchActive = true;}// -----------------------------------
void ChanMgr::quit()
{
	LOG_DEBUG("ChanMgr is quitting..");
	closeAll();
}
// -----------------------------------
int ChanMgr::numIdleChannels()
{
	int cnt=0;
	Channel *ch = channel;
	while (ch)
	{
		if (ch->isActive())
			if (ch->thread.active)
				if (ch->status == Channel::S_IDLE)
					cnt++;
		ch=ch->next;
	}
	return cnt;
}
// -----------------------------------
void ChanMgr::closeOldestIdle()
{
	unsigned int idleTime = (unsigned int)-1;
	Channel *ch = channel,*oldest=NULL;
	while (ch)
	{
		if (ch->isActive())
			if (ch->thread.active)
				if (ch->status == Channel::S_IDLE)
					if (ch->lastIdleTime < idleTime)
					{
						oldest = ch;
						idleTime = ch->lastIdleTime;
					}
		ch=ch->next;
	}

	if (oldest)
		oldest->thread.active = false;
}

// -----------------------------------
void ChanMgr::closeAll()
{
	Channel *ch = channel;
	while (ch)
	{
		if (ch->thread.active)
			ch->thread.shutdown();
		ch=ch->next;
	}
}// -----------------------------------Channel *ChanMgr::findChannelByNameID(ChanInfo &info){	Channel *ch = channel;
	while (ch)
	{
		if (ch->isActive())
			if (ch->info.matchNameID(info))
				return ch;
		ch=ch->next;
	}
	return NULL;}// -----------------------------------Channel *ChanMgr::findChannelByName(const char *n){	Channel *ch = channel;
	while (ch)
	{
		if (ch->isActive())			if (stricmp(ch->info.name,n)==0)				return ch;		ch=ch->next;
	}
	return NULL;}// -----------------------------------Channel *ChanMgr::findChannelByIndex(int index){
	int cnt=0;	Channel *ch = channel;
	while (ch)
	{
		if (ch->isActive())
		{
			if (cnt == index)
				return ch;
			cnt++;
		}		ch=ch->next;
	}
	return NULL;}	// -----------------------------------Channel *ChanMgr::findChannelByMount(const char *str){	Channel *ch = channel;
	while (ch)
	{
		if (ch->isActive())			if (strcmp(ch->mount,str)==0)				return ch;		ch=ch->next;
	}
	return NULL;}	// -----------------------------------Channel *ChanMgr::findChannelByID(GnuID &id){	Channel *ch = channel;
	while (ch)
	{
		if (ch->isActive())			if (ch->info.id.isSame(id))				return ch;		ch=ch->next;
	}	return NULL;}	// -----------------------------------int ChanMgr::findChannels(ChanInfo &info, Channel **chlist, int max){	int cnt=0;	Channel *ch = channel;
	while (ch)
	{
		if (ch->isActive())			if (ch->info.match(info))			{				chlist[cnt++] = ch;				if (cnt >= max)					break;			}		ch=ch->next;
	}
	return cnt;}// -----------------------------------int ChanMgr::findChannelsByStatus(Channel **chlist, int max, Channel::STATUS status){	int cnt=0;	Channel *ch = channel;
	while (ch)
	{
		if (ch->isActive())			if (ch->status == status)			{				chlist[cnt++] = ch;				if (cnt >= max)					break;			}		ch=ch->next;
	}
	return cnt;}// -----------------------------------Channel *ChanMgr::createRelay(ChanInfo &info, bool stayConnected){	Channel *c = chanMgr->createChannel(info,NULL);	if (c)	{		c->stayConnected = stayConnected;
		c->startGet();		return c;	}	return NULL;}// -----------------------------------Channel *ChanMgr::findAndRelay(ChanInfo &info){	char idStr[64];	info.id.toStr(idStr);	LOG_CHANNEL("Searching for: %s (%s)",idStr,info.name.cstr());	peercastApp->notifyMessage(ServMgr::NT_PEERCAST,"Finding channel...");


	Channel *c = NULL;	c = findChannelByNameID(info);	if (!c)	{		c = chanMgr->createChannel(info,NULL);		if (c)
		{			c->setStatus(Channel::S_SEARCHING);			
			c->startGet();
		}	}
	for(int i=0; i<600; i++)	// search for 1 minute.
	{
		c = findChannelByNameID(info);

		if (!c)
		{			peercastApp->notifyMessage(ServMgr::NT_PEERCAST,"Channel not found");
			return NULL;
		}

		
		if (c->isPlaying() && (c->info.contentType!=ChanInfo::T_UNKNOWN))
			break;
		sys->sleep(100);	}	return c;}// -----------------------------------ChanMgr::ChanMgr(){	channel = NULL;	
	hitlist = NULL;
	currFindAndPlayChannel.clear();
	broadcastMsg.clear();	broadcastMsgInterval=10;
	broadcastID.generate(PCP_BROADCAST_FLAGS);
	deadHitAge = 600;
	icyIndex = 0;	icyMetaInterval = 8192;		maxRelaysPerChannel = 0;	searchInfo.init();	minBroadcastTTL = 1;	maxBroadcastTTL = 7;	pushTimeout = 60;	// 1 minute	pushTries = 5;		// 5 times	maxPushHops = 8;	// max 8 hops away	maxUptime = 0;		// 0 = none
	prefetchTime = 10;	// n seconds

	hostUpdateInterval = 180; // 2 minutes

	bufferTime = 5;

	autoQuery = 0;		lastQuery = 0;

	lastYPConnect = 0;

}
// -----------------------------------bool ChanMgr::writeVariable(Stream &out, const String &var, int index){	char buf[1024];	if (var == "numHitLists")		sprintf(buf,"%d",numHitLists());
	
	else if (var == "numChannels")		sprintf(buf,"%d",numChannels());	else if (var == "djMessage")
		strcpy(buf,broadcastMsg.cstr());
	else if (var == "icyMetaInterval")
		sprintf(buf,"%d",icyMetaInterval);
	else if (var == "maxRelaysPerChannel")
		sprintf(buf,"%d",maxRelaysPerChannel);
	else if (var == "hostUpdateInterval")
		sprintf(buf,"%d",hostUpdateInterval);
	else if (var == "broadcastID")
		broadcastID.toStr(buf);
	else		return false;	out.writeString(buf);	return true;}

// -----------------------------------
bool Channel::writeVariable(Stream &out, const String &var, int index)
{
	char buf[1024];

	buf[0]=0;

	String utf8;

	if (var == "name")
	{
		utf8 = info.name;
		utf8.convertTo(String::T_UNICODESAFE);
		strcpy(buf,utf8.cstr());

	}else if (var == "bitrate")
	{
		sprintf(buf,"%d",info.bitrate);

	}else if (var == "srcrate")
	{
		if (sourceData)
		{
			unsigned int tot = sourceData->getSourceRate();
			sprintf(buf,"%.1f",BYTES_TO_KBPS(tot));
		}else
			strcpy(buf,"0");

	}else if (var == "genre")
	{
		utf8 = info.genre;
		utf8.convertTo(String::T_UNICODESAFE);
		strcpy(buf,utf8.cstr());
	}else if (var == "desc")
	{
		utf8 = info.desc;
		utf8.convertTo(String::T_UNICODESAFE);
		strcpy(buf,utf8.cstr());
	}else if (var == "comment")
	{
		utf8 = info.comment;
		utf8.convertTo(String::T_UNICODESAFE);
		strcpy(buf,utf8.cstr());
	}else if (var == "uptime")
	{
		String uptime;
		if (info.lastPlayStart)
			uptime.setFromStopwatch(sys->getTime()-info.lastPlayStart);
		else
			uptime.set("-");
		strcpy(buf,uptime.cstr());
	}
	else if (var == "type")
		sprintf(buf,"%s",ChanInfo::getTypeStr(info.contentType));
	else if (var == "ext")
		sprintf(buf,"%s",ChanInfo::getTypeExt(info.contentType));

	else if (var == "localRelays")
		sprintf(buf,"%d",localRelays());
	else if (var == "localListeners")
		sprintf(buf,"%d",localListeners());

	else if (var == "totalRelays")
		sprintf(buf,"%d",totalRelays());
	else if (var == "totalListeners")
		sprintf(buf,"%d",totalListeners());

	else if (var == "status")
		sprintf(buf,"%s",getStatusStr());
	else if (var == "keep")
		sprintf(buf,"%s",stayConnected?"Yes":"No");
	else if (var == "id")
		info.id.toStr(buf);
	else if (var.startsWith("track."))
	{

		if (var == "track.title")
			utf8 = info.track.title;
		else if (var == "track.artist")
			utf8 = info.track.artist;
		else if (var == "track.album")
			utf8 = info.track.album;
		else if (var == "track.genre")
			utf8 = info.track.genre;
		else if (var == "track.contactURL")
			utf8 = info.track.contact;

		utf8.convertTo(String::T_UNICODESAFE);
		strcpy(buf,utf8.cstr());

	}else if (var == "contactURL")
		sprintf(buf,"%s",info.url.cstr());
	else if (var == "streamPos")
		sprintf(buf,"%d",streamPos);
	else if (var == "sourceType")
		strcpy(buf,getSrcTypeStr());
	else if (var == "sourceProtocol")
		strcpy(buf,ChanInfo::getProtocolStr(info.srcProtocol));
	else if (var == "sourceURL")
	{
		if (sourceURL.isEmpty())
			sourceHost.host.toStr(buf);
		else
			strcpy(buf,sourceURL.cstr());
	}
	else if (var == "headPos")
		sprintf(buf,"%d",headPack.pos);
	else if (var == "headLen")
		sprintf(buf,"%d",headPack.len);
	else if (var == "numHits")
	{
		ChanHitList *chl = chanMgr->findHitListByID(info.id);
		int numHits = 0;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -