⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 channel.cpp

📁 这是和p2p相关的一份源码
💻 CPP
📖 第 1 页 / 共 5 页
字号:
		}while((ch->currSource.host.ip==0) && (thread->active));		// totally give up		if (!ch->currSource.host.ip)			break;		{			try 			{				char hostName[64];				ch->currSource.host.IPtoStr(hostName);				if (!ch->currSource.firewalled || (servMgr->serverHost.localIP() && ch->currSource.host.localIP()))				{					ClientSocket *s = sys->createSocket();					if (!s)						throw StreamException("Ch.%d cannot create socket",ch->index);					ch->sock = s;					ch->setStatus(S_CONNECTING);					ch->sock->open(ch->currSource.host);					ch->sock->timeout = 10000;					ch->sock->connect();					LOG_CHANNEL("Ch.%d connect to %s",ch->index,hostName);				}else{					Host sh = servMgr->serverHost;					if (!sh.isValid() || sh.loopbackIP())						throw StreamException("No Server, unable to ask for push.");					ch->setStatus(S_REQUESTING);					sys->sleep(500);	// wait a bit for the previous find to go					int timeout;					LOG_CHANNEL("Ch.%d Push request",ch->index);										ch->pushSock = NULL;					ch->pushIndex = ch->currSource.index;					for(int i=0; i<chanMgr->pushTries; i++)					{						LOG_NETWORK("Push-request try %d",i+1);						pack.initPush(ch->currSource,sh);						servMgr->route(pack,ch->currSource.packetID,NULL);						timeout = chanMgr->pushTimeout;								while ((!ch->pushSock) && (thread->active))						{							if (ch->checkBump())								throw StreamException("Bumped");							if (timeout-- <= 0)								break;							sys->sleep(1000);						}						if (ch->pushSock || (!thread->active))							break;					}					if (!ch->pushSock)						throw StreamException("Push timeout");					ch->setStatus(S_CONNECTING);					ch->sock = ch->pushSock;								}				char idStr[64];				ch->info.id.toStr(idStr);								if (ch->info.srcProtocol != ChanInfo::SP_PEERCAST)				{					// raw data stream					ch->sock->writeLine("GET /%s/%s HTTP/1.0",ch->info.getTypeExt(ch->info.contentType),idStr);					ch->sock->writeLine("%s %s",HTTP_HS_AGENT,PCX_AGENT);					// request metadata for mp3					if (ch->info.contentType == ChanInfo::T_MP3)						ch->sock->writeLine("icy-metadata:1");					ch->sock->writeLine("");					ch->icyMetaInterval = 0;					HTTP http(*ch->sock);					http.checkResponse(200);					while (http.nextHeader())					{						if (http.isHeader("icy-metaint"))							ch->icyMetaInterval = http.getArgInt();						LOG_CHANNEL("Ch.%d Raw GET: %s",ch->index,http.cmdLine);					}				}else{					ch->sock->writeLine("GET /channel/%s HTTP/1.0",idStr);					ch->sock->writeLine("%s %s",HTTP_HS_AGENT,PCX_AGENT);					ch->sock->writeLine("");					HTTP http(*ch->sock);					http.checkResponse(200);					while (http.nextHeader())						LOG_CHANNEL("Ch.%d GET: %s",ch->index,http.cmdLine);				}				ch->setStatus(S_RECEIVING);				LOG_CHANNEL("Ch.%d Ready",ch->index);				ch->input = ch->sock;				ch->readStream();				ch->setStatus(S_CLOSING);				LOG_CHANNEL("Ch.%d Closed",ch->index);			}catch(StreamException &e)			{				ChanHitList *chl = chanMgr->findHitListByID(ch->info.id);				if (chl)					chl->deadHit(ch->currSource);				ch->setStatus(S_ABORT);				LOG_ERROR("Ch.%d aborted: %s",ch->index,e.msg);			}			if (ch->sock)			{				ch->sock->close();				delete ch->sock;				ch->sock = NULL;			}			//peercastApp->updateChannelInfo(NULL);		}		sys->sleepIdle();	}	LOG_CHANNEL("Ch.%d closed",ch->index);	chanMgr->lockHitList(ch->info.id,false);	ch->endThread();	return 0;}// -----------------------------------void	Channel::startICY(ClientSocket *cs, SRC_TYPE st){	srcType = st;	type = T_BROADCAST;	cs->timeout = 30000;	sock = cs;	input = cs;	info.srcProtocol = ChanInfo::SP_HTTP;	thread.data = this;	thread.func = streamICY;	if (!sys->startThread(&thread))		init();}// -----------------------------------void	Channel::startFind(){	type = T_RELAY;	thread.data = this;	thread.func = findProc;	if (!sys->startThread(&thread))		init();}// -----------------------------------int	Channel::streamICY(ThreadInfo *thread){	thread->lock();	Channel *ch = (Channel *)thread->data;	chanMgr->lockHitList(ch->info.id,true);	LOG_CHANNEL("Channel started: %s",ch->getName());	try 	{		while ((thread->active) && (!ch->input->eof()))		{			ch->setStatus(S_BROADCASTING);			ch->readStream();		}		LOG_CHANNEL("Ch.%d ended",ch->index);	}catch(StreamException &e)	{		ch->setStatus(S_ABORT);		LOG_ERROR("Ch.%d aborted: %s",ch->index,e.msg);		sys->sleep(1000);		ch->input->close();	}	ch->setStatus(S_CLOSING);	LOG_CHANNEL("Ch.%d stopped",ch->index);	if (ch->input)	{		ch->input->close();		delete ch->input;	}	chanMgr->lockHitList(ch->info.id,false);	ch->endThread();	return 0;}// -----------------------------------static char *nextMetaPart(char *str,char delim){	while (*str)	{		if (*str == delim)		{			*str++ = 0;			return str;		}		str++;	}	return NULL;}// -----------------------------------static void copyStr(char *to,char *from,int max){	char c;	while ((c=*from++) && (--max))		if (c != '\'')			*to++ = c;	*to = 0;}// -----------------------------------void Channel::processMetadata(char *str){	char *cmd=str;	while (cmd)	{		char *arg = nextMetaPart(cmd,'=');		if (!arg)			break;		char *next = nextMetaPart(arg,';');		if (strcmp(cmd,"StreamTitle")==0)			info.track.title.setUnquote(arg,String::T_ASCII);		else if (strcmp(cmd,"StreamUrl")==0)			info.track.contact.setUnquote(arg,String::T_ASCII);		cmd = next;	}	updateMeta();}// -----------------------------------XML::Node *ChanHit::createXML(){	// IP	char ipStr[64];	host.toStr(ipStr);		return new XML::Node("host ip=\"%s\" hops=\"%d\" listeners=\"%d\" uptime=\"%d\" skips=\"%d\" push=\"%d\" busy=\"%d\" stable=\"%d\" agent=\"%s\" update=\"%d\" ",		ipStr,		hops,		numListeners,		upTime,		numSkips,		firewalled?1:0,		busy?1:0,		stable?1:0,		agentStr,		sys->getTime()-time		);}// -----------------------------------XML::Node *ChanHitList::createHitsXML(){	XML::Node *hn = new XML::Node("hits listeners=\"%d\" hosts=\"%d\" busy=\"%d\" stable=\"%d\" firewalled=\"%d\" closest=\"%d\" furthest=\"%d\" newest=\"%d\"",		numListeners(),		numHits(),		numBusy(),		numStable(),		numFirewalled(),		closestHit(),		furthestHit(),		sys->getTime()-newestHit()		);			for(int i=0; i<MAX_HITS; i++)		if (hits[i].host.ip)			hn->add(hits[i].createXML());	return hn;}// -----------------------------------XML::Node *Channel::createRelayXML(bool showStat){	const char *ststr;	ststr = getStatusStr();	if (!showStat)		if ((status == S_RECEIVING) || (status == S_BROADCASTING))			ststr = "OK";	ChanHitList *chl = chanMgr->findHitListByID(info.id);	return new XML::Node("relay listeners=\"%d\" relays=\"%d\" hosts=\"%d\" status=\"%s\"",		numListeners,		numRelays,		(chl!=NULL)?chl->numHits():0,		ststr		);	}// -----------------------------------void ChanMeta::fromXML(XML &xml){	MemoryStream tout(data,MAX_DATALEN);	xml.write(tout);	len = tout.pos;}// -----------------------------------void ChanMeta::fromMem(void *p, int l){	len = l;	memcpy(data,p,len);}// -----------------------------------void ChanMeta::addMem(void *p, int l){	if ((len+l) <= MAX_DATALEN)	{		memcpy(data+len,p,l);		len += l;	}}// -----------------------------------void Channel::updateMeta(){	XML xml;	XML::Node *n = info.createChannelXML();	n->add(info.createTrackXML());//	n->add(info.createServentXML());	xml.setRoot(n);	insertMeta.fromXML(xml);	ChanPacket pack;	pack.init('META',insertMeta.data,insertMeta.len);	chanData.writePacket(pack);}// -----------------------------------void Channel::readStream(){	if (servMgr->relayBroadcast)		chanMgr->broadcastRelays(NULL,chanMgr->minBroadcastTTL,chanMgr->maxBroadcastTTL);	if (info.srcProtocol == ChanInfo::SP_PEERCAST)	{		LOG_CHANNEL("Ch.%d is Peercast",index);		readPeercast();	}	else if (info.srcProtocol == ChanInfo::SP_MMS)	{		LOG_CHANNEL("Ch.%d is MMS",index);		readMMS();	}else	{		switch(info.contentType)		{			case ChanInfo::T_MP3:				LOG_CHANNEL("Ch.%d is MP3 - meta: %d",index,icyMetaInterval);				readMP3();				break;			case ChanInfo::T_NSV:				LOG_CHANNEL("Ch.%d is NSV",index);				readRaw();				break;			case ChanInfo::T_WMA:			case ChanInfo::T_WMV:				throw StreamException("Ch.%d is WMA/WMV - but not MMS",index);				break;			case ChanInfo::T_OGG:				LOG_CHANNEL("Ch.%d is OGG",index);				readOGG();				break;			case ChanInfo::T_MOV:				LOG_CHANNEL("Ch.%d is MOV",index);				readMOV();				break;			case ChanInfo::T_MPG:				LOG_CHANNEL("Ch.%d is MPG",index);				readMPG();				break;			default:				LOG_CHANNEL("Ch.%d is Raw",index);				readRaw();		}	}}// -----------------------------------void Channel::readPeercast(){	ChanPacket pack;	if (input->readTag() != 'PCST')		throw StreamException("Not PeerCast stream");	syncPos = 0;	info.numSkips = 0;	peercastApp->channelStart(&info);	while ((!input->eof() && (thread.active)))	{		pack.read(*input);		chanData.writePacket(pack);		MemoryStream mem(pack.data,pack.len);		switch(pack.type)		{			case 'HEAD':				//LOG_CHANNEL("Ch.%d HEAD: %d",index,pack.len);				if (pack.len > ChanMeta::MAX_DATALEN)					throw StreamException("Bad HEAD");				headMeta.fromMem(pack.data,pack.len);				break;			case 'META':				//LOG_CHANNEL("Ch.%d META: %d",index,pack.len);				insertMeta.fromMem(pack.data,pack.len);				{					if (pack.len)					{						XML xml;						xml.read(mem);						XML::Node *n = xml.findNode("channel");											if (n)						{							info.updateFromXML(n);							LOG_CHANNEL("Ch.%d track update: %s - %s",index,info.track.artist.cstr(),info.track.title.cstr());							peercastApp->channelUpdate(&info);							ChanHitList *chl = chanMgr->findHitListByID(info.id);							if (chl)								chl->info.updateFromXML(n);;						}					}				}				break;			case 'DATA':				//LOG_CHANNEL("DATA: %d",pack.len);				if (info.numSkips)					info.numSkips--;				break;			case 'SYNC':				{					unsigned int s = mem.readLong();					if ((s-syncPos) != 1)					{						LOG_CHANNEL("Ch.%d SKIP: %d to %d (%d)",index,syncPos,s,info.numSkips);						if (syncPos)						{							info.numSkips++;							if (info.numSkips>50)								throw StreamException("Bumped - Too many skips");						}					}					syncPos = s;				}				break;			default:				LOG_CHANNEL("Bad channel packet: %x",pack.type);		}		if (checkBump())			throw StreamException("Bumped");		if (checkIdle())			break;	}	peercastApp->channelStop(&info);}// -----------------------------------void Channel::readRaw(){	ChanPacket pack;	while ((!input->eof() && (thread.active)))	{		syncPos++;		pack.init('SYNC',&syncPos,sizeof(syncPos));		chanData.writePacket(pack);		pack.len = sizeof(pack.data);		input->read(pack.data,pack.len);		pack.type = 'DATA';		chanData.writePacket(pack);		checkReadDelay(pack.len);		if (checkBump())			throw StreamException("Bumped");		if (checkIdle())			break;	}}// -----------------------------------void Channel::readMPG(){	ChanPacket pack;//	for(int i=0; i<10; i++)//	{//		unsigned int v = in.readLong();//		LOG_CHANNEL("raw %d: %08x",i,SWAP4(v));//	}	while ((!numListeners) && (thread.active))		sys->sleep(1000);	sys->sleep(2000);//	in.read(headMeta.data,1024);//	headMeta.len = 1024;	while ((!input->eof() && (thread.active)))	{		syncPos++;		pack.init('SYNC',&syncPos,sizeof(syncPos));		chanData.writePacket(pack);		int rlen = 4000;		rlen = input->read(pack.data,rlen);		//LOG_CHANNEL("raw read %d - %d",syncPos,rlen);		pack.len = rlen;		pack.type = 'DATA';		chanData.writePacket(pack);		checkReadDelay(pack.len);		if (checkBump())			throw StreamException("Bumped");		if (checkIdle())			break;	}}// -----------------------------------void Channel::readMP3(){	ChanPacket pack;	while ((!input->eof() && (thread.active)))	{		if (icyMetaInterval)		{			int rlen = icyMetaInterval;			while (rlen)			{				int rl = rlen;				if (rl > sizeof(pack.data))					rl = sizeof(pack.data);				syncPos++;				pack.init('SYNC',&syncPos,sizeof(syncPos));				chanData.writePacket(pack);				pack.len = rl;				input->read(pack.data,pack.len);				pack.type = 'DATA';				chanData.writePacket(pack);				checkReadDelay(pack.len);				rlen-=rl;			}			unsigned char len;			input->read(&len,1);			if (len)			{				if (len*16 > 1024) len = 1024/16;				char buf[1024];				input->read(buf,len*16);				processMetadata(buf);			}		}else{			syncPos++;			pack.init('SYNC',&syncPos,sizeof(syncPos));			chanData.writePacket(pack);			pack.len = sizeof(pack.data);			input->read(pack.data,pack.len);			pack.type = 'DATA';			chanData.writePacket(pack);			checkReadDelay(pack.len);		}		if (checkBump())			throw StreamException("Bumped");		if (checkIdle())			break;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -