📄 channel.cpp
字号:
// ------------------------------------------------// File : channel.cpp// Date: 4-apr-2002// Author: giles// Desc: // Channel streaming classes. These do the actual // streaming of media between clients. //// (c) 2002 peercast.org// // ------------------------------------------------// This program is free software; you can redistribute it and/or modify// it under the terms of the GNU General Public License as published by// the Free Software Foundation; either version 2 of the License, or// (at your option) any later version.// This program is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the// GNU General Public License for more details.// ------------------------------------------------#include <string.h>#include <stdlib.h>#include "common.h"#include "socket.h"#include "channel.h"#include "gnutella.h"#include "servent.h"#include "servmgr.h"#include "sys.h"#include "xml.h"#include "http.h"#include "peercast.h"// -----------------------------------char *Channel::srcTypes[]={ "None", "Peercast", "SHOUTcast", "Icecast", "URL"};// -----------------------------------char *Channel::statusMsgs[]={ "NONE", "WAIT", "CONNECT", "REQUEST", "CLOSE", "RECEIVE", "BROADCAST", "ABORT", "SEARCH", "NOHOSTS", "IDLE", "ERROR"};// -----------------------------------void readXMLString(String &str, XML::Node *n, const char *arg){ char *p; p = n->findAttr(arg); if (p) { str.set(p,String::T_HTML); str.convertTo(String::T_ASCII); }}// -----------------------------------------------------------------------------// Initialise the channel to its default settings of unallocated and reset.// -----------------------------------------------------------------------------void Channel::init(){ reset();}// -----------------------------------------------------------------------------// Close this channel and stop thread// -----------------------------------------------------------------------------void Channel::close(){ thread.active = false; setStatus(S_CLOSING);}// -----------------------------------------------------------------------------void Channel::endThread(){ close(); thread.unlock(); init();}// -----------------------------------------------------------------------------void Channel::setStatus(STATUS s){ status = s; if (isPlaying()) { info.status = ChanInfo::S_PLAY; info.lastPlay = sys->getTime(); }else{ info.status = ChanInfo::S_UNKNOWN; info.lastPlay = 0; }} // -----------------------------------------------------------------------------// Reset channel and make it available // -----------------------------------------------------------------------------void Channel::reset(){ currSource.init(); srcType = SRC_NONE; lastIdleTime = 0; prefetchCnt=0; numRelays = 0; numListeners = 0; info.init(); index = 0; mount.clear(); bump = false; stayConnected = false; icyMetaInterval = 0; syncPos = 0; headMeta.init(); insertMeta.init(); chanData.init(); setStatus(S_NONE); type = T_NONE; readDelay = 0; sock = NULL; sourceURL.clear();}// -----------------------------------bool Channel::checkIdle(){ if ((numListeners > 0) || (stayConnected || (status == S_BROADCASTING))) { prefetchCnt=0; return false; } if (prefetchCnt) prefetchCnt--; if (prefetchCnt) LOG_CHANNEL("prefetch %d",prefetchCnt); return prefetchCnt==0;}// -----------------------------------bool Channel::isFull(){ return chanMgr->maxStreamsPerChannel ? numRelays >= chanMgr->maxStreamsPerChannel : false;}// -----------------------------------void Channel::startMP3File(char *fn){ type = T_BROADCAST; FileStream *fs = new FileStream(); fs->openReadOnly(fn); input = fs; thread.data = this; thread.func = streamMP3File; if (!sys->startThread(&thread)) init();}// -----------------------------------int Channel::streamMP3File(ThreadInfo *thread){ thread->lock(); Channel *ch = (Channel *)thread->data; LOG_CHANNEL("Channel started: %s",ch->getName()); try { while (thread->active) { ch->input->read(&ch->mp3Head,sizeof(MP3Header)); ch->readMP3(); ch->input->rewind(); LOG_CHANNEL("%s end",ch->getName()); } }catch(StreamException &e) { LOG_ERROR("Unable to read file: %s",e.msg); } ch->input->close(); delete ch->input; LOG_CHANNEL("Channel stopped: %s",ch->getName()); ch->endThread(); return 0;}// -----------------------------------void Channel::startGet(){ srcType = SRC_PEERCAST; type = T_RELAY; input = NULL; info.srcType = ChanInfo::T_PEERCAST; thread.data = this; thread.func = streamGet; if (!sys->startThread(&thread)) init();}// -----------------------------------void Channel::startURL(const char *u){ sourceURL.set(u); srcType = SRC_URL; type = T_BROADCAST; stayConnected = true; // source type should be set before here. //info.srcType = ChanInfo::T_UNKNOWN; thread.data = this; thread.func = streamURL; if (!sys->startThread(&thread)) init();}// -----------------------------------int Channel::findProc(ThreadInfo *thread){ thread->lock(); Channel *ch = (Channel *)thread->data; ch->setStatus(S_SEARCHING); int findCnt=0; while (thread->active) { ChanHitList *chl = chanMgr->findHitListByID(ch->info.id); if (chl && chl->numHits()) { // update chaninfo with latest ch->info = chl->info; ch->setStatus(S_IDLE); thread->unlock(); ch->startGet(); return 0; }else { if ((findCnt%60) == 0) servMgr->findChannel(ch->info); if (findCnt++ > 300) // give up eventually break; } sys->sleep(1000); } ch->endThread(); return 0;}// -----------------------------------String Channel::streamURL(const char *url){ String nextURL; String urlTmp; urlTmp.set(url); char *fileName = urlTmp.cstr(); Stream *file = NULL; PlayList *pls=NULL; LOG_CHANNEL("Fetch URL=%s",fileName); try { if (strnicmp(fileName,"http://",7)==0) { fileName+=7; ClientSocket *sock = sys->createSocket(); if (!sock) throw StreamException("Ch.%d cannot create socket",index); file = sock; char *dir = strstr(fileName,"/"); if (dir) *dir++=0; LOG_CHANNEL("Fetch Host=%s",fileName); if (dir) LOG_CHANNEL("Fetch Dir=%s",dir); setStatus(S_CONNECTING); Host host; host.fromStr(fileName,80); sock->open(host); sock->connect(); HTTP http(*sock); http.writeLine("GET /%s HTTP/1.1",dir?dir:""); http.writeLine("%s %s",HTTP_HS_HOST,fileName); http.writeLine("%s %s",HTTP_HS_AGENT,PCX_AGENT); // "WinampMPEG/2.8"; http.writeLine("%s %s",HTTP_HS_CONNECTION,"close"); http.writeLine("%s %s",HTTP_HS_ACCEPT,"*/*"); http.writeLine("icy-metadata:1"); http.writeLine(""); int res = http.readResponse(); if ((res!=200) && (res!=302)) { LOG_ERROR("HTTP response: %s",http.cmdLine); throw StreamException("Bad HTTP connect"); } String name = info.name; while (http.nextHeader()) { LOG_CHANNEL("Fetch HTTP: %s",http.cmdLine); ChanInfo tmpInfo = info; Servent::readICYHeader(http,info,NULL); if (!tmpInfo.name.isEmpty()) info.name = tmpInfo.name; if (!tmpInfo.genre.isEmpty()) info.genre = tmpInfo.genre; if (!tmpInfo.url.isEmpty()) info.url = tmpInfo.url; if (http.isHeader("icy-metaint")) icyMetaInterval = http.getArgInt(); else if (http.isHeader("Location:")) nextURL.set(http.getArgStr()); char *arg = http.getArgStr(); if (arg) { if (http.isHeader("content-type")) { if (stristr(arg,MIME_XSCPLS)) pls = new PlayList(PlayList::T_SCPLS, 1000); else if (stristr(arg,MIME_PLS)) pls = new PlayList(PlayList::T_PLS, 1000); else if (stristr(arg,MIME_XPLS)) pls = new PlayList(PlayList::T_PLS, 1000); else if (stristr(arg,MIME_M3U)) pls = new PlayList(PlayList::T_PLS, 1000); else if (stristr(arg,MIME_TEXT)) pls = new PlayList(PlayList::T_PLS, 1000); } } } if ((!nextURL.isEmpty()) && (res==302)) { LOG_CHANNEL("Ch.%d redirect: %s",index,nextURL.cstr()); sock->close(); delete sock; sock = NULL; return nextURL; } //}else if (strnicmp(fileName,"file://",7)==0) }else{ if (strnicmp(fileName,"file://",7)==0) fileName+=7; FileStream *fs = new FileStream(); fs->openReadOnly(fileName); file = fs; // if filetype is unknown, try and figure it out from file extension. //if ((info.srcType == ChanInfo::T_UNKNOWN) || (info.srcType == ChanInfo::T_PLAYLIST)) { const char *ext = fileName+strlen(fileName); while (*--ext) if (*ext=='.') { ext++; break; } ChanInfo::TYPE type = ChanInfo::getTypeFromStr(ext); if (type != ChanInfo::T_UNKNOWN) info.srcType = type; } if (info.bitrate) readDelay = (ChanPacket::MAX_DATALEN*1000)/((info.bitrate*1024)/8); else readDelay = 1; if (info.srcType == ChanInfo::T_PLAYLIST) pls = new PlayList(PlayList::T_PLS, 1000);// }else // {// thread.active = false; // fatal error// throw StreamException("Bad URL"); } if (pls) { LOG_CHANNEL("Ch.%d is PLS",index); try { char tmp[256]; while (file->readLine(tmp,sizeof(tmp))) { //LOG_CHANNEL("PLS: %s",tmp); switch (pls->type) { case PlayList::T_PLS: if (tmp[0] != '#') pls->addURL(tmp,""); break; case PlayList::T_SCPLS: if (strnicmp(tmp,"file",4)==0) { char *p = strstr(tmp,"="); if (p) pls->addURL(p+1,""); } break; } } }catch(StreamException &) {} file->close(); delete file; file = NULL; int urlNum=0; String url; LOG_CHANNEL("Playlist: %d URLs",pls->numURLs); while ((thread.active) && (pls->numURLs)) { if (url.isEmpty()) { url = pls->urls[urlNum%pls->numURLs]; urlNum++; } try { url = streamURL(url.cstr()); }catch(StreamException &) {} } delete pls; }else { // if we didn`t get a channel id from the source, then create our own (its an original broadcast) if (!info.id.isSet()) { info.id = chanMgr->broadcastID; info.id.encode(&servMgr->serverHost,info.name.cstr(),NULL,info.bitrate); } info.contentType = info.srcType; input = file; setStatus(S_BROADCASTING); readStream(); file->close(); } }catch(StreamException &e) { setStatus(S_ERROR); LOG_ERROR("Ch.%d error: %s",index,e.msg); sys->sleep(1000); } setStatus(S_CLOSING); if (file) { file->close(); delete file; } return nextURL;}// -----------------------------------int Channel::streamURL(ThreadInfo *thread){ thread->lock(); Channel *ch = (Channel *)thread->data; ClientSocket *sock = NULL; LOG_CHANNEL("Ch.%d started: %s",ch->index,ch->sourceURL.cstr()); String url; while (thread->active) { if (url.isEmpty()) url = ch->sourceURL; url = ch->streamURL(url.cstr()); } ch->endThread(); return 0;} // -----------------------------------int Channel::streamGet(ThreadInfo *thread){ thread->lock(); GnuPacket pack; Channel *ch = (Channel *)thread->data; chanMgr->lockHitList(ch->info.id,true); LOG_CHANNEL("Ch.%d started: %s",ch->index,ch->getName()); while (thread->active) { ch->lastIdleTime = sys->getTime(); ch->setStatus(S_IDLE); while ((ch->checkIdle()) && (thread->active)) sys->sleepIdle(); if (!thread->active) break; //ch->info.title.set("Please wait.",String::T_ASCII); bool doneSearch=false; do { Host sh = servMgr->serverHost; ChanHitList *chl = chanMgr->findHitListByID(ch->info.id); if (chl) { if (servMgr->getFirewall() == ServMgr::FW_OFF) { // we are non-firewalled so try non push hosts, then push hosts ch->currSource = chl->getHit(false); if (!ch->currSource.host.ip) ch->currSource = chl->getHit(true); }else{ // we are firewalled so try non push hosts only ch->currSource = chl->getHit(false); } } if (sh.isSame(ch->currSource.host)) ch->currSource.host.ip = 0; if (!ch->currSource.host.ip) { ch->setStatus(S_SEARCHING); if (!doneSearch) { LOG_CHANNEL("Ch.%d search..",ch->index); if (servMgr->findChannel(ch->info)) doneSearch = true; } sys->sleepIdle(); } }while((ch->currSource.host.ip==0) && (thread->active)); // totally give up if (!ch->currSource.host.ip) break; { try { char hostName[64]; ch->currSource.host.IPtoStr(hostName); if (!ch->currSource.firewalled || (servMgr->serverHost.localIP() && ch->currSource.host.localIP())) { ClientSocket *s = sys->createSocket(); if (!s) throw StreamException("Ch.%d cannot create socket",ch->index); ch->sock = s; ch->setStatus(S_CONNECTING); ch->sock->open(ch->currSource.host); ch->sock->timeout = 10000; ch->sock->connect(); LOG_CHANNEL("Ch.%d connect to %s",ch->index,hostName); }else{ Host sh = servMgr->serverHost; if (!sh.isValid() || sh.loopbackIP()) throw StreamException("No Server, unable to ask for push."); ch->setStatus(S_REQUESTING); sys->sleep(500); // wait a bit for the previous find to go int timeout; LOG_CHANNEL("Ch.%d Push request",ch->index); ch->pushSock = NULL; ch->pushIndex = ch->currSource.index; for(int i=0; i<chanMgr->pushTries; i++) { LOG_NETWORK("Push-request try %d",i+1); pack.initPush(ch->currSource,sh); servMgr->route(pack,ch->currSource.packetID,NULL); timeout = chanMgr->pushTimeout; while ((!ch->pushSock) && (thread->active)) { if (ch->checkBump()) throw StreamException("Bumped"); if (timeout-- <= 0) break; sys->sleep(1000); } if (ch->pushSock || (!thread->active)) break; } if (!ch->pushSock) throw StreamException("Push timeout"); ch->setStatus(S_CONNECTING); ch->sock = ch->pushSock; } char idStr[64]; ch->info.id.toStr(idStr); if (ch->info.srcType != ChanInfo::T_PEERCAST) { // raw data stream
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -