📄 channel.cpp.svn-base
字号:
// ------------------------------------------------// File : channel.cpp// Date: 4-apr-2002// Author: giles// Desc: // Channel streaming classes. These do the actual // streaming of media between clients. //// (c) 2002 peercast.org// // ------------------------------------------------// This program is free software; you can redistribute it and/or modify// it under the terms of the GNU General Public License as published by// the Free Software Foundation; either version 2 of the License, or// (at your option) any later version.// This program is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the// GNU General Public License for more details.// ------------------------------------------------#include <string.h>#include <stdlib.h>#include "common.h"#include "socket.h"#include "channel.h"#include "gnutella.h"#include "servent.h"#include "servmgr.h"#include "sys.h"#include "xml.h"#include "http.h"#include "peercast.h"#include "atom.h"
#include "pcp.h"
#include "mp3.h"
#include "ogg.h"
#include "mms.h"
#include "icy.h"
#include "url.h"

// -----------------------------------
// Printable names for the channel source types.
// NOTE(review): presumably indexed by Channel::SRC_TYPE - confirm the order
// against the enum declared in channel.h.
char *Channel::srcTypes[]=
{
    "NONE",
    "PEERCAST",
    "SHOUTCAST",
    "ICECAST",
    "URL"
};

// -----------------------------------
// Printable names for the channel states.
// NOTE(review): presumably indexed by Channel::STATUS - confirm the order
// against the enum declared in channel.h.
char *Channel::statusMsgs[]=
{
    "NONE",
    "WAIT",
    "CONNECT",
    "REQUEST",
    "CLOSE",
    "RECEIVE",
    "BROADCAST",
    "ABORT",
    "SEARCH",
    "NOHOSTS",
    "IDLE",
    "ERROR",
    "NOTFOUND"
};

// -----------------------------------
// Copy attribute `arg` of XML node `n` into `str`: the value is stored as
// T_HTML and then converted to T_ASCII. `str` is left untouched when the
// node has no such attribute.
void readXMLString(String &str, XML::Node *n, const char *arg)
{
    char *attrValue = n->findAttr(arg);
    if (!attrValue)
        return;

    str.set(attrValue,String::T_HTML);
    str.convertTo(String::T_ASCII);
}

// -----------------------------------------------------------------------------
// Initialise the channel to its default settings of unallocated and reset.
// -----------------------------------------------------------------------------
void Channel::init()
{
    reset();
}

// -----------------------------------------------------------------------------
// Ask the channel thread to stop by clearing its `active` flag; the streaming
// loops poll this flag.
// -----------------------------------------------------------------------------
void Channel::close()
{
    thread.active = false;
}

// -----------------------------------------------------------------------------
// Final clean-up performed when the channel thread finishes: drop any pushed
// socket that was never consumed, mark the thread inactive, release the thread
// lock and return the channel to its reset state.
// -----------------------------------------------------------------------------
void Channel::endThread()
{
    if (pushSock != NULL)
    {
        pushSock->close();
        delete pushSock;
        pushSock = NULL;
    }

    close();
    thread.unlock();
    init();
}

// -----------------------------------------------------------------------------
// Stamp the channel info with the current system time as its last play time.
// -----------------------------------------------------------------------------
void Channel::resetPlayTime()
{
    info.lastPlayTime = sys->getTime();
}

// -----------------------------------------------------------------------------
// Move the channel to state `s`. Nothing happens when the state is unchanged.
// On a real transition the public info status is refreshed (S_PLAY plus a new
// play time while playing, S_UNKNOWN otherwise) and, while broadcasting, a hit
// list is registered for the channel if none exists yet.
// -----------------------------------------------------------------------------
void Channel::setStatus(STATUS s)
{
    if (s == status)
        return;

    status = s;

    if (isPlaying())
    {
        info.status = ChanInfo::S_PLAY;
        resetPlayTime();
    }
    else
    {
        info.status = ChanInfo::S_UNKNOWN;
    }

    if (!isBroadcasting())
        return;

    // a broadcasting channel must be known to the channel manager
    ChanHitList *existing = chanMgr->findHitListByID(info.id);
    if (existing == NULL)
        chanMgr->addHitList(info);
}

// -----------------------------------------------------------------------------
// Reset channel and make it available
// -----------------------------------------------------------------------------
void Channel::reset()
{
    // The statement order is kept deliberately: setStatus() reads channel
    // state, so it must see exactly the fields that were reset before it.

    // source / identity
    sourceHost.init();
    remoteID.clear();
    streamIndex = 0;
    lastIdleTime = 0;
    info.init();
    index = 0;
    mount.clear();

    // stream flags and positions
    bump = false;
    stayConnected = false;
    icyMetaInterval = 0;
    streamPos = 0;

    // buffers
    insertMeta.init();
    headPack.init();
    sourceStream = NULL;
    rawData.init();
    rawData.accept = ChanPacket::T_HEAD|ChanPacket::T_DATA;

    setStatus(S_NONE);
    type = T_NONE;
    readDelay = false;

    // Pointers are only cleared here, nothing is closed or deleted.
    // NOTE(review): confirm the owners of sock / sourceData release them
    // before reset() is reached.
    sock = NULL;
    pushSock = NULL;
    sourceURL.clear();
    sourceData = NULL;

    lastTrackerUpdate = 0;
    lastMetaUpdate = 0;
    srcType = SRC_NONE;
}
// -----------------------------------
// Store a freshly received packet in the channel's raw data buffer.
// Packets of type T_PCP are not buffered.
void Channel::newPacket(ChanPacket &pack)
{
    if (pack.type == ChanPacket::T_PCP)
        return;

    rawData.writePacket(pack,true);
}
// -----------------------------------bool Channel::checkIdle(){ return ( (info.getUptime() > chanMgr->prefetchTime) && (numListeners() == 0) && (!stayConnected) && (status != S_BROADCASTING));
}// -----------------------------------bool Channel::isFull(){ return chanMgr->maxStreamsPerChannel ? numRelays() >= chanMgr->maxStreamsPerChannel : false;}
// -----------------------------------
// Number of relay streams of this channel: the servent manager's stream
// counts for the SP_PCP and SP_PEERCAST protocols added together.
int Channel::numRelays()
{
    int total = servMgr->numStreams(info.id,ChanInfo::SP_PCP);
    total += servMgr->numStreams(info.id,ChanInfo::SP_PEERCAST);
    return total;
}
// -----------------------------------
// Number of listener streams of this channel: the servent manager's stream
// counts for the SP_HTTP and SP_MMS protocols added together.
int Channel::numListeners()
{
    int total = servMgr->numStreams(info.id,ChanInfo::SP_HTTP);
    total += servMgr->numStreams(info.id,ChanInfo::SP_MMS);
    return total;
}
// -----------------------------------void Channel::startGet(){ srcType = SRC_PEERCAST;
type = T_RELAY; info.srcProtocol = ChanInfo::SP_PCP; sourceData = new PeercastSource(); startStream();}// -----------------------------------void Channel::startURL(const char *u){ sourceURL.set(u); srcType = SRC_URL; type = T_BROADCAST; stayConnected = true; resetPlayTime(); sourceData = new URLSource(u); startStream();}// -----------------------------------void Channel::startStream(){ thread.data = this; thread.func = stream; if (!sys->startThread(&thread)) init();}// -----------------------------------void Channel::checkReadDelay(unsigned int len){ if (readDelay) { unsigned int time = (len*1000)/((info.bitrate*1024)/8);// LOG_CHANNEL("sleeping for %d\n",time); sys->sleep(time); }}// -----------------------------------int Channel::stream(ThreadInfo *thread){
thread->lock(); Channel *ch = (Channel *)thread->data;
while (thread->active && !peercastInst->isQuitting)
{
LOG_CHANNEL("Ch.%d started",ch->index);
ChanHitList *chl = chanMgr->findHitList(ch->info);
if (!chl)
chanMgr->addHitList(ch->info);
ch->sourceData->stream(ch);
LOG_CHANNEL("Ch.%d stopped",ch->index);
if (!ch->stayConnected)
break;
sys->sleepIdle();
} ch->endThread(); return 0;}
// -----------------------------------
// Offer a pushed (GIV) socket to the channel. The socket is stored in
// pushSock and true is returned only when no pushed socket is already
// pending; otherwise the offer is refused and false is returned.
bool Channel::acceptGIV(ClientSocket *givSock)
{
    if (pushSock)
        return false;

    pushSock = givSock;
    return true;
}
// -----------------------------------
void Channel::connectFetch()
{
sock = sys->createSocket();
if (!sock)
throw StreamException("Can`t create socket");
if (sourceHost.tracker || sourceHost.yp)
{
sock->setReadTimeout(30000);
sock->setWriteTimeout(30000);
LOG_CHANNEL("Ch.%d using longer timeouts",index);
}
sock->open(sourceHost.host);
sock->connect();
}
// -----------------------------------
// Request this channel from the host the socket is connected to and read the
// HTTP response.
// Returns the HTTP status code when it is neither 200 nor 503; otherwise
// returns 0 after (for SP_PCP sources) the outgoing PCP handshake.
int Channel::handshakeFetch()
{
    char chanIdText[64];
    info.id.toStr(chanIdText);

    // formatted as in the original, but not referenced again in this function
    char sessionIdText[64];
    servMgr->sessionID.toStr(sessionIdText);

    bool isTrusted = sourceHost.tracker | sourceHost.yp;

    // request: channel id, our current stream position, PCP flag
    sock->writeLine("GET /channel/%s HTTP/1.0",chanIdText);
    sock->writeLine("%s %d",PCX_HS_POS,streamPos);
    sock->writeLine("%s %d",PCX_HS_PCP,1);
    sock->writeLine("");

    HTTP http(*sock);
    int r = http.readResponse();

    // response headers: the position header updates streamPos, every other
    // header with an argument is handed to the ICY header reader
    while (http.nextHeader())
    {
        char *arg = http.getArgStr();
        if (arg)
        {
            if (http.isHeader(PCX_HS_POS))
                streamPos = atoi(arg);
            else
                Servent::readICYHeader(http, info, NULL,isTrusted);

            LOG_CHANNEL("Channel fetch: %s",http.cmdLine);
        }
    }

    const bool accepted = (r == 200) || (r == 503);
    if (!accepted)
        return r;

    String agent;
    AtomStream atom(*sock);
    Host rhost = sock->host;

    if (info.srcProtocol == ChanInfo::SP_PCP)
    {
        // don`t need PCP_CONNECT here
        Servent::handshakeOutgoingPCP(atom,rhost,remoteID,agent);
    }

    return 0;
}
// -----------------------------------void PeercastSource::stream(Channel *ch){
while (ch->thread.active) {
ChanHitList *chl = NULL;
ch->sourceHost.init();
LOG_CHANNEL("Ch.%d searching for hit..",ch->index); do {
if (ch->pushSock)
{
ch->sock = ch->pushSock;
ch->pushSock = NULL;
ch->sourceHost.host = ch->sock->host;
break;
}
chl = chanMgr->findHitList(ch->info); if (chl) {
// find local hit ch->sourceHost = chl->getHit(&servMgr->serverHost,MIN_RELAY_RETRY,false,false);
// else find global hit
if (!ch->sourceHost.host.ip)
ch->sourceHost = chl->getHit(NULL,MIN_RELAY_RETRY,false,false);
// else find local tracker
if (!ch->sourceHost.host.ip)
ch->sourceHost = chl->getHit(&servMgr->serverHost,MIN_TRACKER_RETRY,false,true);
// else find global tracker
if (!ch->sourceHost.host.ip)
ch->sourceHost = chl->getHit(NULL,MIN_TRACKER_RETRY,false,true); // should be about 60
}
// no trackers found so contact YP
if (!ch->sourceHost.host.ip)
{
if (!servMgr->rootHost.isEmpty())
{
unsigned int ctime=sys->getTime();
if ((ctime-chanMgr->lastYPConnect) > MIN_YP_RETRY)
{
ch->sourceHost.host.fromStrName(servMgr->rootHost.cstr(),DEFAULT_PORT);
ch->sourceHost.yp = true;
chanMgr->lastYPConnect=ctime;
}
}
}
sys->sleepIdle(); }while((ch->sourceHost.host.ip==0) && (ch->thread.active));
LOG_CHANNEL("Ch.%d found hit",ch->index);
if (ch->sourceHost.host.ip)
{
bool isTrusted = ch->sourceHost.tracker | ch->sourceHost.yp;
//if (ch->sourceHost.tracker)
// peercastApp->notifyMessage(ServMgr::NT_PEERCAST,"Contacting tracker, please wait...");
char ipstr[64];
ch->sourceHost.host.toStr(ipstr);
char *type = "";
if (ch->sourceHost.tracker)
type = "(tracker)";
else if (ch->sourceHost.yp)
type = "(YP)";
int error=0; try
{
ch->setStatus(Channel::S_CONNECTING);
if (!ch->sock)
{
LOG_CHANNEL("Ch.%d connecting to %s %s",ch->index,ipstr,type);
ch->connectFetch();
}
error = ch->handshakeFetch();
if (!error)
{
ch->sourceStream = ch->createSource();
ch->readStream(*ch->sock,ch->sourceStream);
ch->setStatus(Channel::S_CLOSING);
}
}catch(StreamException &e)
{
LOG_ERROR("Ch.%d to %s %s : %s",ch->index,ipstr,type,e.msg);
ch->setStatus(Channel::S_ERROR);
chanMgr->deadHit(ch->info.id,ch->sourceHost);
if (ch->sourceStream)
error = ch->sourceStream->error;
}
// broadcast quit to any connected downstream servents
{
ChanPacket pack;
MemoryStream mem(pack.data,sizeof(pack.data));
AtomStream atom(mem);
atom.writeInt(PCP_QUIT,0);
pack.len = mem.pos;
pack.type = ChanPacket::T_PCP;
GnuID noID;
noID.clear();
servMgr->broadcastPacket(pack,ch->info.id,ch->remoteID,noID,Servent::T_STREAM);
}
if (ch->sourceStream)
{
try
{
if (!error)
{
ch->sourceStream->updateStatus(ch);
ch->sourceStream->flush(*ch->sock);
}
}catch(StreamException &)
{}
ChannelStream *cs = ch->sourceStream;
ch->sourceStream = NULL;
cs->kill();
delete cs;
}
if (ch->sock) { ch->sock->close(); delete ch->sock; ch->sock = NULL; }
if (error == 404)
{
LOG_ERROR("Ch.%d not found",ch->index);
return;
}
}
ch->lastIdleTime = sys->getTime();
ch->setStatus(Channel::S_IDLE);
while ((ch->checkIdle()) && (ch->thread.active))
sys->sleepIdle();
sys->sleepIdle(); }}// -----------------------------------void Channel::startICY(ClientSocket *cs, SRC_TYPE st){ srcType = st; type = T_BROADCAST; cs->setReadTimeout(30000); sock = cs; info.srcProtocol = ChanInfo::SP_HTTP;
streamIndex = ++chanMgr->icyIndex; sourceData = new ICYSource(); startStream();}// -----------------------------------static char *nextMetaPart(char *str,char delim){ while (*str) { if (*str == delim) { *str++ = 0; return str; } str++; } return NULL;}// -----------------------------------static void copyStr(char *to,char *from,int max){ char c; while ((c=*from++) && (--max)) if (c != '\'') *to++ = c; *to = 0;}// -----------------------------------void Channel::processMp3Metadata(char *str){ char *cmd=str; while (cmd) { char *arg = nextMetaPart(cmd,'='); if (!arg) break; char *next = nextMetaPart(arg,';'); if (strcmp(cmd,"StreamTitle")==0) info.track.title.setUnquote(arg,String::T_ASCII); else if (strcmp(cmd,"StreamUrl")==0) info.track.contact.setUnquote(arg,String::T_ASCII); cmd = next; } updateMeta();}// -----------------------------------XML::Node *ChanHit::createXML(){
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -