📄 channel.cpp
字号:
ChannelStream *cs = ch->sourceStream;
ch->sourceStream = NULL;
cs->kill();
delete cs;
}
if (ch->sock) { ch->sock->close(); delete ch->sock; ch->sock = NULL; }
if (error == 404)
{
LOG_ERROR("Channel not found");
return;
}
}
ch->lastIdleTime = sys->getTime();
ch->setStatus(Channel::S_IDLE);
while ((ch->checkIdle()) && (ch->thread.active))
{
sys->sleepIdle();
}
sys->sleepIdle(); }}// -----------------------------------void Channel::startICY(ClientSocket *cs, SRC_TYPE st){ srcType = st; type = T_BROADCAST; cs->setReadTimeout(0); // stay connected even when theres no data coming through sock = cs; info.srcProtocol = ChanInfo::SP_HTTP;
streamIndex = ++chanMgr->icyIndex; sourceData = new ICYSource(); startStream();}// -----------------------------------static char *nextMetaPart(char *str,char delim){ while (*str) { if (*str == delim) { *str++ = 0; return str; } str++; } return NULL;}// -----------------------------------static void copyStr(char *to,char *from,int max){ char c; while ((c=*from++) && (--max)) if (c != '\'') *to++ = c; *to = 0;}// -----------------------------------void Channel::processMp3Metadata(char *str){ ChanInfo newInfo = info;
char *cmd=str; while (cmd) { char *arg = nextMetaPart(cmd,'='); if (!arg) break; char *next = nextMetaPart(arg,';'); if (strcmp(cmd,"StreamTitle")==0)
{ newInfo.track.title.setUnquote(arg,String::T_ASCII);
newInfo.track.title.convertTo(String::T_UNICODE);
}else if (strcmp(cmd,"StreamUrl")==0)
{ newInfo.track.contact.setUnquote(arg,String::T_ASCII);
newInfo.track.contact.convertTo(String::T_UNICODE);
} cmd = next; } updateInfo(newInfo);}// -----------------------------------XML::Node *ChanHit::createXML(){ // IP char ipStr[64]; host.toStr(ipStr); return new XML::Node("host ip=\"%s\" hops=\"%d\" listeners=\"%d\" relays=\"%d\" uptime=\"%d\" push=\"%d\" relay=\"%d\" direct=\"%d\" cin=\"%d\" stable=\"%d\" version=\"%d\" update=\"%d\" tracker=\"%d\"", ipStr, numHops, numListeners, numRelays,
upTime, firewalled?1:0, relay?1:0, direct?1:0,
cin?1:0,
stable?1:0, version, sys->getTime()-time,
tracker
);}// -----------------------------------XML::Node *ChanHitList::createXML(bool addHits){ XML::Node *hn = new XML::Node("hits hosts=\"%d\" listeners=\"%d\" relays=\"%d\" firewalled=\"%d\" closest=\"%d\" furthest=\"%d\" newest=\"%d\"", numHits(),
numListeners(), numRelays(), numFirewalled(), closestHit(), furthestHit(), sys->getTime()-newestHit() ); if (addHits)
{
ChanHit *h = hit;
while (h)
{ if (h->host.ip) hn->add(h->createXML());
h = h->next;
}
} return hn;}// -----------------------------------XML::Node *Channel::createRelayXML(bool showStat){ const char *ststr; ststr = getStatusStr(); if (!showStat) if ((status == S_RECEIVING) || (status == S_BROADCASTING)) ststr = "OK"; ChanHitList *chl = chanMgr->findHitList(info); return new XML::Node("relay listeners=\"%d\" relays=\"%d\" hosts=\"%d\" status=\"%s\"", localListeners(), localRelays(), (chl!=NULL)?chl->numHits():0, ststr ); }// -----------------------------------void ChanMeta::fromXML(XML &xml){ MemoryStream tout(data,MAX_DATALEN); xml.write(tout); len = tout.pos;}// -----------------------------------void ChanMeta::fromMem(void *p, int l){ len = l; memcpy(data,p,len);}// -----------------------------------void ChanMeta::addMem(void *p, int l){ if ((len+l) <= MAX_DATALEN) { memcpy(data+len,p,l); len += l; }}
// -----------------------------------
void Channel::broadcastTrackerUpdate(GnuID &svID, bool force)
{
unsigned int ctime = sys->getTime();
if (((ctime-lastTrackerUpdate) > 30) || (force))
{
ChanPacket pack;
MemoryStream mem(pack.data,sizeof(pack));
AtomStream atom(mem);
ChanHit hit;
ChanHitList *chl = chanMgr->findHitListByID(info.id);
if (!chl)
throw StreamException("Broadcast channel has no hitlist");
int numListeners = totalListeners();
int numRelays = totalRelays();
unsigned int oldp = rawData.getOldestPos();
unsigned int newp = rawData.getLatestPos();
hit.initLocal(numListeners,numRelays,info.numSkips,info.getUptime(),isPlaying(),oldp,newp);
hit.tracker = true;
atom.writeParent(PCP_BCST,7);
atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_ROOT);
atom.writeChar(PCP_BCST_HOPS,0);
atom.writeChar(PCP_BCST_TTL,7);
atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
atom.writeInt(PCP_BCST_VERSION,PCP_CLIENT_VERSION);
atom.writeParent(PCP_CHAN,4);
atom.writeBytes(PCP_CHAN_ID,info.id.id,16);
atom.writeBytes(PCP_CHAN_BCID,chanMgr->broadcastID.id,16);
info.writeInfoAtoms(atom);
info.writeTrackAtoms(atom);
hit.writeAtoms(atom,info.id);
pack.len = mem.pos;
pack.type = ChanPacket::T_PCP;
GnuID noID;
noID.clear();
int cnt = servMgr->broadcastPacket(pack,noID,servMgr->sessionID,svID,Servent::T_COUT);
if (cnt)
{
LOG_DEBUG("Sent tracker update for %s to %d client(s)",info.name.cstr(),cnt);
lastTrackerUpdate = ctime;
}
}
}
// -----------------------------------
bool Channel::sendPacketUp(ChanPacket &pack,GnuID &cid,GnuID &sid,GnuID &did)
{
if ( isActive()
&& (!cid.isSet() || info.id.isSame(cid))
&& (!sid.isSet() || !remoteID.isSame(sid))
&& sourceStream
)
return sourceStream->sendPacket(pack,did);
return false;
}
// -----------------------------------void Channel::updateInfo(ChanInfo &newInfo){
if (info.update(newInfo))
{
if (isBroadcasting())
{
unsigned int ctime = sys->getTime();
if ((ctime-lastMetaUpdate) > 30)
{
lastMetaUpdate = ctime;
ChanPacket pack;
MemoryStream mem(pack.data,sizeof(pack));
AtomStream atom(mem);
atom.writeParent(PCP_BCST,7);
atom.writeChar(PCP_BCST_HOPS,0);
atom.writeChar(PCP_BCST_TTL,7);
atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_RELAYS);
atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
atom.writeInt(PCP_BCST_VERSION,PCP_CLIENT_VERSION);
atom.writeBytes(PCP_BCST_CHANID,info.id.id,16);
atom.writeParent(PCP_CHAN,3);
atom.writeBytes(PCP_CHAN_ID,info.id.id,16);
info.writeInfoAtoms(atom);
info.writeTrackAtoms(atom);
pack.len = mem.pos;
pack.type = ChanPacket::T_PCP;
GnuID noID;
noID.clear();
servMgr->broadcastPacket(pack,info.id,servMgr->sessionID,noID,Servent::T_RELAY);
broadcastTrackerUpdate(noID);
}
}
ChanHitList *chl = chanMgr->findHitList(info);
if (chl)
chl->info = info;
peercastApp->channelUpdate(&info);
}
}// -----------------------------------ChannelStream *Channel::createSource(){// if (servMgr->relayBroadcast)// chanMgr->broadcastRelays(NULL,chanMgr->minBroadcastTTL,chanMgr->maxBroadcastTTL); ChannelStream *source=NULL; if (info.srcProtocol == ChanInfo::SP_PEERCAST) { LOG_CHANNEL("Channel is Peercast"); source = new PeercastStream(); } else if (info.srcProtocol == ChanInfo::SP_PCP)
{
LOG_CHANNEL("Channel is PCP");
PCPStream *pcp = new PCPStream(remoteID);
source = pcp;
}
else if (info.srcProtocol == ChanInfo::SP_MMS) { LOG_CHANNEL("Channel is MMS"); source = new MMSStream(); }else { switch(info.contentType) { case ChanInfo::T_MP3: LOG_CHANNEL("Channel is MP3 - meta: %d",icyMetaInterval); source = new MP3Stream(); break; case ChanInfo::T_NSV: LOG_CHANNEL("Channel is NSV"); source = new NSVStream(); break; case ChanInfo::T_WMA: case ChanInfo::T_WMV: throw StreamException("Channel is WMA/WMV - but not MMS"); break; case ChanInfo::T_OGG: case ChanInfo::T_OGM:
LOG_CHANNEL("Channel is OGG"); source = new OGGStream(); break; default: LOG_CHANNEL("Channel is Raw"); source = new RawStream(); break; } } return source;}// ------------------------------------------
void ChannelStream::updateStatus(Channel *ch)
{
ChanPacket pack;
if (getStatus(ch,pack))
{
if (!ch->isBroadcasting())
{
GnuID noID;
noID.clear();
int cnt = chanMgr->broadcastPacketUp(pack,ch->info.id,servMgr->sessionID,noID);
LOG_CHANNEL("Sent channel status update to %d clients",cnt);
}
}
}
// ------------------------------------------
bool ChannelStream::getStatus(Channel *ch,ChanPacket &pack)
{
unsigned int ctime = sys->getTime();
ChanHitList *chl = chanMgr->findHitListByID(ch->info.id);
if (!chl)
return false;
int newLocalListeners = ch->localListeners();
int newLocalRelays = ch->localRelays();
if (
(
(numListeners != newLocalListeners)
|| (numRelays != newLocalRelays)
|| (ch->isPlaying() != isPlaying)
|| (servMgr->getFirewall() != fwState)
|| (((ctime-lastUpdate)>chanMgr->hostUpdateInterval) && chanMgr->hostUpdateInterval)
)
&& ((ctime-lastUpdate) > 10)
)
{
numListeners = newLocalListeners;
numRelays = newLocalRelays;
isPlaying = ch->isPlaying();
fwState = servMgr->getFirewall();
lastUpdate = ctime;
ChanHit hit;
unsigned int oldp = ch->rawData.getOldestPos();
unsigned int newp = ch->rawData.getLatestPos();
hit.initLocal(numListeners,numRelays,ch->info.numSkips,ch->info.getUptime(),isPlaying,oldp,newp);
hit.tracker = ch->isBroadcasting();
MemoryStream pmem(pack.data,sizeof(pack.data));
AtomStream atom(pmem);
GnuID noID;
noID.clear();
atom.writeParent(PCP_BCST,7);
atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_TRACKERS);
atom.writeChar(PCP_BCST_HOPS,0);
atom.writeChar(PCP_BCST_TTL,11);
atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
atom.writeInt(PCP_BCST_VERSION,PCP_CLIENT_VERSION);
atom.writeBytes(PCP_BCST_CHANID,ch->info.id.id,16);
hit.writeAtoms(atom,noID);
pack.len = pmem.pos;
pack.type = ChanPacket::T_PCP;
return true;
}else
return false;
}
// -----------------------------------
bool Channel::checkBump()
{
if (!isBroadcasting() && (!sourceHost.tracker))
if (rawData.lastWriteTime && ((sys->getTime() - rawData.lastWriteTime) > 30))
{
LOG_ERROR("Channel Auto bumped");
bump = true;
}
if (bump)
{
bump = false;
return true;
}else
return false;
}
// -----------------------------------int Channel::readStream(Stream &in,ChannelStream *source){
int error = 0;
info.numSkips = 0;
source->readHeader(in,this);
peercastApp->channelStart(&info);
rawData.lastWriteTime = 0;
bool wasBroadcasting=false;
try
{
while (thread.active && !peercastInst->isQuitting) {
if (checkIdle())
{
LOG_DEBUG("Channel idle");
break;
}
if (checkBump())
{
LOG_DEBUG("Channel bumped");
error = -1;
break;
}
if (in.eof())
{
LOG_DEBUG("Channel eof");
break;
}
if (in.readReady())
{
error = source->readPacket(in,this);
if (error)
break;
if (rawData.writePos > 0)
{
if (isBroadcasting())
{
if ((sys->getTime()-lastTrackerUpdate) >= chanMgr->hostUpdateInterval)
{
GnuID noID;
noID.clear();
broadcastTrackerUpdate(noID);
}
wasBroadcasting = true;
}else
{
setStatus(Channel::S_RECEIVING);
}
source->updateStatus(this);
}
}
sys->sleepIdle(); }
}catch(StreamException &e)
{
LOG_ERROR("readStream: %s",e.msg);
error = -1;
}
setStatus(S_CLOSING);
if (wasBroadcasting)
{
GnuID noID;
noID.clear();
broadcastTrackerUpdate(noID,true);
}
peercastApp->channelStop(&info);
source->readEnd(in,this);
return error;
}// -----------------------------------void PeercastStream::readHeader(Stream &in,Channel *ch){ if (in.readTag() != 'PCST') throw StreamException("Not PeerCast stream");}// -----------------------------------void PeercastStream::readEnd(Stream &,Channel *){}// -----------------------------------int PeercastStream::readPacket(Stream &in,Channel *ch){ ChanPacket pack; { pack.readPeercast(in); MemoryStream mem(pack.data,pack.len); switch(pack.type) { case ChanPacket::T_HEAD:
// update sync pos
ch->headPack = pack;
pack.pos = ch->streamPos;
ch->newPacket(pack);
ch->streamPos+=pack.len; break; case ChanPacket::T_DATA:
pack.pos = ch->streamPos;
ch->newPacket(pack);
ch->streamPos+=pack.len;
break;
case ChanPacket::T_META: ch->insertMeta.fromMem(pack.data,pack.len); { if (pack.len) { XML xml; xml.read(mem); XML::Node *n = xml.findNode("channel"); if (n) {
ChanInfo newInfo = ch->info; newInfo.updateFromXML(n); ChanHitList *chl = chanMgr->findHitList(ch->info); if (chl) newInfo.updateFromXML(n); ch->updateInfo(newInfo);
} } }
break;#if 0
case ChanPacket::T_SYNC: { unsigned int s = mem.readLong(); if ((s-ch->syncPos) != 1) { LOG_CHANNEL("Ch.%d SKIP: %d to %d (%d)",ch->index,ch->syncPos,s,ch->info.numSkips); if (ch->syncPos) { ch->info.numSkips++; if (ch->info.numSkips>50) throw StreamException("Bumped - Too many skips"); } } ch->syncPos = s; } break;
#endif
}
}
return 0;}// -----------------------------------void ChannelStream::readRaw(Stream &in, Channel *ch){ ChanPacket pack; const int readLen = 8192;
pack.init(ChanPacket::T_DATA,pack.data,readLen,ch->streamPos); in.read(pack.data,pack.len); ch->newPacket(pack);
ch->checkReadDelay(pack.len);
ch->streamPos+=pack.len;}
// ------------------------------------------
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -