📄 channel.cpp.svn-base
字号:
ch->sock = NULL; }
if (error == 404)
{
LOG_ERROR("Ch.%d not found",ch->index);
return;
}
}
ch->lastIdleTime = sys->getTime();
ch->setStatus(Channel::S_IDLE);
while ((ch->checkIdle()) && (ch->thread.active))
sys->sleepIdle();
sys->sleepIdle(); }}// -----------------------------------void Channel::startICY(ClientSocket *cs, SRC_TYPE st){ srcType = st; type = T_BROADCAST; cs->setReadTimeout(0); // stay connected even when theres no data coming through sock = cs; info.srcProtocol = ChanInfo::SP_HTTP;
streamIndex = ++chanMgr->icyIndex; sourceData = new ICYSource(); startStream();}// -----------------------------------static char *nextMetaPart(char *str,char delim){ while (*str) { if (*str == delim) { *str++ = 0; return str; } str++; } return NULL;}// -----------------------------------static void copyStr(char *to,char *from,int max){ char c; while ((c=*from++) && (--max)) if (c != '\'') *to++ = c; *to = 0;}// -----------------------------------void Channel::processMp3Metadata(char *str){ ChanInfo newInfo = info;
char *cmd=str; while (cmd) { char *arg = nextMetaPart(cmd,'='); if (!arg) break; char *next = nextMetaPart(arg,';'); if (strcmp(cmd,"StreamTitle")==0)
{ newInfo.track.title.setUnquote(arg,String::T_ASCII);
newInfo.track.title.convertTo(String::T_UNICODE);
}else if (strcmp(cmd,"StreamUrl")==0)
{ newInfo.track.contact.setUnquote(arg,String::T_ASCII);
newInfo.track.contact.convertTo(String::T_UNICODE);
} cmd = next; } updateInfo(newInfo);}// -----------------------------------XML::Node *ChanHit::createXML(){ // IP char ipStr[64]; host.toStr(ipStr); return new XML::Node("host ip=\"%s\" hops=\"%d\" listeners=\"%d\" relays=\"%d\" uptime=\"%d\" push=\"%d\" relay=\"%d\" direct=\"%d\" cin=\"%d\" stable=\"%d\" agent=\"%s\" version=\"%d\" update=\"%d\" tracker=\"%d\"", ipStr, numHops, numListeners, numRelays,
upTime, firewalled?1:0, relay?1:0, direct?1:0,
cin?1:0,
stable?1:0, agentStr,
version, sys->getTime()-time,
tracker
);}// -----------------------------------XML::Node *ChanHitList::createHitsXML(){ XML::Node *hn = new XML::Node("hits listeners=\"%d\" hosts=\"%d\" firewalled=\"%d\" closest=\"%d\" furthest=\"%d\" newest=\"%d\"", numListeners(), numHits(), numFirewalled(), closestHit(), furthestHit(), sys->getTime()-newestHit() ); for(int i=0; i<MAX_HITS; i++) if (hits[i].host.ip) hn->add(hits[i].createXML()); return hn;}// -----------------------------------XML::Node *Channel::createRelayXML(bool showStat){ const char *ststr; ststr = getStatusStr(); if (!showStat) if ((status == S_RECEIVING) || (status == S_BROADCASTING)) ststr = "OK"; ChanHitList *chl = chanMgr->findHitList(info); return new XML::Node("relay listeners=\"%d\" relays=\"%d\" hosts=\"%d\" status=\"%s\"", localListeners(), localRelays(), (chl!=NULL)?chl->numHits():0, ststr ); }// -----------------------------------void ChanMeta::fromXML(XML &xml){ MemoryStream tout(data,MAX_DATALEN); xml.write(tout); len = tout.pos;}// -----------------------------------void ChanMeta::fromMem(void *p, int l){ len = l; memcpy(data,p,len);}// -----------------------------------void ChanMeta::addMem(void *p, int l){ if ((len+l) <= MAX_DATALEN) { memcpy(data+len,p,l); len += l; }}
// -----------------------------------
void Channel::broadcastTrackerUpdate(GnuID &svID, bool force)
{
unsigned int ctime = sys->getTime();
if (((ctime-lastTrackerUpdate) > 30) || (force))
{
ChanPacket pack;
MemoryStream mem(pack.data,sizeof(pack));
AtomStream atom(mem);
ChanHit hit;
ChanHitList *chl = chanMgr->findHitListByID(info.id);
if (!chl)
throw StreamException("Broadcast channel has no hitlist");
int numListeners = localListeners();
int numRelays = localRelays();
hit.initLocal(numListeners,numRelays,info.numSkips,info.getUptime(),isPlaying());
hit.tracker = true;
atom.writeParent(PCP_BCST,7);
atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_ROOT);
atom.writeChar(PCP_BCST_HOPS,0);
atom.writeChar(PCP_BCST_TTL,7);
atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
atom.writeInt(PCP_BCST_VERSION,PCP_CLIENT_VERSION);
atom.writeParent(PCP_CHAN,4);
atom.writeBytes(PCP_CHAN_ID,info.id.id,16);
atom.writeBytes(PCP_CHAN_KEY,chanMgr->broadcastID.id,16);
info.writeInfoAtoms(atom);
info.writeTrackAtoms(atom);
hit.writeAtoms(atom,false,info.id);
pack.len = mem.pos;
pack.type = ChanPacket::T_PCP;
GnuID noID;
noID.clear();
int cnt = servMgr->broadcastPacket(pack,noID,servMgr->sessionID,svID,Servent::T_COUT);
if (cnt)
{
LOG_DEBUG("Sent tracker update for %s to %d client(s)",info.name.cstr(),cnt);
lastTrackerUpdate = ctime;
}
}
}
// -----------------------------------
bool Channel::sendPacketUp(ChanPacket &pack,GnuID &cid,GnuID &sid,GnuID &did)
{
if ( isActive()
&& (!cid.isSet() || info.id.isSame(cid))
&& (!sid.isSet() || !remoteID.isSame(sid))
&& sourceStream
)
return sourceStream->sendPacket(pack,did);
return false;
}
// -----------------------------------void Channel::updateInfo(ChanInfo &newInfo){
if (info.update(newInfo))
{
if (isBroadcasting())
{
unsigned int ctime = sys->getTime();
if ((ctime-lastMetaUpdate) > 30)
{
lastMetaUpdate = ctime;
ChanPacket pack;
MemoryStream mem(pack.data,sizeof(pack));
AtomStream atom(mem);
atom.writeParent(PCP_BCST,7);
atom.writeChar(PCP_BCST_HOPS,0);
atom.writeChar(PCP_BCST_TTL,7);
atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_RELAYS);
atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
atom.writeInt(PCP_BCST_VERSION,PCP_CLIENT_VERSION);
atom.writeBytes(PCP_BCST_CHANID,info.id.id,16);
atom.writeParent(PCP_CHAN,3);
atom.writeBytes(PCP_CHAN_ID,info.id.id,16);
info.writeInfoAtoms(atom);
info.writeTrackAtoms(atom);
pack.len = mem.pos;
pack.type = ChanPacket::T_PCP;
GnuID noID;
noID.clear();
servMgr->broadcastPacket(pack,info.id,servMgr->sessionID,noID,Servent::T_RELAY);
broadcastTrackerUpdate(noID);
}
}
}
ChanHitList *chl = chanMgr->findHitList(info);
if (chl)
chl->info = info;
peercastApp->channelUpdate(&info);}// -----------------------------------ChannelStream *Channel::createSource(){// if (servMgr->relayBroadcast)// chanMgr->broadcastRelays(NULL,chanMgr->minBroadcastTTL,chanMgr->maxBroadcastTTL); ChannelStream *source=NULL; if (info.srcProtocol == ChanInfo::SP_PEERCAST) { LOG_CHANNEL("Ch.%d is Peercast",index); source = new PeercastStream(); } else if (info.srcProtocol == ChanInfo::SP_PCP)
{
LOG_CHANNEL("Ch.%d is PCP",index);
PCPStream *pcp = new PCPStream(remoteID);
source = pcp;
}
else if (info.srcProtocol == ChanInfo::SP_MMS) { LOG_CHANNEL("Ch.%d is MMS",index); source = new MMSStream(); }else { switch(info.contentType) { case ChanInfo::T_MP3: LOG_CHANNEL("Ch.%d is MP3 - meta: %d",index,icyMetaInterval); source = new MP3Stream(); break; case ChanInfo::T_NSV: LOG_CHANNEL("Ch.%d is NSV",index); source = new RawStream(); break; case ChanInfo::T_WMA: case ChanInfo::T_WMV: throw StreamException("Ch.%d is WMA/WMV - but not MMS",index); break; case ChanInfo::T_OGG: LOG_CHANNEL("Ch.%d is OGG",index); source = new OGGStream(); break; default: LOG_CHANNEL("Ch.%d is Raw",index); source = new RawStream(); break; } } return source;}// -----------------------------------void ChannelStream::openHTTP(Channel *ch, const char *url){ ClientSocket *sock = sys->createSocket(); if (!sock) throw StreamException("Cannot create socket"); String nextURL; nextURL.set(url); while(!nextURL.isEmpty()) { char *fileName = nextURL.cstr(); char *dir = strstr(fileName,"/"); if (dir) *dir++=0; LOG_CHANNEL("Fetch Host=%s",fileName); if (dir) LOG_CHANNEL("Fetch Dir=%s",dir); Host host; host.fromStrName(fileName,80); sock->open(host); sock->connect(); HTTP http(*sock); http.writeLine("GET /%s HTTP/1.1",dir?dir:""); http.writeLine("%s %s",HTTP_HS_HOST,fileName); http.writeLine("%s %s",HTTP_HS_CONNECTION,"close"); http.writeLine("%s %s",HTTP_HS_ACCEPT,"*/*"); http.writeLine("%s %s",HTTP_HS_AGENT,PCX_AGENT); http.writeLine("icy-metadata:1"); http.writeLine(""); int res = http.readResponse(); if ((res!=200) && (res!=302)) { LOG_ERROR("HTTP response: %s",http.cmdLine); throw StreamException("Bad HTTP connect"); } nextURL.clear(); while (http.nextHeader()) { LOG_CHANNEL("Fetch HTTP: %s",http.cmdLine); Servent::readICYHeader(http,ch->info,NULL); if (http.isHeader("icy-metaint")) ch->icyMetaInterval = http.getArgInt(); else if (http.isHeader("Location:")) nextURL.set(http.getArgStr()); } }}
// ------------------------------------------
void ChannelStream::updateStatus(Channel *ch)
{
ChanPacket pack;
if (getStatus(ch,pack))
{
if (!ch->isBroadcasting())
{
GnuID noID;
noID.clear();
int cnt = chanMgr->broadcastPacketUp(pack,ch->info.id,servMgr->sessionID,noID);
LOG_CHANNEL("Sent channel status update to %d clients",cnt);
}
}
}
// ------------------------------------------
bool ChannelStream::getStatus(Channel *ch,ChanPacket &pack)
{
unsigned int ctime = sys->getTime();
ChanHitList *chl = chanMgr->findHitListByID(ch->info.id);
if (!chl)
return false;
int newLocalListeners = ch->localListeners();
int newLocalRelays = ch->localRelays();
if (
(
(numListeners != newLocalListeners)
|| (numRelays != newLocalRelays)
|| (ch->isPlaying() != isPlaying)
|| (servMgr->getFirewall() != fwState)
|| (((ctime-lastUpdate)>chanMgr->hostUpdateInterval) && chanMgr->hostUpdateInterval)
)
&& ((ctime-lastUpdate) > 10)
)
{
numListeners = newLocalListeners;
numRelays = newLocalRelays;
isPlaying = ch->isPlaying();
fwState = servMgr->getFirewall();
lastUpdate = ctime;
ChanHit hit;
hit.initLocal(numListeners,numRelays,ch->info.numSkips,ch->info.getUptime(),isPlaying);
hit.tracker = ch->isBroadcasting();
MemoryStream pmem(pack.data,sizeof(pack.data));
AtomStream atom(pmem);
GnuID noID;
noID.clear();
atom.writeParent(PCP_BCST,7);
atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_TRACKERS);
atom.writeChar(PCP_BCST_HOPS,0);
atom.writeChar(PCP_BCST_TTL,7);
atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
atom.writeInt(PCP_BCST_VERSION,PCP_CLIENT_VERSION);
atom.writeBytes(PCP_BCST_CHANID,ch->info.id.id,16);
hit.writeAtoms(atom,false,noID);
pack.len = pmem.pos;
pack.type = ChanPacket::T_PCP;
return true;
}else
return false;
}
// -----------------------------------
bool Channel::checkBump()
{
// if (rawData.lastWriteTime && ((sys->getTime() - rawData.lastWriteTime) > 20))
// {
// LOG_ERROR("Ch.%d Auto bumped",index);
// bump = true;
// }
if (bump)
{
bump = false;
return true;
}else
return false;
}
// -----------------------------------int Channel::readStream(Stream &in,ChannelStream *source){
int error = 0;
info.numSkips = 0;
source->readHeader(in,this);
peercastApp->channelStart(&info);
rawData.lastWriteTime = 0;
bool wasBroadcasting=false;
try
{
while (thread.active && !peercastInst->isQuitting) {
error = source->readPacket(in,this);
if (error)
break;
if (checkIdle())
{
LOG_DEBUG("Ch.%d idle",index);
break;
}
if (checkBump())
{
LOG_DEBUG("Ch.%d bumped",index);
break;
}
if (in.eof())
{
LOG_DEBUG("Ch.%d eof",index);
break;
}
if (rawData.writePos > 0)
{
if (isBroadcasting())
{
if ((sys->getTime()-lastTrackerUpdate) >= chanMgr->hostUpdateInterval)
{
GnuID noID;
noID.clear();
broadcastTrackerUpdate(noID);
}
wasBroadcasting = true;
}else
{
setStatus(Channel::S_RECEIVING);
}
source->updateStatus(this);
}
sys->sleepIdle(); }
}catch(StreamException &e)
{
LOG_ERROR("readStream: %s",e.msg);
error = -1;
}
setStatus(S_CLOSING);
if (wasBroadcasting)
{
GnuID noID;
noID.clear();
broadcastTrackerUpdate(noID,true);
}
peercastApp->channelStop(&info);
source->readEnd(in,this);
return error;
}// -----------------------------------void PeercastStream::readHeader(Stream &in,Channel *ch){ if (in.readTag() != 'PCST') throw StreamException("Not PeerCast stream");}// -----------------------------------void PeercastStream::readEnd(Stream &,Channel *){}// -----------------------------------int PeercastStream::readPacket(Stream &in,Channel *ch){ ChanPacket pack; { pack.readPeercast(in); MemoryStream mem(pack.data,pack.len); switch(pack.type) { case ChanPacket::T_HEAD:
// update sync pos
ch->headPack = pack;
pack.pos = ch->streamPos;
ch->newPacket(pack);
ch->streamPos+=pack.len;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -