⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 channel.cpp

📁 这是和p2p相关的一份源码
💻 CPP
📖 第 1 页 / 共 4 页
字号:
// ------------------------------------------------// File : channel.cpp// Date: 4-apr-2002// Author: giles// Desc: //		Channel streaming classes. These do the actual //		streaming of media between clients. //// (c) 2002 peercast.org// // ------------------------------------------------// This program is free software; you can redistribute it and/or modify// it under the terms of the GNU General Public License as published by// the Free Software Foundation; either version 2 of the License, or// (at your option) any later version.// This program is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the// GNU General Public License for more details.// ------------------------------------------------#include <string.h>#include <stdlib.h>#include "common.h"#include "socket.h"#include "channel.h"#include "gnutella.h"#include "servent.h"#include "servmgr.h"#include "sys.h"#include "xml.h"#include "http.h"#include "peercast.h"// -----------------------------------char *Channel::srcTypes[]={	"None",	"Peercast",	"SHOUTcast",	"Icecast",	"URL"};// -----------------------------------char *Channel::statusMsgs[]={	"NONE",	"WAIT",	"CONNECT",	"REQUEST",	"CLOSE",	"RECEIVE",	"BROADCAST",	"ABORT",	"SEARCH",	"NOHOSTS",	"IDLE",	"ERROR"};// -----------------------------------void readXMLString(String &str, XML::Node *n, const char *arg){	char *p;	p = n->findAttr(arg);	if (p)	{		str.set(p,String::T_HTML);		str.convertTo(String::T_ASCII);	}}// -----------------------------------------------------------------------------// Initialise the channel to its default settings of unallocated and reset.// -----------------------------------------------------------------------------void Channel::init(){	reset();}// -----------------------------------------------------------------------------// Close this channel and stop thread// -----------------------------------------------------------------------------void Channel::close(){	thread.active = false;	setStatus(S_CLOSING);}// -----------------------------------------------------------------------------void Channel::endThread(){	close();	thread.unlock();	init();}// -----------------------------------------------------------------------------void Channel::setStatus(STATUS s){	status = s;	if (isPlaying())	{		info.status = ChanInfo::S_PLAY;		info.lastPlay = sys->getTime();	}else{		info.status = ChanInfo::S_UNKNOWN;		info.lastPlay = 0;	}}	// -----------------------------------------------------------------------------// Reset channel and make it available // -----------------------------------------------------------------------------void Channel::reset(){	currSource.init();	srcType = SRC_NONE;	lastIdleTime = 0;	prefetchCnt=0;			numRelays = 0;	numListeners = 0;	info.init();	index = 0;	mount.clear();	bump = false;	stayConnected = false;	icyMetaInterval = 0;	syncPos = 0;	headMeta.init();	insertMeta.init();	chanData.init();	setStatus(S_NONE);	type = T_NONE;	readDelay = 0;	sock = NULL;	sourceURL.clear();}// -----------------------------------bool	Channel::checkIdle(){	if ((numListeners > 0) || (stayConnected || (status == S_BROADCASTING)))	{		prefetchCnt=0;		return false;	}	if (prefetchCnt) 		prefetchCnt--;	if (prefetchCnt)		LOG_CHANNEL("prefetch %d",prefetchCnt);	return prefetchCnt==0;}// -----------------------------------bool	Channel::isFull(){	return chanMgr->maxStreamsPerChannel ? numRelays >= chanMgr->maxStreamsPerChannel : false;}// -----------------------------------void	Channel::startMP3File(char *fn){	type = T_BROADCAST;	FileStream *fs = new FileStream();	fs->openReadOnly(fn);	input = fs;	thread.data = this;	thread.func = streamMP3File;	if (!sys->startThread(&thread))		init();}// -----------------------------------int	Channel::streamMP3File(ThreadInfo *thread){	thread->lock();	Channel *ch = (Channel *)thread->data;	LOG_CHANNEL("Channel started: %s",ch->getName());	try 	{		while (thread->active)		{			ch->input->read(&ch->mp3Head,sizeof(MP3Header));			ch->readMP3();			ch->input->rewind();			LOG_CHANNEL("%s end",ch->getName());		}	}catch(StreamException &e)	{		LOG_ERROR("Unable to read file: %s",e.msg);	}	ch->input->close();	delete ch->input;	LOG_CHANNEL("Channel stopped: %s",ch->getName());	ch->endThread();	return 0;}// -----------------------------------void	Channel::startGet(){	srcType = SRC_PEERCAST;	type = T_RELAY;	input = NULL;	info.srcType = ChanInfo::T_PEERCAST;	thread.data = this;	thread.func = streamGet;	if (!sys->startThread(&thread))		init();}// -----------------------------------void	Channel::startURL(const char *u){	sourceURL.set(u);	srcType = SRC_URL;	type = T_BROADCAST;	stayConnected = true;	// source type should be set before here.	//info.srcType = ChanInfo::T_UNKNOWN;		thread.data = this;	thread.func = streamURL;	if (!sys->startThread(&thread))		init();}// -----------------------------------int	Channel::findProc(ThreadInfo *thread){	thread->lock();	Channel *ch = (Channel *)thread->data;	ch->setStatus(S_SEARCHING);	int findCnt=0;	while (thread->active)	{		ChanHitList *chl = chanMgr->findHitListByID(ch->info.id);		if (chl && chl->numHits())		{			// update chaninfo with latest 			ch->info = chl->info;			ch->setStatus(S_IDLE);			thread->unlock();			ch->startGet();			return 0;		}else		{			if ((findCnt%60) == 0)				servMgr->findChannel(ch->info);			if (findCnt++ > 300)		// give up eventually				break;		}		sys->sleep(1000);	}	ch->endThread();	return 0;}// -----------------------------------String Channel::streamURL(const char *url){	String nextURL;	String urlTmp;	urlTmp.set(url);	char *fileName = urlTmp.cstr();	Stream *file = NULL;	PlayList *pls=NULL;	LOG_CHANNEL("Fetch URL=%s",fileName);	try	{		if (strnicmp(fileName,"http://",7)==0)		{			fileName+=7;			ClientSocket *sock = sys->createSocket();			if (!sock)				throw StreamException("Ch.%d cannot create socket",index);			file = sock;			char *dir = strstr(fileName,"/");			if (dir)				*dir++=0;			LOG_CHANNEL("Fetch Host=%s",fileName);			if (dir)				LOG_CHANNEL("Fetch Dir=%s",dir);			setStatus(S_CONNECTING);			Host host;			host.fromStr(fileName,80);			sock->open(host);			sock->connect();			HTTP http(*sock);			http.writeLine("GET /%s HTTP/1.1",dir?dir:"");			http.writeLine("%s %s",HTTP_HS_HOST,fileName);			http.writeLine("%s %s",HTTP_HS_AGENT,PCX_AGENT);   // "WinampMPEG/2.8";			http.writeLine("%s %s",HTTP_HS_CONNECTION,"close");			http.writeLine("%s %s",HTTP_HS_ACCEPT,"*/*");			http.writeLine("icy-metadata:1");			http.writeLine("");			int res = http.readResponse();			if ((res!=200) && (res!=302))			{				LOG_ERROR("HTTP response: %s",http.cmdLine);				throw StreamException("Bad HTTP connect");			}			String name = info.name;			while (http.nextHeader())			{				LOG_CHANNEL("Fetch HTTP: %s",http.cmdLine);				ChanInfo tmpInfo = info;				Servent::readICYHeader(http,info,NULL);				if (!tmpInfo.name.isEmpty())					info.name = tmpInfo.name;				if (!tmpInfo.genre.isEmpty())					info.genre = tmpInfo.genre;				if (!tmpInfo.url.isEmpty())					info.url = tmpInfo.url;				if (http.isHeader("icy-metaint"))					icyMetaInterval = http.getArgInt();				else if (http.isHeader("Location:"))					nextURL.set(http.getArgStr());				char *arg = http.getArgStr();				if (arg)				{					if (http.isHeader("content-type"))					{						if (stristr(arg,MIME_XSCPLS))							pls = new PlayList(PlayList::T_SCPLS, 1000);						else if (stristr(arg,MIME_PLS))							pls = new PlayList(PlayList::T_PLS, 1000);						else if (stristr(arg,MIME_XPLS))							pls = new PlayList(PlayList::T_PLS, 1000);						else if (stristr(arg,MIME_M3U))							pls = new PlayList(PlayList::T_PLS, 1000);						else if (stristr(arg,MIME_TEXT))							pls = new PlayList(PlayList::T_PLS, 1000);					}				}			}			if ((!nextURL.isEmpty()) && (res==302))			{				LOG_CHANNEL("Ch.%d redirect: %s",index,nextURL.cstr());				sock->close();				delete sock;				sock = NULL;				return nextURL;			}		//}else if (strnicmp(fileName,"file://",7)==0) 		}else{			if (strnicmp(fileName,"file://",7)==0) 				fileName+=7;			FileStream *fs = new FileStream();			fs->openReadOnly(fileName);			file = fs;			// if filetype is unknown, try and figure it out from file extension.			//if ((info.srcType == ChanInfo::T_UNKNOWN) || (info.srcType == ChanInfo::T_PLAYLIST))			{				const char *ext = fileName+strlen(fileName);				while (*--ext)					if (*ext=='.')					{						ext++;						break;					}				ChanInfo::TYPE type = ChanInfo::getTypeFromStr(ext);				if (type != ChanInfo::T_UNKNOWN)					info.srcType = type;			}			if (info.bitrate)				readDelay = (ChanPacket::MAX_DATALEN*1000)/((info.bitrate*1024)/8);			else				readDelay = 1;			if (info.srcType == ChanInfo::T_PLAYLIST) 				pls = new PlayList(PlayList::T_PLS, 1000);//		}else //		{//			thread.active = false;			// fatal error//			throw StreamException("Bad URL");		}				if (pls)		{			LOG_CHANNEL("Ch.%d is PLS",index);			try			{				char tmp[256];				while (file->readLine(tmp,sizeof(tmp)))				{					//LOG_CHANNEL("PLS: %s",tmp);					switch (pls->type)					{						case PlayList::T_PLS:							if (tmp[0] != '#')								pls->addURL(tmp,"");							break;						case PlayList::T_SCPLS:							if (strnicmp(tmp,"file",4)==0)							{								char *p = strstr(tmp,"=");								if (p)									pls->addURL(p+1,"");							}							break;					}				}			}catch(StreamException &)			{}			file->close();			delete file;			file = NULL;			int urlNum=0;			String url;			LOG_CHANNEL("Playlist: %d URLs",pls->numURLs);			while ((thread.active) && (pls->numURLs))			{				if (url.isEmpty())				{					url = pls->urls[urlNum%pls->numURLs];					urlNum++;				}				try				{					url = streamURL(url.cstr());				}catch(StreamException &)				{}			}			delete pls;					}else		{			// if we didn`t get a channel id from the source, then create our own (its an original broadcast)			if (!info.id.isSet())			{				info.id = chanMgr->broadcastID;				info.id.encode(&servMgr->serverHost,info.name.cstr(),NULL,info.bitrate);			}			info.contentType = info.srcType;			input = file;			setStatus(S_BROADCASTING);			readStream();			file->close();		}	}catch(StreamException &e)	{		setStatus(S_ERROR);		LOG_ERROR("Ch.%d error: %s",index,e.msg);		sys->sleep(1000);	}				setStatus(S_CLOSING);	if (file)	{		file->close();		delete file;	}	return nextURL;}// -----------------------------------int	Channel::streamURL(ThreadInfo *thread){	thread->lock();	Channel *ch = (Channel *)thread->data;	ClientSocket *sock = NULL;	LOG_CHANNEL("Ch.%d started: %s",ch->index,ch->sourceURL.cstr());	String url;	while (thread->active)	{		if (url.isEmpty())			url = ch->sourceURL;		url = ch->streamURL(url.cstr());	}	ch->endThread();	return 0;}	// -----------------------------------int	Channel::streamGet(ThreadInfo *thread){	thread->lock();	GnuPacket pack;	Channel *ch = (Channel *)thread->data;	chanMgr->lockHitList(ch->info.id,true);	LOG_CHANNEL("Ch.%d started: %s",ch->index,ch->getName());	while (thread->active)	{		ch->lastIdleTime = sys->getTime();		ch->setStatus(S_IDLE);		while ((ch->checkIdle()) && (thread->active))			sys->sleepIdle();		if (!thread->active)			break;		//ch->info.title.set("Please wait.",String::T_ASCII);		bool doneSearch=false;		do 		{			Host sh = servMgr->serverHost;			ChanHitList *chl = chanMgr->findHitListByID(ch->info.id);			if (chl)			{				if (servMgr->getFirewall() == ServMgr::FW_OFF)				{					// we are non-firewalled so try non push hosts, then push hosts					ch->currSource = chl->getHit(false);					if (!ch->currSource.host.ip)						ch->currSource = chl->getHit(true);				}else{					// we are firewalled so try non push hosts only					ch->currSource = chl->getHit(false);				}			}			if (sh.isSame(ch->currSource.host))				ch->currSource.host.ip = 0;			if (!ch->currSource.host.ip)			{				ch->setStatus(S_SEARCHING);				if (!doneSearch)				{					LOG_CHANNEL("Ch.%d search..",ch->index);					if (servMgr->findChannel(ch->info))						doneSearch = true;				}				sys->sleepIdle();			}		}while((ch->currSource.host.ip==0) && (thread->active));		// totally give up		if (!ch->currSource.host.ip)			break;		{			try 			{				char hostName[64];				ch->currSource.host.IPtoStr(hostName);				if (!ch->currSource.firewalled || (servMgr->serverHost.localIP() && ch->currSource.host.localIP()))				{					ClientSocket *s = sys->createSocket();					if (!s)						throw StreamException("Ch.%d cannot create socket",ch->index);					ch->sock = s;					ch->setStatus(S_CONNECTING);					ch->sock->open(ch->currSource.host);					ch->sock->timeout = 10000;					ch->sock->connect();					LOG_CHANNEL("Ch.%d connect to %s",ch->index,hostName);				}else{					Host sh = servMgr->serverHost;					if (!sh.isValid() || sh.loopbackIP())						throw StreamException("No Server, unable to ask for push.");					ch->setStatus(S_REQUESTING);					sys->sleep(500);	// wait a bit for the previous find to go					int timeout;					LOG_CHANNEL("Ch.%d Push request",ch->index);										ch->pushSock = NULL;					ch->pushIndex = ch->currSource.index;					for(int i=0; i<chanMgr->pushTries; i++)					{						LOG_NETWORK("Push-request try %d",i+1);						pack.initPush(ch->currSource,sh);						servMgr->route(pack,ch->currSource.packetID,NULL);						timeout = chanMgr->pushTimeout;								while ((!ch->pushSock) && (thread->active))						{							if (ch->checkBump())								throw StreamException("Bumped");							if (timeout-- <= 0)								break;							sys->sleep(1000);						}						if (ch->pushSock || (!thread->active))							break;					}					if (!ch->pushSock)						throw StreamException("Push timeout");					ch->setStatus(S_CONNECTING);					ch->sock = ch->pushSock;								}				char idStr[64];				ch->info.id.toStr(idStr);								if (ch->info.srcType != ChanInfo::T_PEERCAST)				{					// raw data stream

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -