📄 channel.cpp
字号:
// ------------------------------------------------
// File : channel.cpp
// Date: 4-apr-2002
// Author: giles
// Desc:
//		Channel streaming classes. These do the actual
//		streaming of media between clients.
//
// (c) 2002 peercast.org
//
// ------------------------------------------------
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// ------------------------------------------------

#include <string.h>
#include <stdlib.h>
#include "common.h"#include "socket.h"#include "channel.h"#include "gnutella.h"#include "servent.h"#include "servmgr.h"#include "sys.h"#include "xml.h"#include "http.h"#include "peercast.h"#include "atom.h"
#include "pcp.h"
#include "mp3.h"#include "ogg.h"#include "mms.h"#include "nsv.h"
#include "icy.h"#include "url.h"
#include "version2.h"
// -----------------------------------char *Channel::srcTypes[]={ "NONE", "PEERCAST", "SHOUTCAST", "ICECAST", "URL"};// -----------------------------------char *Channel::statusMsgs[]={ "NONE", "WAIT", "CONNECT", "REQUEST", "CLOSE", "RECEIVE", "BROADCAST", "ABORT", "SEARCH", "NOHOSTS", "IDLE", "ERROR",
"NOTFOUND"};// -----------------------------------void readXMLString(String &str, XML::Node *n, const char *arg){ char *p; p = n->findAttr(arg); if (p) { str.set(p,String::T_HTML); str.convertTo(String::T_ASCII); }}// -----------------------------------------------------------------------------// Initialise the channel to its default settings of unallocated and reset.// -----------------------------------------------------------------------------Channel::Channel(){
next = NULL; reset();}// -----------------------------------------------------------------------------void Channel::endThread(){
if (pushSock)
{
pushSock->close();
delete pushSock;
pushSock = NULL;
}
if (sock)
{
sock->close();
sock = NULL;
}
if (sourceData)
{
sourceData;
sourceData = NULL;
}
reset();
chanMgr->deleteChannel(this);
sys->endThread(&thread);
}// -----------------------------------------------------------------------------void Channel::resetPlayTime(){ info.lastPlayStart = sys->getTime();}// -----------------------------------------------------------------------------void Channel::setStatus(STATUS s){
if (s != status)
{
bool wasPlaying = isPlaying();
status = s;
if (isPlaying())
{ info.status = ChanInfo::S_PLAY; resetPlayTime();
}else
{
if (wasPlaying)
info.lastPlayEnd = sys->getTime(); info.status = ChanInfo::S_UNKNOWN;
}
if (isBroadcasting())
{
ChanHitList *chl = chanMgr->findHitListByID(info.id);
if (!chl)
chanMgr->addHitList(info);
}
peercastApp->channelUpdate(&info);
}
} // -----------------------------------------------------------------------------// Reset channel and make it available // -----------------------------------------------------------------------------void Channel::reset(){ sourceHost.init();
remoteID.clear();
streamIndex = 0;
lastIdleTime = 0; info.init(); mount.clear(); bump = false; stayConnected = false; icyMetaInterval = 0; streamPos = 0;
insertMeta.init();
headPack.init();
sourceStream = NULL;
rawData.init();
rawData.accept = ChanPacket::T_HEAD|ChanPacket::T_DATA;
setStatus(S_NONE); type = T_NONE; readDelay = false; sock = NULL;
pushSock = NULL; sourceURL.clear(); sourceData = NULL;
lastTrackerUpdate = 0;
lastMetaUpdate = 0;
srcType = SRC_NONE;
startTime = 0;
syncTime = 0;
}
// -----------------------------------
// Stores a newly received packet in the channel's raw data buffer.
// Packets of type ChanPacket::T_PCP are not stored.
void Channel::newPacket(ChanPacket &pack)
{
    if (pack.type == ChanPacket::T_PCP)
        return;

    rawData.writePacket(pack,true);
}
// -----------------------------------bool Channel::checkIdle(){ return ( (info.getUptime() > chanMgr->prefetchTime) && (localListeners() == 0) && (!stayConnected) && (status != S_BROADCASTING));
}// -----------------------------------bool Channel::isFull(){ return chanMgr->maxRelaysPerChannel ? localRelays() >= chanMgr->maxRelaysPerChannel : false;}
// -----------------------------------
// Returns servMgr's count of Servent::T_RELAY streams for this channel's ID.
int Channel::localRelays()
{
    const int relayCount = servMgr->numStreams(info.id,Servent::T_RELAY,true);
    return relayCount;
}
// -----------------------------------
// Returns servMgr's count of Servent::T_DIRECT streams for this channel's ID.
int Channel::localListeners()
{
    const int listenerCount = servMgr->numStreams(info.id,Servent::T_DIRECT,true);
    return listenerCount;
}
// -----------------------------------
int Channel::totalRelays()
{
int tot = 0;
ChanHitList *chl = chanMgr->findHitListByID(info.id);
if (chl)
tot += chl->numHits();
return tot;
}
// -----------------------------------
int Channel::totalListeners()
{
int tot = localListeners();
ChanHitList *chl = chanMgr->findHitListByID(info.id);
if (chl)
tot += chl->numListeners();
return tot;
}
// -----------------------------------void Channel::startGet(){ srcType = SRC_PEERCAST;
type = T_RELAY; info.srcProtocol = ChanInfo::SP_PCP; sourceData = new PeercastSource(); startStream();}// -----------------------------------void Channel::startURL(const char *u){ sourceURL.set(u); srcType = SRC_URL; type = T_BROADCAST; stayConnected = true; resetPlayTime(); sourceData = new URLSource(u); startStream();}// -----------------------------------void Channel::startStream(){ thread.data = this; thread.func = stream; if (!sys->startThread(&thread)) reset();}// -----------------------------------
// Sleeps until `time` seconds have passed since startTime, measured with
// sys->getDTime(). Returns at once when that point has already been reached.
// A single call never sleeps for more than 60 seconds.
void Channel::sleepUntil(double time)
{
    const double elapsed = sys->getDTime()-startTime;
    double remaining = time - elapsed;

    // LOG("sleep %g",remaining);

    if (!(remaining > 0))
        return;

    if (remaining > 60)
        remaining = 60;

    // sys->sleep() is given whole milliseconds.
    const double remainingMS = remaining*1000;
    sys->sleep((int)remainingMS);
}
// -----------------------------------void Channel::checkReadDelay(unsigned int len){ if (readDelay) {
unsigned int time = (len*1000)/((info.bitrate*1024)/8);
sys->sleep(time);
}
}// -----------------------------------THREAD_PROC Channel::stream(ThreadInfo *thread){
// thread->lock(); Channel *ch = (Channel *)thread->data;
while (thread->active && !peercastInst->isQuitting)
{
LOG_CHANNEL("Channel started");
ChanHitList *chl = chanMgr->findHitList(ch->info);
if (!chl)
chanMgr->addHitList(ch->info);
ch->sourceData->stream(ch);
LOG_CHANNEL("Channel stopped");
if (!ch->stayConnected)
{
break;
}else
{
if (!ch->info.lastPlayEnd)
ch->info.lastPlayEnd = sys->getTime();
unsigned int diff = (sys->getTime()-ch->info.lastPlayEnd) + 5;
LOG_DEBUG("Channel sleeping for %d seconds",diff);
for(unsigned int i=0; i<diff; i++)
{
if (!thread->active || peercastInst->isQuitting)
break;
sys->sleep(1000);
}
}
} ch->endThread();
return 0;}
// -----------------------------------
// Stores `givSock` as the channel's push socket.
// Returns false, leaving the existing socket in place, when a push socket is
// already set; returns true when `givSock` was stored.
bool Channel::acceptGIV(ClientSocket *givSock)
{
    if (pushSock)
        return false;

    pushSock = givSock;
    return true;
}
// -----------------------------------
// Creates the channel socket and connects it to sourceHost.host.
// Read and write timeouts of 30000 are set when the source host is a tracker
// or a YP.
// Throws StreamException when the socket cannot be created.
void Channel::connectFetch()
{
    sock = sys->createSocket();
    if (!sock)
        throw StreamException("Can`t create socket");

    const bool wantsLongTimeouts = sourceHost.tracker || sourceHost.yp;
    if (wantsLongTimeouts)
    {
        sock->setReadTimeout(30000);
        sock->setWriteTimeout(30000);
        LOG_CHANNEL("Channel using longer timeouts");
    }

    sock->open(sourceHost.host);
    sock->connect();
}
// -----------------------------------
// Sends the HTTP request for this channel over `sock` and processes the reply.
// The request carries the current stream position (PCX_HS_POS) and a
// PCX_HS_PCP header. A PCX_HS_POS reply header updates streamPos; every other
// reply header is passed to Servent::readICYHeader().
// Returns the HTTP response code when it is neither 200 nor 503. Otherwise
// returns 0 after performing the outgoing PCP handshake (only when the source
// protocol is ChanInfo::SP_PCP).
int Channel::handshakeFetch()
{
    char channelIdText[64];
    info.id.toStr(channelIdText);

    // The session ID is formatted here but not used further in this function.
    char sessionIdText[64];
    servMgr->sessionID.toStr(sessionIdText);

    sock->writeLineF("GET /channel/%s HTTP/1.0",channelIdText);
    sock->writeLineF("%s %d",PCX_HS_POS,streamPos);
    sock->writeLineF("%s %d",PCX_HS_PCP,1);
    sock->writeLine("");

    HTTP http(*sock);

    const int responseCode = http.readResponse();
    LOG_CHANNEL("Got response: %d",responseCode);

    while (http.nextHeader())
    {
        char *headerValue = http.getArgStr();
        // Headers without an argument string are skipped entirely.
        if (headerValue)
        {
            if (http.isHeader(PCX_HS_POS))
                streamPos = atoi(headerValue);
            else
                Servent::readICYHeader(http, info, NULL);

            LOG_CHANNEL("Channel fetch: %s",http.cmdLine);
        }
    }

    const bool accepted = (responseCode == 200) || (responseCode == 503);
    if (!accepted)
        return responseCode;

    // Drop buffered data that lies beyond the position the source reported.
    if (rawData.getLatestPos() > streamPos)
        rawData.init();

    AtomStream atom(*sock);

    String agent;

    Host rhost = sock->host;

    if (info.srcProtocol == ChanInfo::SP_PCP)
    {
        // don`t need PCP_CONNECT here
        Servent::handshakeOutgoingPCP(atom,rhost,remoteID,agent,sourceHost.yp|sourceHost.tracker);
    }

    return 0;
}
// -----------------------------------void PeercastSource::stream(Channel *ch){
int numYPTries=0;
while (ch->thread.active) {
ChanHitList *chl = NULL;
ch->sourceHost.init();
ch->setStatus(Channel::S_SEARCHING);
LOG_CHANNEL("Channel searching for hit..");
do {
if (ch->pushSock)
{
ch->sock = ch->pushSock;
ch->pushSock = NULL;
ch->sourceHost.host = ch->sock->host;
break;
}
chl = chanMgr->findHitList(ch->info); if (chl) {
ChanHitSearch chs;
// find local hit
chs.init();
chs.matchHost = servMgr->serverHost;
chs.waitDelay = MIN_RELAY_RETRY;
chs.excludeID = servMgr->sessionID;
if (chl->pickHits(chs))
ch->sourceHost = chs.best[0];
// else find global hit
if (!ch->sourceHost.host.ip)
{
chs.init();
chs.waitDelay = MIN_RELAY_RETRY;
chs.excludeID = servMgr->sessionID;
if (chl->pickHits(chs))
ch->sourceHost = chs.best[0];
}
// else find local tracker
if (!ch->sourceHost.host.ip)
{
chs.init();
chs.matchHost = servMgr->serverHost;
chs.waitDelay = MIN_TRACKER_RETRY;
chs.excludeID = servMgr->sessionID;
chs.trackersOnly = true;
if (chl->pickHits(chs))
ch->sourceHost = chs.best[0];
}
// else find global tracker
if (!ch->sourceHost.host.ip)
{
chs.init();
chs.waitDelay = MIN_TRACKER_RETRY;
chs.excludeID = servMgr->sessionID;
chs.trackersOnly = true;
if (chl->pickHits(chs))
ch->sourceHost = chs.best[0];
}
}
// no trackers found so contact YP
if (!ch->sourceHost.host.ip)
{
if (servMgr->rootHost.isEmpty())
break;
if (numYPTries >= 3)
break;
unsigned int ctime=sys->getTime();
if ((ctime-chanMgr->lastYPConnect) > MIN_YP_RETRY)
{
ch->sourceHost.host.fromStrName(servMgr->rootHost.cstr(),DEFAULT_PORT);
ch->sourceHost.yp = true;
chanMgr->lastYPConnect=ctime;
}
}
sys->sleepIdle(); }while((ch->sourceHost.host.ip==0) && (ch->thread.active));
if (!ch->sourceHost.host.ip)
{
LOG_ERROR("Channel giving up");
break;
}
if (ch->sourceHost.yp)
{
numYPTries++;
LOG_CHANNEL("Channel contacting YP, try %d",numYPTries);
}else
{
LOG_CHANNEL("Channel found hit");
numYPTries=0;
}
if (ch->sourceHost.host.ip)
{
bool isTrusted = ch->sourceHost.tracker | ch->sourceHost.yp;
//if (ch->sourceHost.tracker)
// peercastApp->notifyMessage(ServMgr::NT_PEERCAST,"Contacting tracker, please wait...");
char ipstr[64];
ch->sourceHost.host.toStr(ipstr);
char *type = "";
if (ch->sourceHost.tracker)
type = "(tracker)";
else if (ch->sourceHost.yp)
type = "(YP)";
int error=-1; try
{
ch->setStatus(Channel::S_CONNECTING);
if (!ch->sock)
{
LOG_CHANNEL("Channel connecting to %s %s",ipstr,type);
ch->connectFetch();
}
error = ch->handshakeFetch();
if (error)
throw StreamException("Handshake error");
ch->sourceStream = ch->createSource();
error = ch->readStream(*ch->sock,ch->sourceStream);
if (error)
throw StreamException("Stream error");
error = 0; // no errors, closing normally.
ch->setStatus(Channel::S_CLOSING);
LOG_CHANNEL("Channel closed normally");
}catch(StreamException &e)
{
ch->setStatus(Channel::S_ERROR);
LOG_ERROR("Channel to %s %s : %s",ipstr,type,e.msg);
if (!ch->sourceHost.tracker || ((error != 503) && ch->sourceHost.tracker))
chanMgr->deadHit(ch->sourceHost);
}
// broadcast quit to any connected downstream servents
{
ChanPacket pack;
MemoryStream mem(pack.data,sizeof(pack.data));
AtomStream atom(mem);
atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_OFFAIR);
pack.len = mem.pos;
pack.type = ChanPacket::T_PCP;
GnuID noID;
noID.clear();
servMgr->broadcastPacket(pack,ch->info.id,ch->remoteID,noID,Servent::T_RELAY);
}
if (ch->sourceStream)
{
try
{
if (!error)
{
ch->sourceStream->updateStatus(ch);
ch->sourceStream->flush(*ch->sock);
}
}catch(StreamException &)
{}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -