⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 channel.cpp

📁 这是和p2p相关的一份源码
💻 CPP
📖 第 1 页 / 共 5 页
字号:
	{
		out.writeTag(tp);		out.writeShort(len);		out.writeShort(0);		out.write(data,len);
	}}// -----------------------------------void ChanPacket::readPeercast(Stream &in){
	unsigned int tp = in.readTag();

	switch (tp)
	{
		case 'HEAD':	type = T_HEAD; break;
		case 'DATA':	type = T_DATA; break;
		case 'META':	type = T_META; break;
		default:		type = T_UNKNOWN;
	}
	len = in.readShort();	in.readShort();	if (len > MAX_DATALEN)		throw StreamException("Bad ChanPacket");	in.read(data,len);}// -----------------------------------
// Copies into this buffer every packet of `buf` (from buf.firstPos through
// buf.lastPos) whose type is enabled in `accept` and whose stream position
// is at least `reqPos`. Both buffers are locked for the duration.
// firstPos, lastPos, safePos and readPos are reset to 0 first.
// Returns lastPos-firstPos as it stands after the copy.
// NOTE(review): writePos is not reset here, so this presumably expects a
// freshly initialised destination -- confirm against callers.
int ChanPacketBuffer::copyFrom(ChanPacketBuffer &buf, unsigned int reqPos)
{
	lock.on();
	buf.lock.on();

	firstPos = 0;
	lastPos = 0;
	safePos = 0;
	readPos = 0;

	for(unsigned int i=buf.firstPos; i<=buf.lastPos; i++)
	{
		ChanPacket *src = &buf.packets[i%MAX_PACKETS];

		if ((src->type & accept) && (src->pos >= reqPos))
		{
			lastPos = writePos;
			// Wrap the slot index the same way writePacket()/readPacket() do,
			// so a non-zero starting writePos can never index past packets[].
			packets[writePos%MAX_PACKETS] = *src;
			writePos++;
		}
	}

	buf.lock.off();
	lock.off();
	return lastPos-firstPos;
}
// -----------------------------------
bool ChanPacketBuffer::findPacket(unsigned int spos, ChanPacket &pack)
{
	if (writePos == 0)
		return false;

	unsigned int fpos = getStreamPos(firstPos);
	if (spos < fpos)
		spos = fpos;

	for(unsigned int i=firstPos; i<=lastPos; i++)
	{
		ChanPacket &p = packets[i%MAX_PACKETS];
		if (p.pos >= spos)
		{
			pack = p;
			return true;
		}
	}
	return false;
}
// -----------------------------------
// Stream position of the packet at lastPos, or 0 while writePos is still 0.
unsigned int	ChanPacketBuffer::getLatestPos()
{
	return writePos ? getStreamPos(lastPos) : 0;
}

// -----------------------------------
// Limits `spos` to the range spanned by the stream positions of the packets
// at safePos (lower bound) and lastPos (upper bound).
unsigned int	ChanPacketBuffer::findOldestPos(unsigned int spos)
{
	const unsigned int lowest = getStreamPos(safePos);
	const unsigned int highest = getStreamPos(lastPos);

	// The lower bound is tested first, so it wins if the two bounds are inverted.
	if (spos < lowest)
		return lowest;

	return (spos > highest) ? highest : spos;
}

// -----------------------------------
unsigned int	ChanPacketBuffer::getStreamPos(unsigned int index)
{
	return packets[index%MAX_PACKETS].pos;
}
// -----------------------------------
unsigned int	ChanPacketBuffer::getStreamPosEnd(unsigned int index)
{
	return packets[index%MAX_PACKETS].pos+packets[index%MAX_PACKETS].len;
}
// -----------------------------------bool ChanPacketBuffer::writePacket(ChanPacket &pack, bool updateReadPos){
	if (pack.len)
	{
		if (willSkip())	// too far behind
			return false;
		lock.on();
		packets[writePos%MAX_PACKETS] = pack;		lastPos = writePos;		writePos++;
		if (writePos >= MAX_PACKETS)
			firstPos = writePos-MAX_PACKETS;
		else
			firstPos = 0;

		if (writePos >= NUM_SAFEPACKETS)
			safePos = writePos - NUM_SAFEPACKETS;
		else
			safePos = 0;

		if (updateReadPos)
			readPos = writePos;

		lastWriteTime = sys->getTime();
		lock.off();
		return true;
	}

	return false;}// -----------------------------------
// Waits until a packet is available at the read cursor, copies it into
// `pack` and advances the cursor by one.
// Throws StreamException("Read too far behind") if readPos is already below
// firstPos on entry, and TimeoutException if more than 30 units of
// sys->getTime() elapse while waiting (presumably seconds -- confirm).
void	ChanPacketBuffer::readPacket(ChanPacket &pack)
{
	unsigned int tim = sys->getTime();

	if (readPos < firstPos)
		throw StreamException("Read too far behind");

	// Poll until the writer has moved past the read cursor.
	while (readPos >= writePos)
	{
		sys->sleepIdle();
		if ((sys->getTime() - tim) > 30)
			throw TimeoutException();
	}

	lock.on();
	pack = packets[readPos%MAX_PACKETS];
	readPos++;
	lock.off();

	// Idle only after the lock is released: writePacket() takes the same
	// lock, so idling while holding it would hold writers up for no reason.
	sys->sleepIdle();
}
// -----------------------------------
// True when the write cursor is at least MAX_PACKETS ahead of the read cursor.
bool	ChanPacketBuffer::willSkip()
{
	return !((writePos-readPos) < MAX_PACKETS);
}

// -----------------------------------void Channel::getStreamPath(char *str){	char idStr[64];	getIDStr(idStr);	sprintf(str,"/stream/%s%s",idStr,info.getTypeExt(info.contentType));}// -----------------------------------void ChanMgr::startSearch(ChanInfo &info){	searchInfo = info;	clearHitLists();	numFinds = 0;	lastHit = 0;//	lastSearch = 0;	searchActive = true;}// -----------------------------------
void ChanMgr::quit()
{
	closeAll();
}
// -----------------------------------
void ChanMgr::closeAll()
{
	for(int i=0; i<MAX_CHANNELS; i++)
		if (channels[i].isActive())
			channels[i].close();
}// -----------------------------------Channel *ChanMgr::findChannelByNameID(ChanInfo &info){	for(int i=0; i<MAX_CHANNELS; i++)
		if (channels[i].isActive())
			if (channels[i].info.matchNameID(info))
				return &channels[i];
	return NULL;}// -----------------------------------Channel *ChanMgr::findChannelByName(const char *n){	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			if (stricmp(channels[i].info.name,n)==0)				return &channels[i];	return NULL;}// -----------------------------------Channel *ChanMgr::findListenerChannel(){	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			if (channels[i].numListeners())				return &channels[i];	return NULL;}// -----------------------------------Channel *ChanMgr::findChannelByIndex(int index){
	int cnt=0;	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())
		{
			if (cnt == index)
				return &channels[i];
			cnt++;
		}	return NULL;}	// -----------------------------------Channel *ChanMgr::findChannelByMount(const char *str){	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			if (strcmp(channels[i].mount,str)==0)				return &channels[i];	return NULL;}	// -----------------------------------Channel *ChanMgr::findChannelByID(GnuID &id){	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			if (channels[i].info.id.isSame(id))				return &channels[i];	return NULL;}	// -----------------------------------int ChanMgr::findChannels(ChanInfo &info, Channel **ch, int max){	int cnt=0;	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			if (channels[i].info.match(info))			{				ch[cnt++] = &channels[i];				if (cnt >= max)					break;			}	return cnt;}// -----------------------------------int ChanMgr::findChannelsByStatus(Channel **ch, int max, Channel::STATUS status){	int cnt=0;	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			if (channels[i].status == status)			{				ch[cnt++] = &channels[i];				if (cnt >= max)					break;			}	return cnt;}// -----------------------------------Channel *ChanMgr::createRelay(ChanInfo &info, bool stayConnected){	Channel *c = chanMgr->createChannel(info,NULL);	if (c)	{		c->stayConnected = stayConnected;
		c->startGet();		return c;	}	return NULL;}// -----------------------------------Channel *ChanMgr::findAndRelay(ChanInfo &info){	char idStr[64];	info.id.toStr(idStr);	LOG_CHANNEL("Searching for: %s (%s)",idStr,info.name.cstr());	peercastApp->notifyMessage(ServMgr::NT_PEERCAST,"Finding channel...");


	Channel *c = NULL;	c = findChannelByNameID(info);	if (!c)	{		c = chanMgr->createChannel(info,NULL);		if (c)
		{			c->setStatus(Channel::S_SEARCHING);			
			c->startGet();
		}	}
	for(int i=0; i<600; i++)	// search for 1 minute.
	{
		c = findChannelByNameID(info);

		if (!c)
		{			peercastApp->notifyMessage(ServMgr::NT_PEERCAST,"Channel not found");
			return NULL;
		}

		
		if (c->isPlaying() && (c->info.contentType!=ChanInfo::T_UNKNOWN))
			break;
		sys->sleep(100);	}	return c;}// -----------------------------------ChanMgr::ChanMgr(){	int i;	for(i=0; i<MAX_CHANNELS; i++)		channels[i].init();	for(i=0; i<MAX_HITLISTS; i++)	{		hitlists[i].index = i;		hitlists[i].init();	}	broadcastMsg.clear();	broadcastMsgInterval=10;	broadcastID.generate();	deadHitAge = 600;
	icyIndex = 0;	icyMetaInterval = 8192;		maxStreamsPerChannel = 0;	searchInfo.init();	minBroadcastTTL = 1;	maxBroadcastTTL = 7;	pushTimeout = 60;	// 1 minute	pushTries = 5;		// 5 times	maxPushHops = 8;	// max 8 hops away	maxUptime = 0;		// 0 = none
	prefetchTime = 10;	// n seconds

	hostUpdateInterval = 180; // 2 minutes

	bufferTime = 5;

	trackerHitList.init();
	autoQuery = 0;		lastQuery = 0;

	lastYPConnect = 0;

}
// -----------------------------------
// Picks a tracker hit: asks trackerHitList first, then each used hit list in
// index order, stopping at the first hit whose host.ip is non-zero.
// The returned hit has host.ip == 0 when none of them produced one.
ChanHit	ChanMgr::getTracker(Host *sh,unsigned int wait,bool useFirewalled)
{
	ChanHit hit;
	hit.init();

	if (!hit.host.ip)
		hit = trackerHitList.getHit(sh,wait,useFirewalled,true);

	if (hit.host.ip)
		return hit;

	for(int i=0; i<MAX_HITLISTS; i++)
	{
		if (!hitlists[i].isUsed())
			continue;

		hit = hitlists[i].getHit(sh,wait,useFirewalled,true);
		if (hit.host.ip)
			break;
	}

	return hit;
}
// -----------------------------------
// Asks each used hit list, in index order, for a hit matching `sid` and
// returns the first one whose host.ip is non-zero. If no list yields one,
// the result is whatever the last queried list returned (or the freshly
// init()'d hit when no list is in use).
ChanHit		ChanMgr::findHit(GnuID &sid,bool tracker)
{
	ChanHit found;
	found.init();

	int i = 0;
	while (i < MAX_HITLISTS)
	{
		if (hitlists[i].isUsed())
		{
			found = hitlists[i].findHit(sid,tracker);
			if (found.host.ip)
				break;
		}
		i++;
	}

	return found;
}
// -----------------------------------bool ChanMgr::writeVariable(Stream &out, const String &var, int index){	char buf[1024];	if (var == "numHitLists")		sprintf(buf,"%d",numHitLists());	else if (var == "numRelayed")		sprintf(buf,"%d",numRelayed());	else if (var == "numChannels")		sprintf(buf,"%d",numChannels());	else if (var == "totalListeners")		sprintf(buf,"%d",numListeners());	else if (var == "djMessage")
		strcpy(buf,broadcastMsg.cstr());
	else if (var == "icyMetaInterval")
		sprintf(buf,"%d",icyMetaInterval);
	else if (var == "maxStreamsPerChannel")
		sprintf(buf,"%d",maxStreamsPerChannel);
	else if (var == "hostUpdateInterval")
		sprintf(buf,"%d",hostUpdateInterval);
	else		return false;	out.writeString(buf);	return true;}

// -----------------------------------
// Writes the textual value of the channel variable named `var` to `out`.
// Free-text fields (name, genre, desc, comment and the "track.*" family) are
// converted with String::T_UNICODESAFE before being written; everything else
// is formatted directly. A "track." name that matches no known field yields
// an empty string and still returns true.
// Returns false, writing nothing, when the name is not recognised.
// `index` is not used by this implementation.
bool Channel::writeVariable(Stream &out, const String &var, int index)
{
	char buf[1024];
	buf[0]=0;

	String utf8;
	bool isText = true;

	// Step 1: pick the source string for the free-text variables.
	if (var == "name")
		utf8 = info.name;
	else if (var == "genre")
		utf8 = info.genre;
	else if (var == "desc")
		utf8 = info.desc;
	else if (var == "comment")
		utf8 = info.comment;
	else if (var.startsWith("track."))
	{
		if (var == "track.title")
			utf8 = info.track.title;
		else if (var == "track.artist")
			utf8 = info.track.artist;
		else if (var == "track.album")
			utf8 = info.track.album;
		else if (var == "track.genre")
			utf8 = info.track.genre;
		else if (var == "track.contactURL")
			utf8 = info.track.contact;
	}
	else
		isText = false;

	// Step 2: either convert-and-copy the text field, or format a scalar/status field.
	if (isText)
	{
		utf8.convertTo(String::T_UNICODESAFE);
		strcpy(buf,utf8.cstr());
	}
	else if (var == "bitrate")
	{
		sprintf(buf,"%d",info.bitrate);
	}
	else if (var == "uptime")
	{
		String uptime;
		if (info.lastPlayTime)
			uptime.setFromStopwatch(sys->getTime()-info.lastPlayTime);
		else
			uptime.set("-");
		strcpy(buf,uptime.cstr());
	}
	else if (var == "type")
	{
		sprintf(buf,"%s",ChanInfo::getTypeStr(info.contentType));
	}
	else if (var == "numRelays")
	{
		sprintf(buf,"%d",numRelays());
	}
	else if (var == "numListeners")
	{
		sprintf(buf,"%d",numListeners());
	}
	else if (var == "status")
	{
		sprintf(buf,"%s",getStatusStr());
	}
	else if (var == "keep")
	{
		sprintf(buf,"%s",stayConnected?"Yes":"No");
	}
	else if (var == "id")
	{
		info.id.toStr(buf);
	}
	else if (var == "contactURL")
	{
		sprintf(buf,"%s",info.url.cstr());
	}
	else if (var == "streamPos")
	{
		sprintf(buf,"%d",streamPos);
	}
	else if (var == "sourceType")
	{
		strcpy(buf,getSrcTypeStr());
	}
	else if (var == "sourceProtocol")
	{
		strcpy(buf,ChanInfo::getProtocolStr(info.srcProtocol));
	}
	else if (var == "sourceURL")
	{
		strcpy(buf,sourceURL.cstr());
	}
	else if (var == "headPos")
	{
		sprintf(buf,"%d",headPack.pos);
	}
	else if (var == "headLen")
	{
		sprintf(buf,"%d",headPack.len);
	}
	else if (var == "numHits")
	{
		// 0 when the manager holds no hit list for this channel id
		ChanHitList *chl = chanMgr->findHitListByID(info.id);
		int numHits = 0;
		if (chl)
			numHits = chl->numHits();
		sprintf(buf,"%d",numHits);
	}
	else
	{
		return false;
	}

	out.writeString(buf);
	return true;
}

// -----------------------------------
// Forwards broadcastTrackerUpdate(svID, force) to every channel that is both
// active and broadcasting.
void ChanMgr::broadcastTrackerUpdate(GnuID &svID, bool force)
{
	for(int i=0; i<MAX_CHANNELS; i++)
	{
		Channel &chan = channels[i];

		if (!chan.isActive())
			continue;
		if (!chan.isBroadcasting())
			continue;

		chan.broadcastTrackerUpdate(svID,force);
	}
}

// -----------------------------------
// Offers `pack` to the channels via sendPacketUp().
// When `destID` is set, channels are tried with that destination first and
// the function returns 1 as soon as one accepts. Otherwise, or if none
// accepted, the packet is offered to every channel with a cleared
// destination id and the number of channels that accepted is returned.
int ChanMgr::broadcastPacketUp(ChanPacket &pack,GnuID &chanID, GnuID &srcID, GnuID &destID)
{
	int i;

	if (destID.isSet())
	{
		for(i=0; i<MAX_CHANNELS; i++)
		{
			if (channels[i].sendPacketUp(pack,chanID,srcID,destID))
				return 1;
		}
	}

	GnuID noID;
	noID.clear();

	int accepted = 0;
	for(i=0; i<MAX_CHANNELS; i++)
	{
		if (channels[i].sendPacketUp(pack,chanID,srcID,noID))
			accepted++;
	}

	return accepted;
}
// -----------------------------------void ChanMgr::broadcastRelays(Servent *serv, int minTTL, int maxTTL){	//if ((servMgr->getFirewall() == ServMgr::FW_OFF) || servMgr->serverHost.localIP())	{		Host sh = servMgr->serverHost;		bool push = (servMgr->getFirewall()!=ServMgr::FW_OFF);		bool busy = (servMgr->pubInFull() && servMgr->outFull()) || servMgr->streamFull();		bool stable = servMgr->totalStreams>0;
		GnuPacket hit;		int numChans=0;		for(int i=0; i<MAX_CHANNELS; i++)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -