📄 channel.cpp.svn-base
字号:
// IP char ipStr[64]; host.toStr(ipStr); return new XML::Node("host ip=\"%s\" hops=\"%d\" listeners=\"%d\" uptime=\"%d\" skips=\"%d\" push=\"%d\" busy=\"%d\" stable=\"%d\" agent=\"%s\" update=\"%d\" tracker=\"%d\" relays=\"%d\"", ipStr, hops, numListeners, upTime, numSkips, firewalled?1:0, busy?1:0, stable?1:0, agentStr, sys->getTime()-time,
tracker,
numRelays );}// -----------------------------------XML::Node *ChanHitList::createHitsXML(){ XML::Node *hn = new XML::Node("hits listeners=\"%d\" hosts=\"%d\" busy=\"%d\" stable=\"%d\" firewalled=\"%d\" closest=\"%d\" furthest=\"%d\" newest=\"%d\" ", numListeners(), numHits(), numBusy(), numStable(), numFirewalled(), closestHit(), furthestHit(), sys->getTime()-newestHit() ); for(int i=0; i<MAX_HITS; i++) if (hits[i].host.ip) hn->add(hits[i].createXML()); return hn;}// -----------------------------------XML::Node *Channel::createRelayXML(bool showStat){ const char *ststr; ststr = getStatusStr(); if (!showStat) if ((status == S_RECEIVING) || (status == S_BROADCASTING)) ststr = "OK"; ChanHitList *chl = chanMgr->findHitList(info); return new XML::Node("relay listeners=\"%d\" relays=\"%d\" hosts=\"%d\" status=\"%s\"", numListeners(), numRelays(), (chl!=NULL)?chl->numHits():0, ststr ); }// -----------------------------------void ChanMeta::fromXML(XML &xml){ MemoryStream tout(data,MAX_DATALEN); xml.write(tout); len = tout.pos;}// -----------------------------------void ChanMeta::fromMem(void *p, int l){ len = l; memcpy(data,p,len);}// -----------------------------------void ChanMeta::addMem(void *p, int l){ if ((len+l) <= MAX_DATALEN) { memcpy(data+len,p,l); len += l; }}
// -----------------------------------
void Channel::broadcastTrackerUpdate(GnuID &svID, bool force)
{
unsigned int ctime = sys->getTime();
if (((ctime-lastTrackerUpdate) > 30) || (force))
{
ChanPacket pack;
MemoryStream mem(pack.data,sizeof(pack));
AtomStream atom(mem);
ChanHit hit;
ChanHitList *chl = chanMgr->findHitListByID(info.id);
if (!chl)
throw StreamException("Broadcast channel has no hitlist");
int totListeners = chl->getTotalListeners() + numListeners();
int totRelays = chl->getTotalRelays() + numRelays();
hit.initLocal(totListeners,totRelays,info.numSkips,info.getUptime(),isPlaying(),servMgr->controlInFull());
hit.tracker = true;
atom.writeParent(PCP_BCST,5);
atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_ROOT);
atom.writeChar(PCP_BCST_HOPS,0);
atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
atom.writeParent(PCP_CHAN,4);
atom.writeBytes(PCP_CHAN_ID,info.id.id,16);
atom.writeBytes(PCP_CHAN_KEY,chanMgr->broadcastID.id,16);
info.writeInfoAtoms(atom);
info.writeTrackAtoms(atom);
hit.writeAtoms(atom,false,info.id);
pack.len = mem.pos;
pack.type = ChanPacket::T_PCP;
GnuID noID;
noID.clear();
int cnt = servMgr->broadcastPacket(pack,noID,servMgr->sessionID,svID,Servent::T_COUT);
if (cnt)
{
LOG_DEBUG("Sent tracker update to %d client(s)",cnt);
lastTrackerUpdate = ctime;
}
}
}
// -----------------------------------
bool Channel::sendPacketUp(ChanPacket &pack,GnuID &cid,GnuID &sid,GnuID &did)
{
if ( isActive()
&& (!cid.isSet() || info.id.isSame(cid))
&& (!did.isSet() || remoteID.isSame(did))
&& (!sid.isSet() || !remoteID.isSame(sid))
&& sourceStream
)
return sourceStream->sendPacket(pack);
return false;
}
// -----------------------------------void Channel::updateMeta(){
if (isBroadcasting())
{
unsigned int ctime = sys->getTime();
if ((ctime-lastMetaUpdate) > 30)
{
lastMetaUpdate = ctime;
ChanPacket pack;
MemoryStream mem(pack.data,sizeof(pack));
AtomStream atom(mem);
atom.writeParent(PCP_BCST,3);
atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_RELAYS);
atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
atom.writeParent(PCP_CHAN,3);
atom.writeBytes(PCP_CHAN_ID,info.id.id,16);
info.writeInfoAtoms(atom);
info.writeTrackAtoms(atom);
pack.len = mem.pos;
pack.type = ChanPacket::T_PCP;
GnuID noID;
noID.clear();
servMgr->broadcastPacket(pack,info.id,servMgr->sessionID,noID,Servent::T_STREAM);
broadcastTrackerUpdate(noID);
}
}
ChanHitList *chl = chanMgr->findHitList(info);
if (chl)
chl->info = info;
peercastApp->channelUpdate(&info);}// -----------------------------------ChannelStream *Channel::createSource(){// if (servMgr->relayBroadcast)// chanMgr->broadcastRelays(NULL,chanMgr->minBroadcastTTL,chanMgr->maxBroadcastTTL); ChannelStream *source=NULL; if (info.srcProtocol == ChanInfo::SP_PEERCAST) { LOG_CHANNEL("Ch.%d is Peercast",index); source = new PeercastStream(); } else if (info.srcProtocol == ChanInfo::SP_PCP)
{
LOG_CHANNEL("Ch.%d is PCP",index);
PCPStream *pcp = new PCPStream();
source = pcp;
}
else if (info.srcProtocol == ChanInfo::SP_MMS) { LOG_CHANNEL("Ch.%d is MMS",index); source = new MMSStream(); }else { switch(info.contentType) { case ChanInfo::T_MP3: LOG_CHANNEL("Ch.%d is MP3 - meta: %d",index,icyMetaInterval); source = new MP3Stream(); break; case ChanInfo::T_NSV: LOG_CHANNEL("Ch.%d is NSV",index); source = new RawStream(); break; case ChanInfo::T_WMA: case ChanInfo::T_WMV: throw StreamException("Ch.%d is WMA/WMV - but not MMS",index); break; case ChanInfo::T_OGG: LOG_CHANNEL("Ch.%d is OGG",index); source = new OGGStream(); break; default: LOG_CHANNEL("Ch.%d is Raw",index); source = new RawStream(); break; } } return source;}// -----------------------------------void ChannelStream::openHTTP(Channel *ch, const char *url){ ClientSocket *sock = sys->createSocket(); if (!sock) throw StreamException("Cannot create socket"); String nextURL; nextURL.set(url); while(!nextURL.isEmpty()) { char *fileName = nextURL.cstr(); char *dir = strstr(fileName,"/"); if (dir) *dir++=0; LOG_CHANNEL("Fetch Host=%s",fileName); if (dir) LOG_CHANNEL("Fetch Dir=%s",dir); Host host; host.fromStrName(fileName,80); sock->open(host); sock->connect(); HTTP http(*sock); http.writeLine("GET /%s HTTP/1.1",dir?dir:""); http.writeLine("%s %s",HTTP_HS_HOST,fileName); http.writeLine("%s %s",HTTP_HS_CONNECTION,"close"); http.writeLine("%s %s",HTTP_HS_ACCEPT,"*/*"); http.writeLine("%s %s",HTTP_HS_AGENT,PCX_AGENT); http.writeLine("icy-metadata:1"); http.writeLine(""); int res = http.readResponse(); if ((res!=200) && (res!=302)) { LOG_ERROR("HTTP response: %s",http.cmdLine); throw StreamException("Bad HTTP connect"); } nextURL.clear(); while (http.nextHeader()) { LOG_CHANNEL("Fetch HTTP: %s",http.cmdLine); Servent::readICYHeader(http,ch->info,NULL,false); if (http.isHeader("icy-metaint")) ch->icyMetaInterval = http.getArgInt(); else if (http.isHeader("Location:")) nextURL.set(http.getArgStr()); } }}
// ------------------------------------------
void ChannelStream::updateStatus(Channel *ch)
{
ChanPacket pack;
if (getStatus(ch,pack))
{
if (!ch->isBroadcasting())
{
GnuID noID;
noID.clear();
int cnt = chanMgr->broadcastPacketUp(pack,ch->info.id,servMgr->sessionID,noID);
LOG_CHANNEL("Sent channel status update to %d clients",cnt);
}
}
}
// ------------------------------------------
bool ChannelStream::getStatus(Channel *ch,ChanPacket &pack)
{
unsigned int ctime = sys->getTime();
ChanHitList *chl = chanMgr->findHitListByID(ch->info.id);
if (!chl)
return false;
int newNumListeners = ch->numListeners();
int newNumRelays = ch->numRelays();
if (
(
(numListeners != newNumListeners)
|| (numRelays != newNumRelays)
|| (ch->isPlaying() != isPlaying)
|| (servMgr->getFirewall() != fwState)
|| (((ctime-lastUpdate)>chanMgr->hostUpdateInterval) && chanMgr->hostUpdateInterval)
)
&& ((ctime-lastUpdate) > 10)
)
{
numListeners = newNumListeners;
numRelays = newNumRelays;
isPlaying = ch->isPlaying();
fwState = servMgr->getFirewall();
lastUpdate = ctime;
ChanHit hit;
hit.initLocal(numListeners,numRelays,ch->info.numSkips,ch->info.getUptime(),isPlaying,servMgr->streamFull());
hit.tracker = ch->isBroadcasting();
MemoryStream pmem(pack.data,sizeof(pack.data));
AtomStream atom(pmem);
GnuID noID;
noID.clear();
atom.writeParent(PCP_BCST,5);
atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_TRACKERS);
atom.writeChar(PCP_BCST_HOPS,0);
atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
atom.writeBytes(PCP_BCST_CHANID,ch->info.id.id,16);
hit.writeAtoms(atom,false,noID);
pack.len = pmem.pos;
pack.type = ChanPacket::T_PCP;
return true;
}else
return false;
}
// -----------------------------------
bool Channel::checkBump()
{
if (rawData.lastWriteTime && ((sys->getTime() - rawData.lastWriteTime) > 20))
bump = true;
if (bump)
{
bump = false;
return true;
}else
return false;
}
// -----------------------------------void Channel::readStream(Stream &in,ChannelStream *source){
info.numSkips = 0;
source->readHeader(in,this);
peercastApp->channelStart(&info);
rawData.lastWriteTime = 0;
bool wasBroadcasting=false;
try
{
while ((!in.eof() && (thread.active) && !peercastInst->isQuitting)) {
source->readPacket(in,this);
if (checkBump()) throw StreamException("Bumped"); if (checkIdle()) break;
if (rawData.writePos > 0)
{
if (isBroadcasting())
{
if ((sys->getTime()-lastTrackerUpdate) >= chanMgr->hostUpdateInterval)
{
GnuID noID;
noID.clear();
broadcastTrackerUpdate(noID);
}
wasBroadcasting = true;
}else
{
setStatus(Channel::S_RECEIVING);
}
source->updateStatus(this);
}
sys->sleepIdle(); }
}catch(StreamException &e)
{
LOG_ERROR("readStream error: %s",e.msg);
}
setStatus(S_CLOSING);
if (wasBroadcasting)
{
GnuID noID;
noID.clear();
broadcastTrackerUpdate(noID,true);
}
peercastApp->channelStop(&info);
source->readEnd(in,this);
}// -----------------------------------void PeercastStream::readHeader(Stream &in,Channel *ch){ if (in.readTag() != 'PCST') throw StreamException("Not PeerCast stream");}// -----------------------------------void PeercastStream::readEnd(Stream &,Channel *){}// -----------------------------------void PeercastStream::readPacket(Stream &in,Channel *ch){ ChanPacket pack; { pack.readPeercast(in); MemoryStream mem(pack.data,pack.len); switch(pack.type) { case ChanPacket::T_HEAD:
// update sync pos
ch->headPack = pack;
pack.pos = ch->streamPos;
ch->newPacket(pack);
ch->streamPos+=pack.len; break; case ChanPacket::T_DATA:
pack.pos = ch->streamPos;
ch->newPacket(pack);
ch->streamPos+=pack.len;
break;
case ChanPacket::T_META: ch->insertMeta.fromMem(pack.data,pack.len); { if (pack.len) { XML xml; xml.read(mem); XML::Node *n = xml.findNode("channel"); if (n) { ch->info.updateFromXML(n); ChanHitList *chl = chanMgr->findHitList(ch->info); if (chl) chl->info.updateFromXML(n); } } }
ch->updateMeta(); break;#if 0
case ChanPacket::T_SYNC: { unsigned int s = mem.readLong(); if ((s-ch->syncPos) != 1) { LOG_CHANNEL("Ch.%d SKIP: %d to %d (%d)",ch->index,ch->syncPos,s,ch->info.numSkips); if (ch->syncPos) { ch->info.numSkips++; if (ch->info.numSkips>50) throw StreamException("Bumped - Too many skips"); } } ch->syncPos = s; } break;
#endif
}
}}// -----------------------------------void ChannelStream::readRaw(Stream &in, Channel *ch){ ChanPacket pack; const int readLen = 8192;
pack.init(ChanPacket::T_DATA,pack.data,readLen,ch->streamPos); in.read(pack.data,pack.len); ch->newPacket(pack);
ch->checkReadDelay(pack.len);
ch->streamPos+=pack.len;}
// ------------------------------------------
void RawStream::readHeader(Stream &,Channel *)
{
}
// ------------------------------------------
void RawStream::readPacket(Stream &in,Channel *ch)
{
readRaw(in,ch);
}
// ------------------------------------------
void RawStream::readEnd(Stream &,Channel *)
{
}
// -----------------------------------void ChanPacket::init(TYPE t, const void *p, unsigned int l,unsigned int _pos){
type = t; if (l > MAX_DATALEN)
throw StreamException("Packet data too large"); len = l; memcpy(data,p,len);
pos = _pos;
}// -----------------------------------
void ChanPacket::writeRaw(Stream &out)
{
out.write(data,len);
}
// -----------------------------------void ChanPacket::writePeercast(Stream &out){ unsigned int tp = 0;
switch (type)
{
case T_HEAD: tp = 'HEAD'; break;
case T_META: tp = 'META'; break;
case T_DATA: tp = 'DATA'; break;
}
if (type != T_UNKNOWN)
{
out.writeTag(tp); out.writeShort(len); out.writeShort(0); out.write(data,len);
}}// -----------------------------------void ChanPacket::readPeercast(Stream &in){
unsigned int tp = in.readTag();
switch (tp)
{
case 'HEAD': type = T_HEAD; break;
case 'DATA': type = T_DATA; break;
case 'META': type = T_META; break;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -