⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 channel.cpp

📁 这是和p2p相关的一份源码
💻 CPP
📖 第 1 页 / 共 4 页
字号:
		int spos = gpos % MAX_DATALEN; // start		int epos = pos % MAX_DATALEN;	// end		unsigned int rlen;		if (pos >= MAX_DATALEN)		{			if (spos < epos)					rlen = epos-spos;			else				rlen = MAX_DATALEN-spos;		}else{			rlen = epos-spos;		}		// read as many bytes as we can		if (rlen > len)			rlen = len;		if (rlen)		{			//LOG("BUF READ gpos=%d, pos=%d, spos=%d, epos=%d, avail=%d",gpos,pos,spos,epos,avail);			if (rlen > MAX_DATALEN)				{				// something went horribly wrong				throw StreamException("Chan buffer read fail");			}			memcpy(p,&data[spos],rlen);			p = (char *)p+rlen;			len -= rlen;			gpos += rlen;		}	}	return gpos;}// -----------------------------------void	ChanBuffer::write(const void *p,int len){	while (len)	{		unsigned int bpos = pos % MAX_DATALEN;		int wlen = len;		if ((bpos+wlen) > MAX_DATALEN)			wlen = MAX_DATALEN-bpos;		memcpy(&data[bpos],p,wlen);		p = (char *)p+wlen;		len -= wlen;		pos += wlen;	}}// -----------------------------------void	ChanBuffer::writePacket(ChanPacket &pack){	lock.on();	unsigned int t = sys->gtohl( SWAP4(pack.type) );		//LOG("write packet %x,%d",t,pack.len);	lastPacket = pos;		if ((lastPacket-firstPacket) > MAX_DATALEN)		firstPacket = lastPacket;	write(&t,4);	write(&pack.len,sizeof(int));	write(pack.data,pack.len);	lock.off();}#endif// -----------------------------------void	ChanPacketBuffer::writePacket(ChanPacket &pack){	lock.on();	packets[currPacket%MAX_PACKETS] = pack;	lastPacket = currPacket;	currPacket++;	if (currPacket <= MAX_PACKETS)		firstPacket = 0;	else		firstPacket = currPacket-MAX_PACKETS;	lock.off();}// -----------------------------------unsigned int	ChanPacketBuffer::readPacket(unsigned int pos,ChanPacket &pack){	unsigned int tim = sys->getTime();	if (pos > currPacket)		pos = currPacket;	while (pos == currPacket)	{		sys->sleepIdle();		if ((sys->getTime() - tim) > 30)			throw TimeoutException();	}	lock.on();	if (pos == 0)			// start of stream		pos = firstPacket;	// or lastPacket	else if (pos < firstPacket)	// too far behind		pos = firstPacket;		// so skip forward	pack = 	packets[pos%MAX_PACKETS];	pos++;	lock.off();	return pos;}#if 0// -----------------------------------unsigned int	ChanBuffer::readPacket(unsigned int gpos, ChanPacket &pack){	while (gpos >= pos)		sys->sleepIdle();	lock.on();	try	{		gpos = read(gpos,&pack.type,sizeof(pack.type));		pack.type = sys->gtohl( SWAP4(pack.type) );		gpos = read(gpos,&pack.len,sizeof(pack.len));		if (pack.len > ChanPacket::MAX_DATALEN)			pack.len = ChanPacket::MAX_DATALEN;		gpos = read(gpos,pack.data,pack.len);	}catch (StreamException &)	{	}	lock.off();	return gpos;}#endif// -----------------------------------void Channel::processOggIdent(OggPacket &ogg){	MemoryStream in(ogg.getContent(),ogg.getContentLen());	in.skip(7);	int ver = in.readLong();	int chans = in.readChar();	int rate = in.readLong();	int brMax = in.readLong();	int brNom = in.readLong();	int brLow = in.readLong();	LOG_CHANNEL("OGG Ident: ver=%d, chans=%d, rate=%d, brMax=%d, brNom=%d, brLow=%d",		ver,chans,rate,brMax,brNom,brLow);	if (brNom > 0)		info.bitrate = brNom/1000;	else		info.bitrate = 0;}// -----------------------------------void Channel::processOggComment(OggPacket &ogg){	MemoryStream in(ogg.getContent(),ogg.getContentLen());	in.skip(7);		// skip type + 'vorbis'	int vLen = in.readLong();	// vendor len	in.skip(vLen);	char argBuf[256];	info.track.clear();	int cLen = in.readLong();	// comment len	for(int i=0; i<cLen; i++)	{		int l = in.readLong();		if (l > sizeof(argBuf))			throw StreamException("Comment string too long");		in.read(argBuf,l);		argBuf[l] = 0;		LOG_CHANNEL("OGG Comment: %s",argBuf);		char *arg;		if ((arg=stristr(argBuf,"ARTIST=")))			info.track.artist.set(arg+7,String::T_ASCII);		else if ((arg=stristr(argBuf,"TITLE=")))			info.track.title.set(arg+6,String::T_ASCII);		else if ((arg=stristr(argBuf,"GENRE=")))			info.track.genre.set(arg+6,String::T_ASCII);		else if ((arg=stristr(argBuf,"CONTACT=")))			info.track.contact.set(arg+8,String::T_ASCII);		else if ((arg=stristr(argBuf,"ALBUM=")))			info.track.album.set(arg+6,String::T_ASCII);	}	updateMeta();}// -----------------------------------void Channel::getStreamPath(char *str){	char idStr[64];	getIDStr(idStr);	sprintf(str,"/stream/%s.%s",idStr,info.getTypeExt(info.contentType));}// -----------------------------------void Channel::readOGG(){	OggPacket ogg;	ChanPacket pack;	while ((!input->eof() && (thread.active)))	{		ogg.read(*input);		if (ogg.isBOS())		{			LOG_CHANNEL("Got OGG BOS");			headMeta.len = 0;			headMeta.cnt = 0;		}		if (ogg.isVorbisPacket())		{			if (ogg.getVorbisType()==3)			{				LOG_CHANNEL("Vorbis: Comment");				processOggComment(ogg);			}		}		// first 3 headers of OGG are needed		if (headMeta.cnt < 3)				{			//LOG("New OGG Header %d - %02x - %x",headMeta.cnt,ogg.data[5],*(unsigned int *)&ogg.data[14]);			if (headMeta.len+ogg.len > ChanMeta::MAX_DATALEN)				throw StreamException("OGG packet too big for headMeta");			if (ogg.isVorbisPacket())			{				switch (ogg.getVorbisType())				{					case 1:	// ident						LOG_CHANNEL("Vorbis Header: Ident");						processOggIdent(ogg);						break;					case 3: // comment						LOG_CHANNEL("Vorbis Header: Comment");						//processOggComment(ogg);						break;					case 5: // setup						LOG_CHANNEL("Vorbis Header: Setup");						break;				}				memcpy(&headMeta.data[headMeta.len],ogg.data,ogg.len);				headMeta.len += ogg.len;				headMeta.cnt++;			}else{				// we`ve only had 2 vorbis headers, but force a continue anyway.				headMeta.cnt = 3;				LOG_CHANNEL("Vorbis: No Setup header, continue anyway...");			}			if (headMeta.cnt == 3)			{				pack.init('HEAD',headMeta.data,headMeta.len);				chanData.writePacket(pack);			}		}		syncPos++;		pack.init('SYNC',&syncPos,sizeof(syncPos));		chanData.writePacket(pack);		pack.init('DATA',ogg.data,ogg.len);		chanData.writePacket(pack);		if (checkBump())			throw StreamException("Bumped");		if (checkIdle())			break;		if (readDelay)			sys->sleep(readDelay);	}}// -----------------------------------void ChanMgr::startSearch(ChanInfo &info){	searchInfo = info;	clearHitLists();	numFinds = 0;	lastHit = 0;	lastSearch = 0;	searchActive = true;}// -----------------------------------void ChanMgr::lockHitList(GnuID &id, bool on){	ChanHitList *chl = findHitListByID(id);	if (chl)		chl->locked = on;}// -----------------------------------Channel *ChanMgr::findChannel(ChanInfo &info){	Channel *c=NULL;	findChannels(info,&c,1);	return c;}// -----------------------------------Channel *ChanMgr::findChannelByName(const char *n){	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			if (stricmp(channels[i].info.name,n)==0)				return &channels[i];	return NULL;}// -----------------------------------Channel *ChanMgr::findListenerChannel(){	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			if (channels[i].numListeners)				return &channels[i];	return NULL;}// -----------------------------------Channel *ChanMgr::findChannelByIndex(int index){	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			if (channels[i].index == index)				return &channels[i];	return NULL;}	// -----------------------------------Channel *ChanMgr::findChannelByMount(const char *str){	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			if (strcmp(channels[i].mount,str)==0)				return &channels[i];	return NULL;}	// -----------------------------------Channel *ChanMgr::findChannelByID(GnuID &id){	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			if (channels[i].info.id.isSame(id))				return &channels[i];	return NULL;}	// -----------------------------------Channel *ChanMgr::findPushChannel(int index){	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			if ((channels[i].pushIndex == index) && (!channels[i].pushSock))				return &channels[i];	return NULL;}	// -----------------------------------int ChanMgr::findChannels(ChanInfo &info, Channel **ch, int max){	int cnt=0;	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			if (channels[i].info.match(info))			{				ch[cnt++] = &channels[i];				if (cnt >= max)					break;			}	return cnt;}// -----------------------------------Channel *ChanMgr::createRelay(ChanInfo &info, bool stayConnected){	Channel *c = chanMgr->createChannel(info,NULL);	if (c)	{		c->stayConnected = stayConnected;		c->startFind();		return c;	}	return NULL;}// -----------------------------------int ChanMgr::findAndRelay(ChanInfo &info, Channel **ch, int max){	int cnt=0;	char idStr[64];	info.id.toStr(idStr);	LOG_CHANNEL("Searching for: %s %s",info.name.cstr(),idStr);	for(int i=0; i<180; i++)	// search for 3 minutes.	{		ChanHitList *chl = findHitList(info);		if (chl)		{			Channel *c;			c = findChannelByID(chl->info.id);			if (!c)			{				c = chanMgr->createChannel(chl->info,NULL);				if (c)					c->startGet();			}			if (!c)				break;						ch[0] = c;			cnt = 1;			break;		}else		{			if ((i%60) == 0)				servMgr->findChannel(info);		}		sys->sleep(1000);	}	LOG_CHANNEL("Search results: %d",cnt);	return cnt;}// -----------------------------------ChanMgr::ChanMgr(){	int i;	for(i=0; i<MAX_CHANNELS; i++)		channels[i].init();	for(i=0; i<MAX_HITLISTS; i++)	{		hitlists[i].index = i;		hitlists[i].init();	}	broadcastMsg.clear();	broadcastMsgInterval=10;	broadcastID.generate();	deadHitAge = 300;	icyMetaInterval = 8192;		maxStreamsPerChannel = 0;	searchInfo.init();	fullHitFrequency = 1;	broadcastTTL = 7;	pushTimeout = 60;	// 1 minute	pushTries = 5;		// 5 times	maxPushHops = 8;	// max 8 hops away}// -----------------------------------void ChanMgr::broadcastRelays(int ttl, Servent *serv){	//if ((servMgr->getFirewall() == ServMgr::FW_OFF) || servMgr->serverHost.localIP())	{		Channel *hits[16];		int cnt=0;		ChanInfo info;		info.init();		info.status = ChanInfo::S_PLAY;		cnt = findChannels(info,hits,16);		GnuPacket out;		if (cnt)		{			Host sh = servMgr->serverHost;			bool push = (servMgr->getFirewall()!=ServMgr::FW_OFF);			bool busy = (servMgr->inFull() && servMgr->outFull()) || servMgr->streamFull();			bool stable = servMgr->totalStreams>0;			if (out.initHit(sh,cnt,hits,NULL,0x0114,push,busy,stable,ttl))			{				int numOut=0;				if (serv)				{					if (serv->outputPacket(out,true))						numOut=1;				}else					numOut = servMgr->broadcast(out,NULL);				LOG_CHANNEL("Relay Broadcast %d channels to %d servents",cnt,numOut);					}else{				LOG_ERROR("Relay Broadcast failed");					}		}	}}// -----------------------------------void ChanMgr::setBroadcastMsg(String &msg){	//if (!msg.isSame(broadcastMsg))	{		broadcastMsg = msg;		for(int i=0; i<MAX_CHANNELS; i++)		{			Channel *c = &channels[i];			if (c->isActive())				if (c->status == Channel::S_BROADCASTING)				{					c->info.comment = broadcastMsg;					c->updateMeta();				}		}	}}// -----------------------------------void ChanMgr::clearHitLists(){	for(int i=0; i<MAX_HITLISTS; i++)		if (!hitlists[i].locked)		{			peercastApp->delChannel(&hitlists[i].info);			hitlists[i].init();		}}// -----------------------------------Channel *ChanMgr::createChannel(ChanInfo &info, const char *mount){	lock.on();	for(int i=0; i<MAX_CHANNELS; i++)	{		Channel *c = &channels[i];		if (!c->isActive())		{			c->info = info;			c->info.lastPlay = 0;			c->info.status = ChanInfo::S_UNKNOWN;			if (mount)				c->mount.set(mount);			c->index = i+1;			c->setStatus(Channel::S_WAIT);			c->type = Channel::T_ALLOCATED;			lock.off();			return c;		}	}	lock.off();	return NULL;}// -----------------------------------ChanHitList *ChanMgr::findHitList(ChanInfo &info){	for(int i=0; i<MAX_HITLISTS; i++)		if (hitlists[i].isUsed())			if (hitlists[i].info.match(info))				return &hitlists[i];	return NULL;}// -----------------------------------ChanHitList *ChanMgr::findHitListByID(GnuID &id){	for(int i=0; i<MAX_HITLISTS; i++)		if (hitlists[i].isUsed())			if (hitlists[i].info.id.isSame(id))				return &hitlists[i];	return NULL;}// -----------------------------------int ChanMgr::numHitLists(){	int num=0;	for(int i=0; i<MAX_HITLISTS; i++)		if (hitlists[i].isUsed())			num++;	return num;}// -----------------------------------ChanHitList *ChanMgr::addHitList(ChanInfo &info){	ChanHitList *hl = NULL;	for(int i=0; i<MAX_HITLISTS; i++)		if (!hitlists[i].isUsed())		{			hl = &hitlists[i];			break;		}	if (hl)	{		hl->info = info;		peercastApp->addChannel(&hl->info);	}	return hl;}// -----------------------------------void ChanMgr::clearDeadHits(){	for(int i=0; i<MAX_HITLISTS; i++)		if (hitlists[i].isUsed())			if (hitlists[i].clearDeadHits(deadHitAge) == 0)			{				peercastApp->delChannel(&hitlists[i].info);				hitlists[i].init();			}}// -----------------------------------int ChanMgr::numConnected(){	int tot = 0;	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			tot++;	return tot;}// -----------------------------------int ChanMgr::numRelayed(){	int tot = 0;	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			if (channels[i].isPlaying())				if (channels[i].numRelays>0)					tot++;	return tot;}// -----------------------------------int ChanMgr::numListeners(){	int tot = 0;	for(int i=0; i<MAX_CHANNELS; i++)		if (channels[i].isActive())			if (channels[i].isPlaying())				if (channels[i].numListeners>0)					tot++;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -