⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 channel.cpp

📁 P2P应用 : Peercast的源代码
💻 CPP
📖 第 1 页 / 共 5 页
字号:
// ------------------------------------------------
// File : channel.cpp
// Date: 4-apr-2002
// Author: giles
// Desc: 
//		Channel streaming classes. These do the actual 
//		streaming of media between clients. 
//
// (c) 2002 peercast.org
// 
// ------------------------------------------------
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
// ------------------------------------------------

#include <string.h>
#include <stdlib.h>
#include "common.h"#include "socket.h"#include "channel.h"#include "gnutella.h"#include "servent.h"#include "servmgr.h"#include "sys.h"#include "xml.h"#include "http.h"#include "peercast.h"#include "atom.h"
#include "pcp.h"
#include "mp3.h"#include "ogg.h"#include "mms.h"#include "nsv.h"
#include "icy.h"#include "url.h"
#include "version2.h"
// -----------------------------------char *Channel::srcTypes[]={	"NONE",	"PEERCAST",	"SHOUTCAST",	"ICECAST",	"URL"};// -----------------------------------char *Channel::statusMsgs[]={	"NONE",	"WAIT",	"CONNECT",	"REQUEST",	"CLOSE",	"RECEIVE",	"BROADCAST",	"ABORT",	"SEARCH",	"NOHOSTS",	"IDLE",	"ERROR",
	"NOTFOUND"};// -----------------------------------void readXMLString(String &str, XML::Node *n, const char *arg){	char *p;	p = n->findAttr(arg);	if (p)	{		str.set(p,String::T_HTML);		str.convertTo(String::T_ASCII);	}}// -----------------------------------------------------------------------------// Initialise the channel to its default settings of unallocated and reset.// -----------------------------------------------------------------------------Channel::Channel(){
	next = NULL;	reset();}// -----------------------------------------------------------------------------void Channel::endThread(){
	if (pushSock)
	{
		pushSock->close();
		delete pushSock;
		pushSock = NULL;
	}

	if (sock)
	{
		sock->close();
		sock = NULL;
	}

	if (sourceData)
	{
		sourceData;
		sourceData = NULL;
	}


	reset();

	chanMgr->deleteChannel(this);

	sys->endThread(&thread);

}// -----------------------------------------------------------------------------void Channel::resetPlayTime(){	info.lastPlayStart = sys->getTime();}// -----------------------------------------------------------------------------void Channel::setStatus(STATUS s){
	if (s != status)
	{
		bool wasPlaying = isPlaying();
		status = s;
		if (isPlaying())
		{			info.status = ChanInfo::S_PLAY;			resetPlayTime();
		}else
		{
			if (wasPlaying)
				info.lastPlayEnd = sys->getTime();			info.status = ChanInfo::S_UNKNOWN;
		}

		if (isBroadcasting())
		{
			ChanHitList *chl = chanMgr->findHitListByID(info.id);
			if (!chl)
				chanMgr->addHitList(info);
		}

		peercastApp->channelUpdate(&info);

	}
}	// -----------------------------------------------------------------------------// Reset channel and make it available // -----------------------------------------------------------------------------void Channel::reset(){	sourceHost.init();
	remoteID.clear();
	streamIndex = 0;
	lastIdleTime = 0;	info.init();	mount.clear();	bump = false;	stayConnected = false;	icyMetaInterval = 0;	streamPos = 0;
	insertMeta.init();

	headPack.init();

	sourceStream = NULL;
	rawData.init();
	rawData.accept = ChanPacket::T_HEAD|ChanPacket::T_DATA;

	setStatus(S_NONE);	type = T_NONE;	readDelay = false;	sock = NULL;
	pushSock = NULL;	sourceURL.clear();	sourceData = NULL;
	lastTrackerUpdate = 0;
	lastMetaUpdate = 0;

	srcType = SRC_NONE;


	startTime = 0;
	syncTime = 0;
}

// -----------------------------------
// Store an incoming packet in the channel's raw data buffer.
// PCP packets are not buffered and are dropped here.
void	Channel::newPacket(ChanPacket &pack)
{
	if (pack.type == ChanPacket::T_PCP)
		return;

	rawData.writePacket(pack,true);
}

// -----------------------------------bool	Channel::checkIdle(){	return ( (info.getUptime() > chanMgr->prefetchTime) && (localListeners() == 0) && (!stayConnected) && (status != S_BROADCASTING));
}// -----------------------------------bool	Channel::isFull(){	return chanMgr->maxRelaysPerChannel ? localRelays() >= chanMgr->maxRelaysPerChannel : false;}
// -----------------------------------
// Number of T_RELAY streams the servent manager reports for this channel id.
int Channel::localRelays()
{
	const int relayCount = servMgr->numStreams(info.id,Servent::T_RELAY,true);
	return relayCount;
}
// -----------------------------------
// Number of T_DIRECT streams the servent manager reports for this channel id.
int Channel::localListeners()
{
	const int listenerCount = servMgr->numStreams(info.id,Servent::T_DIRECT,true);
	return listenerCount;
}

// -----------------------------------
int Channel::totalRelays()
{
	int tot = 0;
	ChanHitList *chl = chanMgr->findHitListByID(info.id);
	if (chl)
		tot += chl->numHits();
	return tot;
}
// -----------------------------------
int Channel::totalListeners()
{
	int tot = localListeners();
	ChanHitList *chl = chanMgr->findHitListByID(info.id);
	if (chl)
		tot += chl->numListeners();
	return tot;
}


// -----------------------------------void	Channel::startGet(){	srcType = SRC_PEERCAST;
	type = T_RELAY;	info.srcProtocol = ChanInfo::SP_PCP;	sourceData = new PeercastSource();	startStream();}// -----------------------------------void	Channel::startURL(const char *u){	sourceURL.set(u);	srcType = SRC_URL;	type = T_BROADCAST;	stayConnected = true;	resetPlayTime();	sourceData = new URLSource(u);	startStream();}// -----------------------------------void Channel::startStream(){	thread.data = this;	thread.func = stream;	if (!sys->startThread(&thread))		reset();}// -----------------------------------
// Sleep until `time` seconds have elapsed since startTime (as measured by
// sys->getDTime()). Returns immediately if that moment has already passed;
// a single call sleeps for at most 60 seconds.
void Channel::sleepUntil(double time)
{
	double remaining = time - (sys->getDTime()-startTime);

	// nothing to wait for
	if (!(remaining > 0))
		return;

	if (remaining > 60)
		remaining = 60;

	// sys->sleep() is given the delay multiplied by 1000 (seconds -> ms)
	const double remainingMS = remaining*1000;
	sys->sleep((int)remainingMS);
}
// -----------------------------------void Channel::checkReadDelay(unsigned int len){	if (readDelay)	{
		unsigned int time = (len*1000)/((info.bitrate*1024)/8);
		sys->sleep(time);
	}


}// -----------------------------------THREAD_PROC	Channel::stream(ThreadInfo *thread){
//	thread->lock();	Channel *ch = (Channel *)thread->data;
	while (thread->active && !peercastInst->isQuitting)
	{
		LOG_CHANNEL("Channel started");


		ChanHitList *chl = chanMgr->findHitList(ch->info);
		if (!chl)
			chanMgr->addHitList(ch->info);
		ch->sourceData->stream(ch);
		LOG_CHANNEL("Channel stopped");

		if (!ch->stayConnected)
		{
			break;
		}else
		{
			if (!ch->info.lastPlayEnd)
				ch->info.lastPlayEnd = sys->getTime();

			unsigned int diff = (sys->getTime()-ch->info.lastPlayEnd) + 5;

			LOG_DEBUG("Channel sleeping for %d seconds",diff);
			for(unsigned int i=0; i<diff; i++)
			{
				if (!thread->active || peercastInst->isQuitting)
					break;
				sys->sleep(1000);	
			}
		}
	}	ch->endThread();

	return 0;}	
// -----------------------------------
// Offer a socket to the channel as its push socket.
// Returns true and stores `givSock` when no push socket is currently held;
// returns false (leaving `givSock` untouched) otherwise.
bool Channel::acceptGIV(ClientSocket *givSock)
{
	if (pushSock)
		return false;

	pushSock = givSock;
	return true;
}
// -----------------------------------
void Channel::connectFetch()
{
	sock = sys->createSocket();
	
	if (!sock)
		throw StreamException("Can`t create socket");

	if (sourceHost.tracker || sourceHost.yp)
	{
		sock->setReadTimeout(30000);
		sock->setWriteTimeout(30000);
		LOG_CHANNEL("Channel using longer timeouts");
	}

	sock->open(sourceHost.host);
		
	sock->connect();
}

// -----------------------------------
// Send the HTTP channel request on `sock` and process the reply.
//
// Writes a "GET /channel/<id>" request carrying the current stream position
// and the PCP flag, then reads the response code and headers. A position
// header updates streamPos; every other header is passed to
// Servent::readICYHeader. When the channel's source protocol is PCP the
// outgoing PCP handshake is performed afterwards.
//
// Returns the HTTP response code when it is neither 200 nor 503,
// otherwise 0.
int Channel::handshakeFetch()
{
	// (an unused session-id string buffer that used to be filled here was removed)
	char idStr[64];
	info.id.toStr(idStr);

	sock->writeLineF("GET /channel/%s HTTP/1.0",idStr);
	sock->writeLineF("%s %d",PCX_HS_POS,streamPos);
	sock->writeLineF("%s %d",PCX_HS_PCP,1);

	// blank line terminates the request headers
	sock->writeLine("");

	HTTP http(*sock);

	int r = http.readResponse();

	LOG_CHANNEL("Got response: %d",r);

	while (http.nextHeader())
	{
		char *arg = http.getArgStr();
		if (!arg) 
			continue;

		if (http.isHeader(PCX_HS_POS))
			streamPos = atoi(arg);
		else
			Servent::readICYHeader(http, info, NULL);

		LOG_CHANNEL("Channel fetch: %s",http.cmdLine);
	}

	if ((r != 200) && (r != 503))
		return r;

	// buffered data lies beyond the position the remote reported: discard it
	if (rawData.getLatestPos() > streamPos)
		rawData.init();

	AtomStream atom(*sock);

	String agent;

	Host rhost = sock->host;

	if (info.srcProtocol == ChanInfo::SP_PCP)
	{
		// don`t need PCP_CONNECT here
		Servent::handshakeOutgoingPCP(atom,rhost,remoteID,agent,sourceHost.yp|sourceHost.tracker);
	}

	return 0;

}
// -----------------------------------void PeercastSource::stream(Channel *ch){
	int numYPTries=0;
	while (ch->thread.active)	{
		ChanHitList *chl = NULL;

		ch->sourceHost.init();

		ch->setStatus(Channel::S_SEARCHING);
		LOG_CHANNEL("Channel searching for hit..");
		do 		{
			if (ch->pushSock)
			{
				ch->sock = ch->pushSock;
				ch->pushSock = NULL;
				ch->sourceHost.host = ch->sock->host;
				break;
			}
			chl = chanMgr->findHitList(ch->info);			if (chl)			{
				ChanHitSearch chs;

				// find local hit 
				chs.init();
				chs.matchHost = servMgr->serverHost;
				chs.waitDelay = MIN_RELAY_RETRY;
				chs.excludeID = servMgr->sessionID;
				if (chl->pickHits(chs))
					ch->sourceHost = chs.best[0];
				
				// else find global hit
				if (!ch->sourceHost.host.ip)
				{
					chs.init();
					chs.waitDelay = MIN_RELAY_RETRY;
					chs.excludeID = servMgr->sessionID;
					if (chl->pickHits(chs))
						ch->sourceHost = chs.best[0];
				}

				// else find local tracker
				if (!ch->sourceHost.host.ip)
				{
					chs.init();
					chs.matchHost = servMgr->serverHost;
					chs.waitDelay = MIN_TRACKER_RETRY;
					chs.excludeID = servMgr->sessionID;
					chs.trackersOnly = true;
					if (chl->pickHits(chs))
						ch->sourceHost = chs.best[0];
				}


				// else find global tracker
				if (!ch->sourceHost.host.ip)
				{
					chs.init();
					chs.waitDelay = MIN_TRACKER_RETRY;
					chs.excludeID = servMgr->sessionID;
					chs.trackersOnly = true;
					if (chl->pickHits(chs))
						ch->sourceHost = chs.best[0];
				}

			}

			// no trackers found so contact YP
			if (!ch->sourceHost.host.ip)
			{
				if (servMgr->rootHost.isEmpty())
					break;

				if (numYPTries >= 3)
					break;

				unsigned int ctime=sys->getTime();
				if ((ctime-chanMgr->lastYPConnect) > MIN_YP_RETRY)
				{
					ch->sourceHost.host.fromStrName(servMgr->rootHost.cstr(),DEFAULT_PORT);
					ch->sourceHost.yp = true;
					chanMgr->lastYPConnect=ctime;
				}
			}
			sys->sleepIdle();		}while((ch->sourceHost.host.ip==0) && (ch->thread.active));
		if (!ch->sourceHost.host.ip)
		{
			LOG_ERROR("Channel giving up");
			break;
		}

		if (ch->sourceHost.yp)
		{
			numYPTries++;
			LOG_CHANNEL("Channel contacting YP, try %d",numYPTries);
		}else
		{
			LOG_CHANNEL("Channel found hit");
			numYPTries=0;
		}

		if (ch->sourceHost.host.ip)
		{
			bool isTrusted = ch->sourceHost.tracker | ch->sourceHost.yp;


			//if (ch->sourceHost.tracker)
			//	peercastApp->notifyMessage(ServMgr::NT_PEERCAST,"Contacting tracker, please wait...");
			
			char ipstr[64];
			ch->sourceHost.host.toStr(ipstr);

			char *type = "";
			if (ch->sourceHost.tracker)
				type = "(tracker)";
			else if (ch->sourceHost.yp)
				type = "(YP)";

			int error=-1;			try
			{
				ch->setStatus(Channel::S_CONNECTING);			

				if (!ch->sock)
				{
					LOG_CHANNEL("Channel connecting to %s %s",ipstr,type);
					ch->connectFetch();
				}

				error = ch->handshakeFetch();
				if (error)
					throw StreamException("Handshake error");

				ch->sourceStream = ch->createSource();

				error = ch->readStream(*ch->sock,ch->sourceStream);
				if (error)
					throw StreamException("Stream error");

				error = 0;		// no errors, closing normally.
				ch->setStatus(Channel::S_CLOSING);			

				LOG_CHANNEL("Channel closed normally");

			}catch(StreamException &e)
			{
				ch->setStatus(Channel::S_ERROR);			
				LOG_ERROR("Channel to %s %s : %s",ipstr,type,e.msg);
				if (!ch->sourceHost.tracker || ((error != 503) && ch->sourceHost.tracker))
					chanMgr->deadHit(ch->sourceHost);
			}


			// broadcast quit to any connected downstream servents
			{
				ChanPacket pack;
				MemoryStream mem(pack.data,sizeof(pack.data));
				AtomStream atom(mem);
				atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_OFFAIR);
				pack.len = mem.pos;
				pack.type = ChanPacket::T_PCP;
				GnuID noID;
				noID.clear();
				servMgr->broadcastPacket(pack,ch->info.id,ch->remoteID,noID,Servent::T_RELAY);
			}


			if (ch->sourceStream)
			{
				try
				{
					if (!error)
					{
						ch->sourceStream->updateStatus(ch);
						ch->sourceStream->flush(*ch->sock);
					}
				}catch(StreamException &)
				{}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -