⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 channel.cpp

📁 这是和p2p相关的一份源码
💻 CPP
📖 第 1 页 / 共 4 页
字号:
					ch->sock->writeLine("GET /%s/%s HTTP/1.0",ch->info.getTypeExt(ch->info.srcType),idStr);					ch->sock->writeLine("%s %s",HTTP_HS_AGENT,PCX_AGENT);					// request metadata for mp3					if (ch->info.srcType == ChanInfo::T_MP3)						ch->sock->writeLine("icy-metadata:1");					ch->sock->writeLine("");					ch->icyMetaInterval = 0;					HTTP http(*ch->sock);					http.checkResponse(200);					while (http.nextHeader())					{						if (http.isHeader("icy-metaint"))							ch->icyMetaInterval = http.getArgInt();						LOG_CHANNEL("Ch.%d Raw GET: %s",ch->index,http.cmdLine);					}				}else{					ch->sock->writeLine("GET /channel/%s HTTP/1.0",idStr);					ch->sock->writeLine("%s %s",HTTP_HS_AGENT,PCX_AGENT);					ch->sock->writeLine("");					HTTP http(*ch->sock);					http.checkResponse(200);					while (http.nextHeader())						LOG_CHANNEL("Ch.%d GET: %s",ch->index,http.cmdLine);				}				ch->setStatus(S_RECEIVING);				LOG_CHANNEL("Ch.%d Ready",ch->index);				ch->input = ch->sock;				ch->readStream();				ch->setStatus(S_CLOSING);				LOG_CHANNEL("Ch.%d Closed",ch->index);			}catch(StreamException &e)			{				ChanHitList *chl = chanMgr->findHitListByID(ch->info.id);				if (chl)					chl->deadHit(ch->currSource);				ch->setStatus(S_ABORT);				LOG_ERROR("Ch.%d aborted: %s",ch->index,e.msg);			}			if (ch->sock)			{				ch->sock->close();				delete ch->sock;				ch->sock = NULL;			}			//peercastApp->updateChannelInfo(NULL);		}		sys->sleepIdle();	}	LOG_CHANNEL("Ch.%d closed",ch->index);	chanMgr->lockHitList(ch->info.id,false);	ch->endThread();	return 0;}// -----------------------------------void	Channel::startICY(ClientSocket *cs, SRC_TYPE st){	srcType = st;	type = T_BROADCAST;	cs->timeout = 30000;	sock = cs;	input = cs;	thread.data = this;	thread.func = streamICY;	if (!sys->startThread(&thread))		init();}// -----------------------------------void	Channel::startFind(){	type = T_RELAY;	thread.data = this;	thread.func = findProc;	if (!sys->startThread(&thread))		init();}// -----------------------------------int	Channel::streamICY(ThreadInfo *thread){	thread->lock();	Channel *ch = (Channel *)thread->data;	chanMgr->lockHitList(ch->info.id,true);	LOG_CHANNEL("Channel started: %s",ch->getName());	try 	{		while ((thread->active) && (!ch->input->eof()))		{			ch->setStatus(S_BROADCASTING);			ch->readStream();		}		LOG_CHANNEL("Ch.%d ended",ch->index);	}catch(StreamException &e)	{		ch->setStatus(S_ABORT);		LOG_ERROR("Ch.%d aborted: %s",ch->index,e.msg);		sys->sleep(1000);		ch->input->close();	}	ch->setStatus(S_CLOSING);	LOG_CHANNEL("Ch.%d stopped",ch->index);	if (ch->input)	{		ch->input->close();		delete ch->input;	}	chanMgr->lockHitList(ch->info.id,false);	ch->endThread();	return 0;}// -----------------------------------static char *nextMetaPart(char *str,char delim){	while (*str)	{		if (*str == delim)		{			*str++ = 0;			return str;		}		str++;	}	return NULL;}// -----------------------------------static void copyStr(char *to,char *from,int max){	char c;	while ((c=*from++) && (--max))		if (c != '\'')			*to++ = c;	*to = 0;}// -----------------------------------void Channel::processMetadata(char *str){	char *cmd=str;	while (cmd)	{		char *arg = nextMetaPart(cmd,'=');		if (!arg)			break;		char *next = nextMetaPart(arg,';');		if (strcmp(cmd,"StreamTitle")==0)			info.track.title.setUnquote(arg,String::T_ASCII);		else if (strcmp(cmd,"StreamUrl")==0)			info.track.contact.setUnquote(arg,String::T_ASCII);		cmd = next;	}	updateMeta();}// -----------------------------------XML::Node *ChanHitList::createHitsXML(){	return new XML::Node("hits listeners=\"%d\" hosts=\"%d\" busy=\"%d\" stable=\"%d\" firewalled=\"%d\" closest=\"%d\" furthest=\"%d\" newest=\"%d\"",		numListeners(),		numHits(),		numBusy(),		numStable(),		numFirewalled(),		closestHit(),		furthestHit(),		sys->getTime()-newestHit()		);		}// -----------------------------------XML::Node *Channel::createRelayXML(bool showStat){	const char *ststr;	ststr = getStatusStr();	if (!showStat)		if ((status == S_RECEIVING) || (status == S_BROADCASTING))			ststr = "OK";	ChanHitList *chl = chanMgr->findHitListByID(info.id);	return new XML::Node("relay listeners=\"%d\" relays=\"%d\" hosts=\"%d\" status=\"%s\"",		numListeners,		numRelays,		(chl!=NULL)?chl->numHits():0,		ststr		);	}// -----------------------------------void ChanMeta::fromXML(XML &xml){	MemoryStream tout(data,MAX_DATALEN);	xml.write(tout);	len = tout.pos;}// -----------------------------------void ChanMeta::fromMem(void *p, int l){	len = l;	memcpy(data,p,len);}// -----------------------------------void ChanMeta::addMem(void *p, int l){	if ((len+l) <= MAX_DATALEN)	{		memcpy(data+len,p,l);		len += l;	}}// -----------------------------------void Channel::updateMeta(){	XML xml;	XML::Node *n = info.createChannelXML();	n->add(info.createTrackXML());//	n->add(info.createServentXML());	xml.setRoot(n);	insertMeta.fromXML(xml);	ChanPacket pack;	pack.init('META',insertMeta.data,insertMeta.len);	chanData.writePacket(pack);}// -----------------------------------void Channel::readStream(){	chanMgr->broadcastRelays(chanMgr->broadcastTTL,NULL);	switch(info.srcType)	{		case ChanInfo::T_MP3:			LOG_CHANNEL("Ch.%d is MP3 - meta: %d",index,icyMetaInterval);			readMP3();			break;		case ChanInfo::T_NSV:			LOG_CHANNEL("Ch.%d is NSV",index);			readRaw();			break;		case ChanInfo::T_OGG:			LOG_CHANNEL("Ch.%d is OGG",index);			readOGG();			break;		case ChanInfo::T_MOV:			LOG_CHANNEL("Ch.%d is MOV",index);			readMOV();			break;		case ChanInfo::T_MPG:			LOG_CHANNEL("Ch.%d is MPG",index);			readMPG();			break;		case ChanInfo::T_PEERCAST:			LOG_CHANNEL("Ch.%d is Peercast",index);			readPeercast();			break;		default:			LOG_CHANNEL("Ch.%d is Raw",index);			readRaw();	}}// -----------------------------------void Channel::readPeercast(){	ChanPacket pack;	if (input->readTag() != 'PCST')		throw StreamException("Not PeerCast stream");	syncPos = 0;	info.numSkips = 0;	peercastApp->channelStart(&info);	while ((!input->eof() && (thread.active)))	{		pack.read(*input);		chanData.writePacket(pack);		MemoryStream mem(pack.data,pack.len);		switch(pack.type)		{			case 'HEAD':				LOG_CHANNEL("Ch.%d HEAD: %d",index,pack.len);				if (pack.len > ChanMeta::MAX_DATALEN)					throw StreamException("Bad HEAD");				headMeta.fromMem(pack.data,pack.len);				break;			case 'META':				LOG_CHANNEL("Ch.%d META: %d",index,pack.len);				insertMeta.fromMem(pack.data,pack.len);				{					if (pack.len)					{						XML xml;						xml.read(mem);						XML::Node *n = xml.findNode("channel");											if (n)						{							info.updateFromXML(n);							LOG_CHANNEL("Ch.%d track update: %s - %s",index,info.track.artist.cstr(),info.track.title.cstr());							peercastApp->channelUpdate(&info);							ChanHitList *chl = chanMgr->findHitListByID(info.id);							if (chl)								chl->info.updateFromXML(n);;						}					}				}				break;			case 'DATA':				//LOG_CHANNEL("DATA: %d",pack.len);				if (info.numSkips)					info.numSkips--;				break;			case 'SYNC':				{					unsigned int s = mem.readLong();					if ((s-syncPos) != 1)					{						LOG_CHANNEL("Ch.%d SKIP: %d to %d (%d)",index,syncPos,s,info.numSkips);						if (syncPos)						{							info.numSkips++;							if (info.numSkips>50)								throw StreamException("Bumped - Too many skips");						}					}					syncPos = s;				}				break;			default:				LOG_CHANNEL("Bad channel packet: %x",pack.type);		}		if (checkBump())			throw StreamException("Bumped");		if (checkIdle())			break;		if (readDelay)			sys->sleep(readDelay);	}	peercastApp->channelStop(&info);}// -----------------------------------void Channel::readRaw(){	ChanPacket pack;	while ((!input->eof() && (thread.active)))	{		syncPos++;		pack.init('SYNC',&syncPos,sizeof(syncPos));		chanData.writePacket(pack);		pack.len = sizeof(pack.data);		input->read(pack.data,pack.len);		pack.type = 'DATA';		chanData.writePacket(pack);		if (checkBump())			throw StreamException("Bumped");		if (checkIdle())			break;		if (readDelay)			sys->sleep(readDelay);	}}// -----------------------------------void Channel::readMPG(){	ChanPacket pack;//	for(int i=0; i<10; i++)//	{//		unsigned int v = in.readLong();//		LOG_CHANNEL("raw %d: %08x",i,SWAP4(v));//	}	while ((!numListeners) && (thread.active))		sys->sleep(1000);	sys->sleep(2000);//	in.read(headMeta.data,1024);//	headMeta.len = 1024;	while ((!input->eof() && (thread.active)))	{		syncPos++;		pack.init('SYNC',&syncPos,sizeof(syncPos));		chanData.writePacket(pack);		int rlen = 4000;		rlen = input->read(pack.data,rlen);		//LOG_CHANNEL("raw read %d - %d",syncPos,rlen);		pack.len = rlen;		pack.type = 'DATA';		chanData.writePacket(pack);		if (checkBump())			throw StreamException("Bumped");		if (checkIdle())			break;		if (readDelay)			sys->sleep(readDelay);	}}// -----------------------------------void Channel::readMP3(){	ChanPacket pack;	while ((!input->eof() && (thread.active)))	{		if (icyMetaInterval)		{			int rlen = icyMetaInterval;			while (rlen)			{				int rl = rlen;				if (rl > sizeof(pack.data))					rl = sizeof(pack.data);				syncPos++;				pack.init('SYNC',&syncPos,sizeof(syncPos));				chanData.writePacket(pack);				pack.len = rl;				input->read(pack.data,pack.len);				pack.type = 'DATA';				chanData.writePacket(pack);				rlen-=rl;			}			unsigned char len;			input->read(&len,1);			if (len)			{				if (len*16 > 1024) len = 1024/16;				char buf[1024];				input->read(buf,len*16);				processMetadata(buf);			}		}else{			syncPos++;			pack.init('SYNC',&syncPos,sizeof(syncPos));			chanData.writePacket(pack);			pack.len = sizeof(pack.data);			input->read(pack.data,pack.len);			pack.type = 'DATA';			chanData.writePacket(pack);		}		if (checkBump())			throw StreamException("Bumped");		if (checkIdle())			break;		if (readDelay)			sys->sleep(readDelay);	}}// -----------------------------------void Channel::readMOV(){	ChanPacket pack;	bool done=false;	headMeta.len = 0;	while ((!input->eof() && (thread.active)) && (!done))	{		unsigned int olen = input->readLong();		unsigned int otag = input->readLong();        // don't ask.		unsigned int len = SWAP4(olen);		unsigned int tag = SWAP4(otag);		len -= 8;		if ((tag == 'mdat') && (len))		{			LOG_CHANNEL("Ch.%d mdat: %d",index,len);			{				LOG_CHANNEL("mov wait: %d",headMeta.len);				headMeta.addMem(&olen,4);				headMeta.addMem(&otag,4);				while ((!numListeners) && (thread.active))					sys->sleep(1000);				sys->sleep(1000);				LOG_CHANNEL("mov go");				while ((len) && (thread.active))				{					syncPos++;					pack.init('SYNC',&syncPos,sizeof(syncPos));					chanData.writePacket(pack);					int rlen = 4000;					rlen = input->read(pack.data,rlen);					pack.type = 'DATA';					pack.len = rlen;					chanData.writePacket(pack);					len -= rlen;					if (checkBump())						done=true;					if (checkIdle())						done=true;					if (readDelay)						sys->sleep(readDelay);				}				LOG_CHANNEL("Mov data end");			}		}else{			LOG_CHANNEL("Ch.%d %c%c%c%c: %d",index,tag>>24&0xff,tag>>16&0xff,tag>>8&0xff,tag&0xff,len);			if (headMeta.len+len > ChanMeta::MAX_DATALEN)				throw StreamException("MOV section too big");			headMeta.addMem(&olen,4);			headMeta.addMem(&otag,4);			if (len)			{				input->read(headMeta.data+headMeta.len,len);				headMeta.len += len;			}		}	}}// -----------------------------------bool OggPacket::isBOS(){	return (data[5] & 0x02) != 0;}// -----------------------------------bool OggPacket::isVorbisPacket(){	char *p = (char *)getContent();	return memcmp(p+1,"vorbis",6)==0;}// -----------------------------------int OggPacket::getVorbisType(){	char *p = (char *)getContent();	return p[0];}// -----------------------------------void OggPacket::read(Stream &in){	// skip until we get OggS identifier	bool gotOgg=false;	while (!gotOgg)		if (in.readChar() == 'O')			if (in.readChar() == 'g')				if (in.readChar() == 'g')					if (in.readChar() == 'S')						gotOgg = true;	len = 0;	data[0] = 'O';	data[1] = 'g';	data[2] = 'g';	data[3] = 'S';	len += 4;	in.read(&data[len],23);	len += 23;	//LOG("OGG: head = %02x",data[5]);	//LOG4("OGG: page num = %d",*(unsigned int *)&data[18]);	//LOG("OGG: serial = %x",*(unsigned int *)&data[14]);	numSegs = data[26];	segLen=0;	in.read(&data[len],numSegs);	for(int i=0; i<numSegs; i++)		segLen += data[len++];	//LOG4("OGG: segs = %d/%d bytes",numSegs,segLen);		if (segLen > MAX_DATALEN-128)		throw StreamException("OGG packet too big");	in.read(&data[len],segLen);	len += segLen;	//LOG4("OGG: packet len=%d",len);}// -----------------------------------void ChanPacket::init(unsigned int t, const void *p, unsigned int l){	type = t;	if (l > MAX_DATALEN)		l = MAX_DATALEN;	len = l;	memcpy(data,p,len);}// -----------------------------------void ChanPacket::write(Stream &out){	out.writeTag(type);	out.writeShort(len);	out.writeShort(0);	out.write(data,len);}// -----------------------------------void ChanPacket::read(Stream &in){	type = in.readTag();	len = in.readShort();	in.readShort();	if (len > MAX_DATALEN)		throw StreamException("Bad ChanPacket");	in.read(data,len);}#if 0// -----------------------------------unsigned int ChanBuffer::read(unsigned int gpos,void *p,unsigned int len){	if (gpos == 0)		gpos = pos;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -